본문 바로가기
Back-end/Python

[Celery] 무장적 시장하기 (4) - Group과 Chord

by 허도치 2020. 1. 20.

2020/01/10 - [Back-end/Python] - [Celery] 무작정 시작하기 (1) - 설치 및 실행

2020/01/17 - [Back-end/Python] - [Celery] 무작정 시작하기 (2) - Task

2020/01/20 - [Back-end/Python] - [Celery] 무작정 시작하기 (3) - Chain

 

 

  지난 포스트에서는 Chain을 이용하여 Taks를 순차적으로 실행시키는 방법을 알아보았다. 이번 포스트에서는 Task를 일괄적으로 실핼 할 수 있는 Group과 Chord에 대해서 알아보려고 한다. 

 

 

 

1. Group이란?

   1-1. 말 그대로 하나의 집합을 만들어 실행시키는 작업으로 Task를 병렬로 실행할 수 있음.

 

   1-2. Task를 그룹화 하는 방법에는 두가지가 있음.

         - 비트연산자( | )를 이용하는 방법: Chain과 같이 순차적으로 실행됨.

         - 리스트 또는 튜플을 이용하는 방법: Task가 병렬로 수행되며 결과는 리스트로 반환됨.

 

   1-3. Group은 병렬처리를 하기 때문에 Worker의 옵션을 변경해주어야함.

         - [ -P | --pool ]:  solo가 아닌 다른 옵션으로 설정, prefork, threads, gevent, eventlet, default: profork.

         - [ -c | --concurrency ]: 병렬처리할 수 있는 수를 2이상 주어야함.

1
2
3
celery -A dochi_app worker -l INFO -P threads -10
#또는
celery worker --app=dochi_app --loglevel=INFO --pool=threads--concurrency=10
cs

 

   1-4. Group의 결과는 <class 'celery.result.GroupResult'>인데, successful(), failed와 같이 특수한 기능이 제공됨.

 

   1-5. 더 자세한 내용은 공식 문서를 참고.

 

 

2. 예제 소스.

   2-1. 1~5까지 연속하는 두 수 더하기.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# run_chain.py
from tasks import calc
from celery import group
 
tasks = [ calc.add.s( i, i+1 ) for i in range(15) ]
 
grouping = group( tasks )
group_task = grouping() # 또는 grouping.apply_async()
 
print"\n# 1. Task 확인" )
print"tasks = {}".format( tasks ) )
print"group_task.id = {}".format( group_task.id ) )
print"group_task.type = {}".format( type(group_task) ) )
 
print"\n# 2. Task 상태 확인" )
print"Subtask가 모두 준비되었는가? {}".format( group_task.ready() ) )
 
print"\n# 3. 실행결과 확인" )
print"몇건이 완료 되었는가? {}".format( group_task.completed_count() ) )
print"완료된 결과를 반환. {}".format( group_task.get() ) )
print"호출한 순서대로 반환. {}".format( group_task.join() ) )
print"몇건이 완료 되었는가? {}".format( group_task.completed_count() ) )
 
print"\n# 4. Task 상태 확인" )
print"Subtask가 모두 준비되었는가? {}".format( group_task.ready() ) )
print"모두 성공했는가? {}".format( group_task.successful() ) )
print"실패가 있는가? {}".format( group_task.failed() ) )
cs

        - 5 ln: 1~5까지 연속하는 두수 더하는 subtask 목록 생성.

        - 7 ln: group 모듈로 subtask를 그룹화.

        - 8 ln: group화된 Task를 실행.

 

   2-2. 실행결과.

         - group_task.type이 <class 'celery.result.GroupResult'>인 것으로 확인됨.

         - 결과값이 리스트로 반환됨, get()과 join() 함수로 결과를 받을 수 있음.

 

   2-3. Celery 로그 확인.

         - Received가 먼저 되는것으로 보아 Task가 병렬처리되는 것으로 확인됨.

         - Task에 Sleep을 Random하게 주면 좀 더 명확하게 확인할 수 있음.

 

 

3. Chord란?

   3-1. 사전적 의미로는 화음이라고 하며, 그룹의 모든 작업이 완료된 후에만 ​​실행되는 작업.

 

   3-2. Group에 다른 Task를 연결하면 Chord로 업그레이드됨.

         예) group( tasks ) | task

 

   3-3. Group과는 다르게 결과를 반환받을 Callback이 있어야함.

 

   3-4. 주의사항의 매우 많음.

         - 결과값을 무조건 반환받아야 함, ignore_result=False.

         - Result BackendRabbitMQ/rpc를 지원하지않음.

         - Message 처리와 동기화의 오버 헤드로 속도가 느림.

         - 동기화 단계는 비용이 많이 들기 때문에 Chord의 사용을 최대한 피하는 것을 권장.

 

   3-5. 더 자세한 내용은 공식 문서를 참고.

 

 

4. 시작하기전에...

   4-1. 위에서 정리했듯이 Chord에서는 Result Backend로 RabbitMQ/rpc를 지원하지않음.

         - 우리는 지난 포스트에서 rpc로 설정했기 때문에 변경이 필요함.

 

 

   4-2. Celery App 설정 변경.

         - dochi_app.py 수정.

1
2
3
4
5
6
7
8
9
10
11
from celery import Celery
 
app = Celery(
    'my_tasks'
    , broker='amqp://heo:heo@localhost:5672//'
    , backend='amqp://' # rpc -> amqp
    , include=[
        'tasks.example'
        , 'tasks.calc'
    ]
)
cs

         - 6 ln: rpc에서 amqp로 변경.

           * 사실 amqp도 권장하지않는 방법인데 우선 돌아가기 때문에 테스트용으로 사용할 계획임.

           * Redis나 Memcached를 사용하는 것을 권장함.

 

   4-3. 한가지 더, Chord는 결과를 반환받을 Callback 함수가 있어야함.

         - tasks/calc.py 수정.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import time
import random
 
from dochi_app import app
 
@app.task
def add( num1, num2 ):
    time.sleep( random.randint(110) )
    
    print"{} + {} = {}".format( num1, num2, num1+num2 ) )
    
    return num1 + num2
 
@app.task
def callback( results ):
    return sum(results)
cs

         - 15 ln: results에는 모든 subtask의 결과값이 리스트에 담겨있음.

 

   4-3. Celery 실행 확인.

         - RPC나 지속성있는 Backend를 사용해달라고 하지만, 테스트용 이기때문에 무시하고 넘어가자.

 

 

5. 예제 소스.

   5-1. 1~5까지 연속하는 두 수를 더한 총합을 구하시오.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# run_chain.py
from tasks import calc
from celery import chord
 
tasks = [ calc.add.s( i, i+1 ) for i in range(15) ]
 
chording = chord( tasks )
chord_task = chording( calc.callback.s() )
 
print"\n# 1. Task 확인" )
print"tasks = {}".format( tasks ) )
print"chord_task.id = {}".format( chord_task.id ) )
print"chord_task.type = {}".format( type(chord_task) ) )
 
print"\n# 2. Task 상태 확인" )
print"Subtask가 모두 준비되었는가? {}".format( chord_task.ready() ) )
 
print"\n# 3. 실행결과 확인" )
print"완료된 결과를 반환. {}".format( chord_task.get() ) )
 
print"\n# 4. Task 상태 확인" )
print"Subtask가 모두 준비되었는가? {}".format( chord_task.ready() ) )
cs

         - 7 ln: 그룹화된 chord task를 생성.

         - 8 ln: 위에서 정의한 callback을 subtask를 통해 결과를 반환받음.

 

   5-2. 실행 확인.

         - chord_task.type이 <class 'celery.result.AsyncResult'>인 것으로 확인됨.

         - Groupd에서 사용했던 successful(), failed(), join() 등의 기능을 사용할 수 없음.

         - Group은 병렬처리가 된 후에 결과값을 리스트로 반환하여 그대로 출력되지만, Chord는 callback에서 후 처리를 할 수 있어서 모두 더한 값을 바로 출력하였음.

 

   5-3. Celery 로그 확인.

         - Group과 마찬가지로 Received가 먼저 되는것으로 보아 Task가 병렬처리되는 것으로 확인됨.

 

 

6. 마치며.

   - 처음에는 Group과 Chord의 차이가 긴가민가해서 잘 이해가 가지않았는데, 정리하면서 보니까 Chord은 callback으로 결과값을 핸들링할 수 있다는 큰 차이를 알게되었다.

   - 마땅히 적용해볼만한 예제가 떠오르지 않는다. 크롤러를 만들어보면서 직접 사용해봐야겠다.

   - 그리고 지금까지 Celery를 쓰면서 로그만 확인했었는데, 모니터링 툴을 한번 써보도록 하겠다.

 

 

참고사이트

- https://docs.celeryproject.org/en/latest/userguide/canvas.html#chord-important-notes

- https://stackoverflow.com/questions/31179336/with-celery-canvas-what-is-the-difference-between-chord-and-chaingroup-ta

댓글