본문 바로가기
Back-end/Python

[Celery] 무작정 시작하기 (2) - Task

by 허도치 2020. 1. 17.

2020/01/10 - [Back-end/Python] - [Celery] 무작정 시작하기 (1) - 설치 및 실행

 

 

  지난 포스트에서 Celery를 설치하고 간단하게 Task를 비동기로 실행해보았다. Celery가 비동기 태스크 큐이지만 동기적으로 Task를 실행할 수도 있고, 이전 Task의 결과를 다음 Task의 인자로 전달하여 실행하는 등 다양한 처리 방법들을 제공한다. 그래서 이번 포스트에서는 지금까지 찾아본 Task를 실행하는 방법들에 대해서 알아보도록 하겠다.

 

 

1. Task를 실행하는 방법.

   1-1. apply_async 또는 delay를 이용하는 방법.

         - 호출과 동시에 Task가 실행됨.

         - delayapply_async를 사용하여 만든 shortcut이며, Arguments만 전달할 수 있고 옵션은 설정할 수 없음.

         - apply_asyncArguments와 countdown, quque, ingnore_result 등 다양한 옵션을 설정할 수 있음.

 

   1-2. subtask 또는 s 를 이용하는 방법.

         - 대기 상태의 Task를 생성하고, appy_async 또는 delay로 호출하여 실행할 수 있음.

         - 또한, Task를 생성하거나 실행할 때, 선택적으로 파라미터를 줄 수 있으며 재사용이 가능함.

         - ssubtask의 shortcut으로 delay와 비슷함.

         - subtaskapply_async와 비슷함.

 

   1-3. 더 자세한 내용은 공식 문서를 참고.

 

 

2. 패키지 구성.

         - [ dochi_app.py ]: Celery App을 생성하는 파일.

         - [ run.py ]: Task를 실행시킬 파일.

         - [ tasks ]: Worker의 Task들을 모아 놓은 폴더.

         - [ tasks/calc.py ]: Worker의 Task 파일, 간단한 더하기 예제.

         - [ tasks/example.py ]: Worker의 Task파일, 이전 포스트에서 다루었던 예제.

 

 

3. 예제 소스 작성.

   3-1. Celery App 생성.

         - dochi_app.py

1
2
3
4
5
6
7
8
9
10
11
from celery import Celery
 
app = Celery(
    'my_tasks'
    , broker='amqp://heo:heo@localhost:5672//'
    , backend='rpc://'
    , include=[
        'tasks.example'
        , 'tasks.calc'
    ]
)
cs

         - 4 ln: Celery App의 이름.

         - 5 ln: 처리할 Task를 보관하는 Broker(중계자)를 설정.

         - 6 ln: 처리된 결과를 보관하는 Broker를 설정, 이를 생략하면 Task의 실행 결과를 받을 수 없음.

         - 7~9 ln: Worker가 처리할 Task를 지정.

 

   3-2. Task 생성.

         - tasks/example.py

         - 랜덤한 시간 이후에 Task를 종료하는 예제.

1
2
3
4
5
6
7
8
9
10
11
12
import time
import random
 
from dochi_app import app
 
@app.task
def working( id=1 ):
 
    # 1~5초 사이의 랜덤한 Delay를 발생.
    time.sleep( random.randint(1,5) )
    
    return '{}번째, 일을 끝냈다.'.format( id )
cs

         - 4 ln: 앞서 생성한 Celery App을 import하여 재사용.

   

   3-3. Task 생성.

         - tasks/calc.py

         - 간단한 더하기 예제.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import time
 
from dochi_app import app
 
@app.task
def add( num1, num2 ):
    time.sleep( 1 )
    
    print"{} + {} = {}".format( num1, num2, num1+num2 ) )
    
    return num1 + num2
 
@app.task
def callback( results ):
    return results
cs

         - 3 ln: 앞서 생성한 Celery App을 import하여 재사용.

         - 14 ln: 실행 결과를 반환받을 함수.

 

4. Celery 실행.

   4-1. 실행 명령어.

1
2
3
celery -A dochi_app worker -l INFO -P solo -10
#또는
celery worker --app=dochi_app --loglevel=INFO --pool=solo --concurrency=10
cs

         - [ -A | --app ]: Celery App의 파일명.

         - [ worker ]: Worker 실행 명령어.

         - [ -l | --loglevel ]: Logging 레벨 설정, DEBUG, INFO, WARNING, ERROR, CRITICAL, or FATAL.

         - [ -P | --pool ]: 프로세스 처리 방식.

         - [ -c | --concurrency ]: 동시에 병행처리 할 수 있는 Task의 수.

 

   4-2. 실행 결과.

 

 

5. Task 실행 소스 예제.

   5-1. apply_async 와 delay를 사용하여 진행 상황 살펴보기 확인해보기.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# run.py
from tasks import calc
 
# delay와 apply_async
task_1 = calc.add.delay( 12 )
task_2 = calc.add.apply_async( args=[34], ignore_result=True )
task_3 = calc.add.apply_async( args=[56], kwargs={} )
 
print"# 1. Task ID 확인" )
print"task_1 is {}".format( task_1.id ) )
print"task_2 is {}".format( task_2.id ) )
print"task_3 is {}".format( task_3.id ) )
 
print"\n# 2. Task 상태" )
print"task_1 is ready? {}".format( task_1.ready() ) )
print"task_2 is ready? {}".format( task_2.ready() ) )
print"task_3 is ready? {}".format( task_3.ready() ) )
 
print"\n# 3. 실행결과 확인" )
print"task_1 is {}".format( task_1.get() ) )
print"task_2 is {}".format( task_2.get() ) )
print"task_3 is {}".format( task_3.get() ) )
 
print"\n# 4. Task 상태" )
print"task_1 is ready? {}".format( task_1.ready() ) )
print"task_2 is ready? {}".format( task_2.ready() ) )
print"task_3 is ready? {}".format( task_3.ready() ) )
cs

         - 5 ln: shortcut으로 Arguments만 전달하여 Task를 실행.

         - 6 ln: Arguments와 옵션을 전달하여 Task를 실행.

           * ignore_result는 실행결과의 반환여부, False는 실행결과를 알 수 없음.

         - 7 ln: Arguments만 전달하여 Task를 실행.

         - 10~12 ln: 각 Task의 ID를 확인할 수 있음.

         - 15~17, 24~27 ln: ready()를 통해 현재 Task의 작업 상태를 확인.

           * True이면 완료, False이면 진행중이거나 알 수 없음.

         - 20~22 ln: get()를 통해 현재 Task의 실행 결과를 확인.

           * get을 사용할 경우 Task가 동기적으로 실행됨.

 

   5-2. 실행 결과.

         - Task의 ID는 모두 유니크한 값을 가짐.

         - Task 상태가 모두 False인 것은 아직 실행중인 상태를 나타냄.

         - task_2ignore_result옵션을 적용했기 때문에 실행 결과를 알 수 없음.

         - task_2는 결과를 반환하지 않기 때문에 기다리지 않고 바로 넘어감.

 

 

6. SubTasks 실행 소스 예제.

   6-1. subtask와 s를 사용하여 진행 상황 살펴보기.

         - 내용이 많아보이지만 반복되는 내용이니까 겁먹지말자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# run.py
from tasks import calc
 
# s와 subtask
subtask_1 = calc.add.s( 12 )
subtask_2 = calc.add.subtask( args=[34], ignore_result=True )
subtask_3 = calc.add.subtask( args=[56], kwargs={} )
subtask_4 = calc.add.subtask()
 
print"# 1. SubTask 확인" )
print"subtask_1 is {}".format( subtask_1.id ) )
print"subtask_2 is {}".format( subtask_2.id ) )
print"subtask_3 is {}".format( subtask_3.id ) )
print"subtask_4 is {}".format( subtask_4.id ) )
 
task_1 = subtask_1.delay()
task_2 = subtask_2.apply_async()
task_3 = subtask_3.delay()
task_4 = subtask_4.apply_async( args=[78], ignore_result=True )
task_5 = subtask_4.delay( 910 )
 
print"\n# 2. Task ID 확인" )
print"task_1 is {}".format( task_1 ) )
print"task_2 is {}".format( task_2 ) )
print"task_3 is {}".format( task_3 ) )
print"task_4 is {}".format( task_4 ) )
print"task_5 is {}".format( task_4 ) )
 
print"\n# 3. Task 상태" )
print"task_1 is ready? {}".format( task_1.ready() ) )
print"task_2 is ready? {}".format( task_2.ready() ) )
print"task_3 is ready? {}".format( task_3.ready() ) )
print"task_4 is ready? {}".format( task_4.ready() ) )
print"task_5 is ready? {}".format( task_5.ready() ) )
 
print"\n# 4. 실행결과 확인" )
print"task_1 is {}".format( task_1.get() ) )
print"task_2 is {}".format( task_2.get() ) )
print"task_3 is {}".format( task_3.get() ) )
print"task_4 is {}".format( task_4.get() ) )
print"task_5 is {}".format( task_5.get() ) )
 
print"\n# 5. Task 상태" )
print"task_1 is ready? {}".format( task_1.ready() ) )
print"task_2 is ready? {}".format( task_2.ready() ) )
print"task_3 is ready? {}".format( task_3.ready() ) )
print"task_4 is ready? {}".format( task_4.ready() ) )
print"task_5 is ready? {}".format( task_5.ready() ) )
cs

         - 5 ln: shortcut으로 Arguments만 정의 subtask를 생성.

         - 6 ln: Arguments와 옵션이 정의된 subtask를 생성.

         - 7 ln: Arguments만 정의subtask를 생성.

         - 8 ln: 빈 subtask를 생성.

         - 11~14 ln: subtask의 ID를 출력.

         - 16~18 ln: shortcut으로 파라미터 없이 subtask를 실행

           * 이미 Arguments를 정의했기 때문에 여기서 추가로 전달할 경우 Append됨.

         - 19 ln: 빈 subtask에 Arguments를 전달하여 subtask를 실행.

         - 20 ln: 19 ln에서 실행했던 subtask를 재사용하여 실행.

         - 나머지는 [ 5-1 예제 ] 와 같음.

 

   6-2. 실행 결과 화면.

         - Subtask는 실행되기 전 상태이므로 ID를 가지지않음.

         - task_4와 task_5는 같은 subtask를 사용하기 때문에 ID가 같음.

         - task_2와 task_4는 ignore_result가 True이기 때문에 실행 결과를 알 수 없음.

 

 

7. Subtask를 사용하는 이유.

   - 위 예제만 보면 큰 차이점을 못느낄 수도 있는데, 자세히 살펴보면 아주 큰 차이점이 있다.

   - apply_async는 호출하는 즉시 실행되고, subtask는 대기 상태였다가 호출이 되면 실행한다.

   - 즉, apply_async는 Task가 독립적으로 각각 실행되는 경우에 적합하며, subtask는 연속된 Task나 Task를 일괄처리하는데 적합하다.

   - subtask를 처리할 때 chain을 사용하여 연속된 Task를 처리하고, chord를 사용하여 일괄처리 할 수 있다.

 

 

8. 마치며

   - 이번 포스트에서는 Task를 실행하는 방법에 대해서 알아보았다. 공부하면서 예제를 만들고 부연설명들을 붙이다보니 내용이 좀 길어진 감이 있다. 그래도 Celery에 대해서 많이 알게되었다.

   - 다음 포스트에서는 chainchord에 대해서 알아보도록 하겠다.

댓글