DjangoBrothers BLOG ✍️

2019/09/07

このエントリーをはてなブックマークに追加
Python Celery 非同期処理

【Python】ローカル環境でCeleryを動かしてみる

ジョブキューフレームワークのCeleryの使い方です。 この記事ではとりあえず、ローカル環境で動かしてみるところまでやってみます。

基本的に公式ドキュメントに沿って進めていきます。

ブローカーを選ぶ

Celeryはメッセージの送受信のために、メッセージブローカーという別のサービスが必要になります。

ブローカーにはRabbitMQやRedisがあります。今回使うのはRedis。

CeleryとRedisはpipでインストールできます。

CeleryとRedisをインストール

$ pip install celery redis

今回、Redisはdockerを使って動かします。

ターミナル

$ docker run -d -p 6379:6379 redis

Celeryアプリを作る

Celeryインスタンス(アプリ)を作ります。

その上で、@app.taskのように、関数にデコレータ をつけると、その関数は非同期タスクとして処理できるようになります。

tasks.py

from celery import Celery

# 第一引数→カレントモジュールの名前
# broker引数→ブローカーのURL
app = Celery('tasks', broker='redis://localhost')

@app.task
def add(x, y):
    return x + y

Celery Worker Serverを起動する

タスクを実行するワーカーを起動します。

ワーカーを起動する

$ celery -A tasks worker --loglevel=info

本番環境で、デーモンシステムとしてワーカーを使うためには、各プラットフォームが提供しているツールを使うことになります。 別記事で、HerokuでCeleryを使う方法を書きたいと思います。

Taskを呼び出す

delay()メソッドで、タスクを呼び出す(ジョブキューに追加する)ことができます。

ターミナル

>>> from tasks import add
>>> add.delay(2, 3)

これにより、add関数の処理が非同期処理のジョブとして登録され、ワーカーによって処理されます。 処理結果は、ワーカーを起動したターミナル画面で確認することができます。

当然ですが、add(2, 3)のように普通にadd関数を呼び出すこともできますが、その際はジョブとしては登録されません。

TaskのResultにアクセスする

タスクを呼び出すと、返り値としてAsyncResultのインスタンスが返ってきます。

AsyncResultは、実行されたタスクについて様々なステータスを保持しています。 AsyncResultを変数に代入しておくことで、実行結果のステータスにアクセスすることができます。

ただし、デフォルトでは実行結果は保存されないので注意が必要です。 Redisを使って実行結果を保存する場合は、以下の1行を追加してあげることで実現できます。

tasks.py

from celery import Celery

app = Celery('tasks', broker='redis://localhost')
app.conf.result_backend = 'redis://localhost:6379/0'  # 実行結果を保存するために追加

@app.task
def add(x, y):
    return x + y

以下の例では、変数resultに、add関数の実行結果を代入して、タスクのステータスを取得しています。

ターミナル

>>> result = add.delay(2, 3)

# タスクのidを取得
>>> result.id
'4daae568-25b7-4f0f-9198-c1096a9b1f13'

# タスクが実行完了しているか
>>> result.ready()
True

sleep関数を使った遅い処理を非同期タスクとして登録する

上の例で作ったadd関数だとすぐに処理が終わってしまい、いまいちイメージがつかめないので、sleep関数を使ってみます。

task.py

from time import sleep  # 追加
from celery import Celery

app = Celery('tasks', broker='redis://localhost')

@app.task
def f():
    print('f関数開始')
    sleep(5)
    print('f関数終了')


def test():
    print('テスト開始')
    f()  # f関数をdelayを使わず普通に呼び出す
    print('テスト終了')


if __name__ == '__main__':
    test()

ターミナル

$ python3 tasks.py
テスト開始
f関数開始
f関数終了
テスト終了

f関数でsleep(5)としているので、f関数を普通に呼び出した場合は、「テスト開始」から「テスト終了」が出力されるまでは5秒掛かってしまいます。

次に、f.delay()として、f関数を非同期タスクとして登録するように変更します。

tasks.py

def test():
    print('テスト開始')
    f.delay()  # f関数を非同期タスクとして登録する
    print('テスト終了')

ターミナル

テスト開始
テスト終了

すると、「テスト開始」出力後即座に(5秒間待たずに)「テスト終了」が出力されるようになります。

5秒掛かるf関数の処理は、ワーカー上で行われるので、ワーカーの方のログ画面で実行内容が確認できます。