본문 바로가기
Back-end/Python

[Celery] 무작정 시작하기 (3) - Chain

by 허도치 2020. 1. 20.

2020/01/10 - [Back-end/Python] - [Celery] 무작정 시작하기 (1) - 설치 및 실행

2020/01/17 - [Back-end/Python] - [Celery] 무작정 시작하기 (2) - Task

 

 

  지난 포스트에서 apply_async(delay)로 Task를 단일 실행시키는 방법에 대해서 간단하게 알아보았다. 단일로 실행시켰을 경우 각 Task는 독립적이기 때문에 서로의 결과값을 참조할 수 없다. 하지만, 처음에 실행된 Task의 결과값을 다음 Task에서 사용해야하는 경우도 있을 것이다. 그래서, 이번 포스트에서는 이러한 문제를 해결할 수 있는 Chain에 대해서 알아보도록 하겠다.

 

 

지난 포스트에서 셋팅한 Celery App을 사용하므로 반드시 선행하길 바란다.

 

 

1. Chain이란?

   1-1. 단어의 뜻 그대로, 연속, 연쇄되는 것을 의미하며, Celery에서는 Task를 순차적으로 실행하는 것을 의미함.

 

   1-2. 앞에 실행된 결과값을 다음 실행되는 Task의 첫번째 인자값으로 전달함.

         - 처음 실행되는 Task에만 모든 인자값을 전달함

         - 이후 실행되는 Task에는 첫번째 인자값을 제외한 값을 전달함.

         예) add( num1, num2 )

Task / Arguments num1=1, num2=2 num2=3 num2=4 num2=5
Task_1 1 + 2 = 3      
Task_2   3 + 3 = 6    
Task_3     6 + 4 = 10  
Task_4       10 + 5 = 15

 

   1-3. delay()와 get()을 이용하여 구현할 수도 있지만, 교착상태에 빠질 수 있어 권장하지 않는 방법.

         예1) sum = add.delay( 1, 2 ).get()

         예2) sum = add.delay( sum, 3 ).get()

         예3) sum = add.delay( sum, 4 ).get()

 

   1-4. Subtask(s)를 사용하며, Task를 리스트나 튜플에 담거나 비트연산자로 연결하여 사용할 수 있음.

 

   1-5. 결과값을 반환받을 경우 동기적으로 실행되는 것과 똑같기 때문에 결과값을 반환받지 않는 것을 권장.

         - 예제에서는 Chain의 과정을 확인을 위해 결과값을 반환받을 것임.

 

   1-6. 더 자세한 내용은 공식 문서를 참고.

 

 

2. Chain 예제 소스.

   2-1. 1~5까지 모두 더하시오.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# run_chain.py
from tasks import calc
from celery import chain
 
subtask_1 = calc.add.s( 12 )
subtask_2 = calc.add.s( 3 )
subtask_3 = calc.add.s( 4 )
subtask_4 = calc.add.s( 5 )
 
# 튜플 또는 리스트를 이용한 Chain
tasks = ( subtask_1, subtask_2, subtask_3, subtask_4 )
 
chaining = chain( tasks )   # Chain Task 생성
chain_task = chaining( )    # Chain 실행
 
# 비트연산자( | )를 이용한 Chain
# chain_task = ( subtask_1 | subtask_2 | subtask_3 | subtask_4 ).apply_async()
 
print"\n# 1. Task ID 확인" )
print"chain_task is {}".format( chain_task.id ) )
 
print"\n# 2. Task 상태" )
print"chain_task is {}".format( chain_task.ready() ) )
 
print"\n# 3. 실행결과 확인" )
print"chain_task is {}".format( chain_task.get() ) )
 
print"\n# 4. Task 상태" )
print"chain_task is {}".format( chain_task.ready() ) )
cs

         - 3 ln: Task Chianing을 위한 chain을 Import.

         - 5~8 ln: 1~5까지 더하는 subtask를 생성.

           * subtask_1은 맨 처음 실행되는 Task로 모든 인자값을 전달해야함.

         - 11 ln: subtask를 Iterable하게 만듦, 여기서 Set은 사용할 수 없음.

         - 13~14 ln: Chain Task 생성 및 실행.

         - 17 ln: 비트연산자를 이용한 Chaining.

 

   2-2. 실행 결과.

         - subtask를 chaning한 Task도 uuid를 부여받음.

         - 1~5까지 모두 더한 15가 출력되는 것을 확인할 수 있음.

         - 마지막 Task가 종료된 후 chain_task의 ready 상태가 True로 바뀜.

 

   2-3. Celery 로그 확인.

         - 두번째 Task부터 인자값을 한개만 넘겼는데도 2개의 인자값을 받아서 처리하는 것으로 확인됨.

 

 

3. link 옵션을 이용한 Chaining 예제 소스.

   3-1. 1~5를 더하시오.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# run_link.py
from tasks import calc
from celery import chain
 
subtask_1 = calc.add.s( 3 )
subtask_2 = calc.add.s( 4 )
subtask_3 = calc.add.s( 5 )
 
links = ( subtask_1 | subtask_2 | subtask_3 )
 
link_task = calc.add.apply_async( args=12 ], link=links )
 
print"\n# 1. Task ID 확인" )
print"link_task is {}".format( link_task.id ) )
 
print"\n# 2. Task 상태" )
print"link_task is {}".format( link_task.ready() ) )
 
print"\n# 3. 실행결과 확인" )
print"link_task is {}".format( link_task.get() ) )
 
print"\n# 4. Task 상태" )
print"link_task is {}".format( link_task.ready() ) )
cs

         - 9 ln: 비트연산자( | )로 link를 연결.

           * 리스트 또는 튜플로 link를 연결하는 경우에는 다르게 동작함.

         - 11 ln: args로 초기값 1, 2를 전달하고, link 옵션으로 subtask를 전달.

 

   3-2. 실행 결과.

         - 결과값은 처음 실행된 Task의 결과값을 반환함.

 

   3-3. Celery 로그 확인.

         - 결과값은 3을 반환했지만, 실제 Task는 chain과 동일하게 수행된 것을 확인할 수 있음.

 

   3-4. *리스트 또는 튜플로 연결하는 경우.

1
links = ( subtask_1 , subtask_2 , subtask_3 )
cs

         - [ 3-1 ]예제의 9 ln을 위와 같이 변경하고 실행.

 

   3-5. 실행 결과.

         - 실행 결과는 [ 3-2 ]와 동일함.

 

   3-6. Celery 로그 확인.

         - 다른 Chain 예제들은 순차적으로 Task 받고 수행되는데, 이 경우에는 

         - 처음 실행된 결과값을 모든 subtask의 첫번째 인자로 사용함.

Task / Arguments num1=1, num2=2 num=3 num=4 num=5
Task_1 1 + 2 = 3      
Task_2   3 + 3 = 6    
Task_3     3 + 4 = 7  
Task_4       3+ 5 = 8

 

 

4. 마치며.

   - 이번 포스트에서는 Task를 좀 더 유용하게 쓰기위한 chain에 대해서 알아보았다. 간단하게 덧셈을 이용하여 예제를 다뤘지만, 순차적으로 크롤링해야 하는 경우에 유용하게 쓸 수 있을거라고 생각된다.

   - 다음 포스트에서는 Task를 병렬처리할 수 있는 groupchord에 대해서 알아보도록 하겠다.

댓글