小村のポートフォリオサイト開発(14) DjangoRestFramework ここまでのリファクタリング

  • こんばんは、小村だよ!

  • [DRF レスポンスデータ]みたいな感じで検索したらこのブログがヒットしてびっくりしたよ!

  • じゃー今日も下記のポートフォリオサイトを構築していくよ

  • 前回はAPIのレスポンスのフォーマットを決めたね

  • 今日はここまで書いてきたAPIリファクタリングしていこうと思います!



目次

  1. 現状整理
  2. utils.hatena.py作成
  3. views.entry.py修正



記録

現状整理

  • まずは現状のviews.entry.pyの中身が下記

  • fat viewもいいところだね!

  • 基本的にviewはコントローラー部分だけ書くべきなのでめちゃんこよくない例です。

  • このソースをリファクタリングしていきたいと思います

# coding: utf-8
import time
import os
import requests
import xmltodict


from rest_framework import status
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response

from ..models import Entry
from ..serializer import EntryAllSerializer, EntryCreateAndUpdateSerializer


class EntryViewSet(viewsets.ModelViewSet):
    queryset = Entry.objects.all()
    serializer_class = EntryAllSerializer

    @action(detail=False, methods=['post'])
    def capture(self, request):
        url = self.getHatenaApiUrl('entry')
        auth = self.getHatenaApiAuth()
        hatena_entry_ids = []
        response = {
            'created_count': 0,
            'updated_count': 0,
            'deleted_count': 0,
            'failed_count': 0,
            'failed_hatena_entry_ids': [],
        }

        while url != '':
            # 連続で呼び出すの怖いので0.5秒間間を空ける
            time.sleep(0.5)

            hatena_list = requests.get(url, auth=auth)
            dict_data = xmltodict.parse(hatena_list.text, encoding='utf-8')
            entries = []
            if 'feed' in dict_data:
                if 'entry' in dict_data['feed']:
                    entries = dict_data['feed']['entry']
            for entry in entries:
                if not isinstance(entry, dict):
                    continue

                # hatena_entry_id取得
                hatena_entry_id = entry['id'][
                    entry['id'].rfind('-') + 1:] if 'id' in entry else ''

                hatena_entry_ids.append(hatena_entry_id)

                # category取得
                if 'category' in entry:
                    if isinstance(entry['category'], list):
                        category = entry['category'][0]['@term']
                    else:
                        category = entry['category']['@term']
                else:
                    category = ''

                # title取得
                title = entry['title'] if 'title' in entry else ''

                # summary取得
                summary = entry['summary']['#text'] if 'summary' in entry else ''

                # content_md取得
                content_md = entry['content']['#text'] if 'content' in entry else ''

                # content_html取得
                content_html = entry['hatena:formatted-content']['#text'] if 'hatena:formatted-content' in entry else ''

                # draft取得
                draft = entry['app:control']['app:draft'] if 'app:control' in entry else ''

                # updated_at取得
                updated_at = entry['updated'] if 'updated' in entry else None

                # edited_at取得
                edited_at = entry['app:edited'] if 'app:edited' in entry else None

                # 更新用パラメータ
                param = {
                    'hatena_entry_id': hatena_entry_id,
                    'category': category,
                    'title': title,
                    'summary': summary,
                    'content_md': content_md,
                    'content_html': content_html,
                    'draft': draft,
                    'updated_at': updated_at,
                    'edited_at': edited_at,
                }

                entry = Entry.objects.filter(
                    hatena_entry_id=hatena_entry_id).first()

                if not entry:
                    # 新規作成
                    mode = 'create'
                    serializer = EntryCreateAndUpdateSerializer(data=param)

                else:
                    # 更新
                    mode = 'update'
                    serializer = EntryCreateAndUpdateSerializer(
                        entry, data=param)

                if serializer.is_valid():
                    serializer.save()
                    if mode == 'create':
                        response['created_count'] = response['created_count'] + 1
                    else:
                        response['updated_count'] = response['updated_count'] + 1
                else:
                    response['failed_count'] = response['failed_count'] + 1
                    response['failed_hatena_entry_ids'].append(hatena_entry_id)

            # 次のURLを取得
            url = ''
            if 'link' in dict_data['feed']:
                if isinstance(dict_data['feed']['link'], list):
                    for link in dict_data['feed']['link']:
                        if isinstance(link, dict):
                            if '@rel' in link:
                                if link['@rel'] == 'next':
                                    url = link['@href']

        # はてなIDが存在しなければ論理削除
        response['deleted_count'] = Entry.objects.exclude(
            hatena_entry_id__in=hatena_entry_ids).delete()

        return Response(response, status.HTTP_200_OK)

    def getHatenaApiUrl(self, action):
        HATENA_API_URL_HEADER = 'https://blog.hatena.ne.jp'
        HATENA_API_USER = os.environ.get('HATENA_API_USER')
        HATENA_API_BLOG = os.environ.get('HATENA_API_BLOG')
        HATENA_API_URL_FUTTER = 'atom'

        url = [
            HATENA_API_URL_HEADER,
            HATENA_API_USER,
            HATENA_API_BLOG,
            HATENA_API_URL_FUTTER,
            action
        ]
        return os.path.join(*url)

    def getHatenaApiAuth(self):
        HATENA_API_USER = os.environ.get('HATENA_API_USER')
        HATENA_API_KEY = os.environ.get('HATENA_API_KEY')

        return (HATENA_API_USER, HATENA_API_KEY)



utils.hatena.py作成

  • そもそもはてなブログAPIに関する処理はEntryと直接関係ないですね。

  • ということでutilsフォルダ内にhatena.pyを作成して、はてなブログAPI関係の処理は全てここにまとめようと思います

  • これによりHatenaApiをインスタンス化してgetAllEntriesを実行するといい感じに加工された記事データが取得できるようになりました

f:id:kom314_prog:20210830234554p:plain
import os
import requests
import xmltodict


class HatenaApi():
    HATENA_API_USER = os.environ.get('HATENA_API_USER')
    HATENA_API_BLOG = os.environ.get('HATENA_API_BLOG')
    HATENA_API_KEY = os.environ.get('HATENA_API_KEY')

    HATENA_API_URL_HEADER = 'https://blog.hatena.ne.jp'
    HATENA_API_URL_FUTTER = 'atom'

    def getAllEntries(self):

        all_entries = []
        url = self.getHatenaApiFirstUrl('entry')

        while url != '':
            hatenaApiData = self.getHatenaApi(url)

            for entry in self.getApiEntries(hatenaApiData):
                all_entries.append(self.formatEntry(entry))
            url = self.getHatenaApiNextUrl(hatenaApiData)

        return all_entries

    def getHatenaApi(self, url):
        auth = self.getHatenaApiAuth()
        hatena_list = requests.get(url, auth=auth)
        dict_data = xmltodict.parse(hatena_list.text, encoding='utf-8')
        return dict_data

    def getApiEntries(self, hatenaApiData):
        return self.getDictValue(hatenaApiData, ['feed', 'entry'])

    def formatEntry(self, entry):
        format_entry = {}
        format_entry['hatena_entry_id'] = self.getHatenaEntryId(entry)
        format_entry['category'] = self.getCategory(entry)
        format_entry['title'] = self.getDictValue(entry, ['title'])
        format_entry['summary'] = self.getDictValue(
            entry, ['summary', '#text'])
        format_entry['content_md'] = self.getDictValue(
            entry, ['content', '#text'])
        format_entry['content_html'] = self.getDictValue(
            entry, ['hatena:formatted-content', '#text'])
        format_entry['draft'] = self.getDictValue(
            entry, ['app:control', 'app:draft'])
        format_entry['updated_at'] = self.getDictValue(
            entry, ['updated'], None)
        format_entry['edited_at'] = self.getDictValue(
            entry, ['app:edited'], None)

        return format_entry

    def getHatenaEntryId(self, entry):
        id_all = self.getDictValue(entry, ['id'])
        id = id_all[id_all.rfind('-') + 1:] if id_all else ''
        return id

    def getCategory(self, entry):
        category_all = self.getDictValue(entry, ['category'])
        if isinstance(category_all, list):
            category = category_all[0]
        else:
            category = category_all
        return self.getDictValue(category, ['@term'])

    def getHatenaApiFirstUrl(self, action):

        url = [
            self.HATENA_API_URL_HEADER,
            self.HATENA_API_USER,
            self.HATENA_API_BLOG,
            self.HATENA_API_URL_FUTTER,
            action
        ]
        return os.path.join(*url)

    def getHatenaApiNextUrl(self, hatenaApiEntries):
        url = ''
        links = self.getDictValue(hatenaApiEntries, ['feed', 'link'])
        if isinstance(links, list):
            for link in links:
                if self.getDictValue(link, ['@rel']) == 'next':
                    url = self.getDictValue(link, ['@href'])
        return url

    def getHatenaApiAuth(self):

        return (self.HATENA_API_USER, self.HATENA_API_KEY)

    def getDictValue(self, dictData, keys, emptyValue=''):
        for key in keys:
            if isinstance(dictData, dict):
                if key in dictData:
                    dictData = dictData[key]
            else:
                return emptyValue
        return dictData



views.entry.py修正

  • あとは今までのentry.pyから上記処理を呼び出して、登録・更新・削除を行うようにします

  • もうちょっと短くなると思ってたのですが、思いのほか処理減らなかったな……

  • でもまぁかなり読みやすくなったのでよし!!!

f:id:kom314_prog:20210830235143p:plain
# coding: utf-8
from rest_framework import status
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response

from ..models import Entry
from ..utils import HatenaApi
from ..serializer import EntryAllSerializer, EntryCreateAndUpdateSerializer


class EntryViewSet(viewsets.ModelViewSet):
    queryset = Entry.objects.all()
    serializer_class = EntryAllSerializer

    @action(detail=False, methods=['post'])
    def capture(self, request):
        hatena_api = HatenaApi()
        all_entries = hatena_api.getAllEntries()
        all_entries_sort = sorted(all_entries, key=lambda x: x['edited_at'])
        response = {
            'created_count': 0,
            'updated_count': 0,
            'deleted_count': 0,
            'failed_count': 0,
            'failed_hatena_entry_ids': [],
        }
        hatena_entry_ids = []
        for param in all_entries_sort:
            hatena_entry_id = param['hatena_entry_id']
            hatena_entry_ids.append(hatena_entry_id)
            # 更新用パラメータ
            entry = Entry.objects.filter(
                hatena_entry_id=hatena_entry_id).first()

            if not entry:
                # 新規作成
                mode = 'create'
                serializer = EntryCreateAndUpdateSerializer(data=param)

            else:
                # 更新
                mode = 'update'
                serializer = EntryCreateAndUpdateSerializer(
                    entry, data=param)

            if serializer.is_valid():
                serializer.save()
                if mode == 'create':
                    response['created_count'] = response['created_count'] + 1
                else:
                    response['updated_count'] = response['updated_count'] + 1
            else:
                response['failed_count'] = response['failed_count'] + 1
                response['failed_hatena_entry_ids'].append(
                    hatena_entry_id)

        # はてなIDが存在しなければ論理削除
        response['deleted_count'] = Entry.objects.exclude(
            hatena_entry_id__in=hatena_entry_ids).delete()

        return Response(response, status.HTTP_200_OK)



おわりに

  • リファクタリングが完了しました!

  • 取込系の処理に関してはひとまずこれでよさそうかなーと思います

  • あ、でもテスト作ってないや。ユニットテスト作らなきゃだね

  • 次回当たりユニットテスト作っていきたいと思います!

  • ではでは、ちゃお~~~!