2020/01/10 - [Back-end/Python] - [Celery] 무작정 시작하기 (1) - 설치 및 실행
지난 포스트에서 Celery를 설치하고 간단하게 Task를 비동기로 실행해보았다. Celery가 비동기 태스크 큐이지만 동기적으로 Task를 실행할 수도 있고, 이전 Task의 결과를 다음 Task의 인자로 전달하여 실행하는 등 다양한 처리 방법들을 제공한다. 그래서 이번 포스트에서는 지금까지 찾아본 Task를 실행하는 방법들에 대해서 알아보도록 하겠다.
1. Task를 실행하는 방법.
1-1. apply_async 또는 delay를 이용하는 방법.
- 호출과 동시에 Task가 실행됨.
- delay는 apply_async를 사용하여 만든 shortcut이며, Arguments만 전달할 수 있고 옵션은 설정할 수 없음.
- apply_async는 Arguments와 countdown, quque, ingnore_result 등 다양한 옵션을 설정할 수 있음.
1-2. subtask 또는 s 를 이용하는 방법.
- 대기 상태의 Task를 생성하고, appy_async 또는 delay로 호출하여 실행할 수 있음.
- 또한, Task를 생성하거나 실행할 때, 선택적으로 파라미터를 줄 수 있으며 재사용이 가능함.
- s는 subtask의 shortcut으로 delay와 비슷함.
- subtask는 apply_async와 비슷함.
1-3. 더 자세한 내용은 공식 문서를 참고.
2. 패키지 구성.
- [ dochi_app.py ]: Celery App을 생성하는 파일.
- [ run.py ]: Task를 실행시킬 파일.
- [ tasks ]: Worker의 Task들을 모아 놓은 폴더.
- [ tasks/calc.py ]: Worker의 Task 파일, 간단한 더하기 예제.
- [ tasks/example.py ]: Worker의 Task파일, 이전 포스트에서 다루었던 예제.
3. 예제 소스 작성.
3-1. Celery App 생성.
- dochi_app.py
1
2
3
4
5
6
7
8
9
10
11
|
from celery import Celery
app = Celery(
'my_tasks'
, broker='amqp://heo:heo@localhost:5672//'
, backend='rpc://'
, include=[
'tasks.example'
, 'tasks.calc'
]
)
|
cs |
- 4 ln: Celery App의 이름.
- 5 ln: 처리할 Task를 보관하는 Broker(중계자)를 설정.
- 6 ln: 처리된 결과를 보관하는 Broker를 설정, 이를 생략하면 Task의 실행 결과를 받을 수 없음.
- 7~9 ln: Worker가 처리할 Task를 지정.
3-2. Task 생성.
- tasks/example.py
- 랜덤한 시간 이후에 Task를 종료하는 예제.
1
2
3
4
5
6
7
8
9
10
11
12
|
import time
import random
from dochi_app import app
@app.task
def working( id=1 ):
# 1~5초 사이의 랜덤한 Delay를 발생.
time.sleep( random.randint(1,5) )
return '{}번째, 일을 끝냈다.'.format( id )
|
cs |
- 4 ln: 앞서 생성한 Celery App을 import하여 재사용.
3-3. Task 생성.
- tasks/calc.py
- 간단한 더하기 예제.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import time
from dochi_app import app
@app.task
def add( num1, num2 ):
time.sleep( 1 )
print( "{} + {} = {}".format( num1, num2, num1+num2 ) )
return num1 + num2
@app.task
def callback( results ):
return results
|
cs |
- 3 ln: 앞서 생성한 Celery App을 import하여 재사용.
- 14 ln: 실행 결과를 반환받을 함수.
4. Celery 실행.
4-1. 실행 명령어.
1
2
3
|
celery -A dochi_app worker -l INFO -P solo -c 10
#또는
celery worker --app=dochi_app --loglevel=INFO --pool=solo --concurrency=10
|
cs |
- [ -A | --app ]: Celery App의 파일명.
- [ worker ]: Worker 실행 명령어.
- [ -l | --loglevel ]: Logging 레벨 설정, DEBUG, INFO, WARNING, ERROR, CRITICAL, or FATAL.
- [ -P | --pool ]: 프로세스 처리 방식.
- [ -c | --concurrency ]: 동시에 병행처리 할 수 있는 Task의 수.
4-2. 실행 결과.
5. Task 실행 소스 예제.
5-1. apply_async 와 delay를 사용하여 진행 상황 살펴보기 확인해보기.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# run.py
from tasks import calc
# delay와 apply_async
task_1 = calc.add.delay( 1, 2 )
task_2 = calc.add.apply_async( args=[3, 4], ignore_result=True )
task_3 = calc.add.apply_async( args=[5, 6], kwargs={} )
print( "# 1. Task ID 확인" )
print( "task_1 is {}".format( task_1.id ) )
print( "task_2 is {}".format( task_2.id ) )
print( "task_3 is {}".format( task_3.id ) )
print( "\n# 2. Task 상태" )
print( "task_1 is ready? {}".format( task_1.ready() ) )
print( "task_2 is ready? {}".format( task_2.ready() ) )
print( "task_3 is ready? {}".format( task_3.ready() ) )
print( "\n# 3. 실행결과 확인" )
print( "task_1 is {}".format( task_1.get() ) )
print( "task_2 is {}".format( task_2.get() ) )
print( "task_3 is {}".format( task_3.get() ) )
print( "\n# 4. Task 상태" )
print( "task_1 is ready? {}".format( task_1.ready() ) )
print( "task_2 is ready? {}".format( task_2.ready() ) )
print( "task_3 is ready? {}".format( task_3.ready() ) )
|
cs |
- 5 ln: shortcut으로 Arguments만 전달하여 Task를 실행.
- 6 ln: Arguments와 옵션을 전달하여 Task를 실행.
* ignore_result는 실행결과의 반환여부, False는 실행결과를 알 수 없음.
- 7 ln: Arguments만 전달하여 Task를 실행.
- 10~12 ln: 각 Task의 ID를 확인할 수 있음.
- 15~17, 24~27 ln: ready()를 통해 현재 Task의 작업 상태를 확인.
* True이면 완료, False이면 진행중이거나 알 수 없음.
- 20~22 ln: get()를 통해 현재 Task의 실행 결과를 확인.
* get을 사용할 경우 Task가 동기적으로 실행됨.
5-2. 실행 결과.
- Task의 ID는 모두 유니크한 값을 가짐.
- Task 상태가 모두 False인 것은 아직 실행중인 상태를 나타냄.
- task_2는 ignore_result옵션을 적용했기 때문에 실행 결과를 알 수 없음.
- task_2는 결과를 반환하지 않기 때문에 기다리지 않고 바로 넘어감.
6. SubTasks 실행 소스 예제.
6-1. subtask와 s를 사용하여 진행 상황 살펴보기.
- 내용이 많아보이지만 반복되는 내용이니까 겁먹지말자.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# run.py
from tasks import calc
# s와 subtask
subtask_1 = calc.add.s( 1, 2 )
subtask_2 = calc.add.subtask( args=[3, 4], ignore_result=True )
subtask_3 = calc.add.subtask( args=[5, 6], kwargs={} )
subtask_4 = calc.add.subtask()
print( "# 1. SubTask 확인" )
print( "subtask_1 is {}".format( subtask_1.id ) )
print( "subtask_2 is {}".format( subtask_2.id ) )
print( "subtask_3 is {}".format( subtask_3.id ) )
print( "subtask_4 is {}".format( subtask_4.id ) )
task_1 = subtask_1.delay()
task_2 = subtask_2.apply_async()
task_3 = subtask_3.delay()
task_4 = subtask_4.apply_async( args=[7, 8], ignore_result=True )
task_5 = subtask_4.delay( 9, 10 )
print( "\n# 2. Task ID 확인" )
print( "task_1 is {}".format( task_1 ) )
print( "task_2 is {}".format( task_2 ) )
print( "task_3 is {}".format( task_3 ) )
print( "task_4 is {}".format( task_4 ) )
print( "task_5 is {}".format( task_4 ) )
print( "\n# 3. Task 상태" )
print( "task_1 is ready? {}".format( task_1.ready() ) )
print( "task_2 is ready? {}".format( task_2.ready() ) )
print( "task_3 is ready? {}".format( task_3.ready() ) )
print( "task_4 is ready? {}".format( task_4.ready() ) )
print( "task_5 is ready? {}".format( task_5.ready() ) )
print( "\n# 4. 실행결과 확인" )
print( "task_1 is {}".format( task_1.get() ) )
print( "task_2 is {}".format( task_2.get() ) )
print( "task_3 is {}".format( task_3.get() ) )
print( "task_4 is {}".format( task_4.get() ) )
print( "task_5 is {}".format( task_5.get() ) )
print( "\n# 5. Task 상태" )
print( "task_1 is ready? {}".format( task_1.ready() ) )
print( "task_2 is ready? {}".format( task_2.ready() ) )
print( "task_3 is ready? {}".format( task_3.ready() ) )
print( "task_4 is ready? {}".format( task_4.ready() ) )
print( "task_5 is ready? {}".format( task_5.ready() ) )
|
cs |
- 5 ln: shortcut으로 Arguments만 정의된 subtask를 생성.
- 6 ln: Arguments와 옵션이 정의된 subtask를 생성.
- 7 ln: Arguments만 정의된 subtask를 생성.
- 8 ln: 빈 subtask를 생성.
- 11~14 ln: subtask의 ID를 출력.
- 16~18 ln: shortcut으로 파라미터 없이 subtask를 실행
* 이미 Arguments를 정의했기 때문에 여기서 추가로 전달할 경우 Append됨.
- 19 ln: 빈 subtask에 Arguments를 전달하여 subtask를 실행.
- 20 ln: 19 ln에서 실행했던 subtask를 재사용하여 실행.
- 나머지는 [ 5-1 예제 ] 와 같음.
6-2. 실행 결과 화면.
- Subtask는 실행되기 전 상태이므로 ID를 가지지않음.
- task_4와 task_5는 같은 subtask를 사용하기 때문에 ID가 같음.
- task_2와 task_4는 ignore_result가 True이기 때문에 실행 결과를 알 수 없음.
7. Subtask를 사용하는 이유.
- 위 예제만 보면 큰 차이점을 못느낄 수도 있는데, 자세히 살펴보면 아주 큰 차이점이 있다.
- apply_async는 호출하는 즉시 실행되고, subtask는 대기 상태였다가 호출이 되면 실행한다.
- 즉, apply_async는 Task가 독립적으로 각각 실행되는 경우에 적합하며, subtask는 연속된 Task나 Task를 일괄처리하는데 적합하다.
- subtask를 처리할 때 chain을 사용하여 연속된 Task를 처리하고, chord를 사용하여 일괄처리 할 수 있다.
8. 마치며
- 이번 포스트에서는 Task를 실행하는 방법에 대해서 알아보았다. 공부하면서 예제를 만들고 부연설명들을 붙이다보니 내용이 좀 길어진 감이 있다. 그래도 Celery에 대해서 많이 알게되었다.
- 다음 포스트에서는 chain과 chord에 대해서 알아보도록 하겠다.
'Back-end > Python' 카테고리의 다른 글
[Celery] 무장적 시장하기 (4) - Group과 Chord (0) | 2020.01.20 |
---|---|
[Celery] 무작정 시작하기 (3) - Chain (0) | 2020.01.20 |
[Selenium] Chrome에서 탭 여러개 사용하기 (4) | 2020.01.16 |
[pynput] 나만의 단축키 만들기 (6) | 2020.01.15 |
[Celery] 무작정 시작하기 (1) - 설치 및 실행 (1) | 2020.01.10 |
댓글