2019/09/07
Python Celery 非同期処理【Python】ローカル環境でCeleryを動かしてみる
ジョブキューフレームワークのCeleryの使い方です。 この記事ではとりあえず、ローカル環境で動かしてみるところまでやってみます。
基本的に公式ドキュメントに沿って進めていきます。
ブローカーを選ぶ
Celeryはメッセージの送受信のために、メッセージブローカーという別のサービスが必要になります。
ブローカーにはRabbitMQやRedisがあります。今回使うのはRedis。
CeleryとRedisはpipでインストールできます。
CeleryとRedisをインストール
$ pip install celery redis
今回、Redisはdockerを使って動かします。
ターミナル
$ docker run -d -p 6379:6379 redis
Celeryアプリを作る
Celeryインスタンス(アプリ)を作ります。
その上で、@app.task
のように、関数にデコレータ をつけると、その関数は非同期タスクとして処理できるようになります。
tasks.py
from celery import Celery
# 第一引数→カレントモジュールの名前
# broker引数→ブローカーのURL
app = Celery('tasks', broker='redis://localhost')
@app.task
def add(x, y):
return x + y
Celery Worker Serverを起動する
タスクを実行するワーカーを起動します。
ワーカーを起動する
$ celery -A tasks worker --loglevel=info
本番環境で、デーモンシステムとしてワーカーを使うためには、各プラットフォームが提供しているツールを使うことになります。 別記事で、HerokuでCeleryを使う方法を書きたいと思います。
Taskを呼び出す
delay()
メソッドで、タスクを呼び出す(ジョブキューに追加する)ことができます。
ターミナル
>>> from tasks import add
>>> add.delay(2, 3)
これにより、add関数の処理が非同期処理のジョブとして登録され、ワーカーによって処理されます。 処理結果は、ワーカーを起動したターミナル画面で確認することができます。
当然ですが、add(2, 3)
のように普通にadd関数を呼び出すこともできますが、その際はジョブとしては登録されません。
TaskのResultにアクセスする
タスクを呼び出すと、返り値としてAsyncResultのインスタンスが返ってきます。
AsyncResultは、実行されたタスクについて様々なステータスを保持しています。 AsyncResultを変数に代入しておくことで、実行結果のステータスにアクセスすることができます。
ただし、デフォルトでは実行結果は保存されないので注意が必要です。 Redisを使って実行結果を保存する場合は、以下の1行を追加してあげることで実現できます。
tasks.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost')
app.conf.result_backend = 'redis://localhost:6379/0' # 実行結果を保存するために追加
@app.task
def add(x, y):
return x + y
以下の例では、変数resultに、add関数の実行結果を代入して、タスクのステータスを取得しています。
ターミナル
>>> result = add.delay(2, 3)
# タスクのidを取得
>>> result.id
'4daae568-25b7-4f0f-9198-c1096a9b1f13'
# タスクが実行完了しているか
>>> result.ready()
True
sleep関数を使った遅い処理を非同期タスクとして登録する
上の例で作ったadd関数だとすぐに処理が終わってしまい、いまいちイメージがつかめないので、sleep関数を使ってみます。
task.py
from time import sleep # 追加
from celery import Celery
app = Celery('tasks', broker='redis://localhost')
@app.task
def f():
print('f関数開始')
sleep(5)
print('f関数終了')
def test():
print('テスト開始')
f() # f関数をdelayを使わず普通に呼び出す
print('テスト終了')
if __name__ == '__main__':
test()
ターミナル
$ python3 tasks.py
テスト開始
f関数開始
f関数終了
テスト終了
f関数でsleep(5)
としているので、f関数を普通に呼び出した場合は、「テスト開始」から「テスト終了」が出力されるまでは5秒掛かってしまいます。
次に、f.delay()
として、f関数を非同期タスクとして登録するように変更します。
tasks.py
def test():
print('テスト開始')
f.delay() # f関数を非同期タスクとして登録する
print('テスト終了')
ターミナル
テスト開始
テスト終了
すると、「テスト開始」出力後即座に(5秒間待たずに)「テスト終了」が出力されるようになります。
5秒掛かるf関数の処理は、ワーカー上で行われるので、ワーカーの方のログ画面で実行内容が確認できます。