본문 바로가기
Back-end/Python

[Celery] 무작정 시작하기 (1) - 설치 및 실행

by 허도치 2020. 1. 10.

  Scrapy+Selenium를 이용하여 크롤링을 하다가 오래 걸리는 한 녀석 때문에 다른 작업들이 밀려서 실시간으로 데이터를 가져오지 못하는 경우가 발생했다. 그래서 파이썬에서 비동기처리를 할 수 있는 방법에 대해서 찾아보다가 Scrapyd와 Celery에 대해서 알게되었다.

 

  Scrapyd는 API를 통해 Spider를 비동기적으로 호출하고 작업을 취소할 수도 있고, 사용법도 간단하여 아주 유용하게 사용한 녀석이다. 그리고 Celery는 비동기 태스크 큐이며 일련의 작업들을 큐에 담아 멀티태스킹 처리하는 방식이다. 여기에 Spider를 실행시키는 작업을 담으면 비동기처리가 가능하게 되고, 추가로 다른 작업들도 비동기 처리를 할 수 있다. 그래서 동기적으로 수행되는 웹에서 파일을 변환하여 저장하거나, 파일업로드 등 무거운 작업들을 수행할 때 Celery가 자주 많이 사용된다고 한다.

 

 

 

 

1. Celery란?

   1-1. 비동기 작업 큐이며, 스케쥴링이 가능하지만 실시간 처리에 중점을 두고 있음.

   1-2. 동기/비동기 처리가 가능.

   1-3. 작업단위를 Task, 작업자를 Worker라고 함.

   1-4. 메시지 브로커를 사용함.

         - 주로 RabbitMQRedis를 사용하는데, RabbitMQ와 궁합이 가장 잘 맞는다고 알려져있음.

 

 

2. Celery 설치.

   2-1. Celery 라이브러리 설치.

         > pip install celery

 

   2-2. Celery 설치 확인.

         > celery --version

 

   2-3. RabbitMQ 설치.

         - 이전에 다루었던 [RabbitMQ] 무작정시작하기 (1) - 설치 및 실행 참조.

 

[RabbitMQ ] 무작정 시작하기 (1) - 설치 및 실행

이 포스트는 Windows10 기반으로 작성되었음. 메시지 브로커의 하나이며 메시지 브로커에 대한 설명은 [Message Broker란?] 포스트를 참조하길 바란다. RabbitMQ는 여러 측면에서 사용되지만 필자는 Python에서 비..

heodolf.tistory.com

 

 

3. Celery 실행.

   * RabbitMQ Server가 실행중임을 가정함.

 

   3-1. Celery App 작성.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# dochi.py
import time
import random 
 
from celery import Celery
 
app = Celery('tasks', broker='amqp://heo:heo@localhost:5672//')
 
@app.task
def working( id=1 ):
 
    # 1~5초 사이의 랜덤한 Delay를 발생.
    time.sleep( random.randint(1,5) )
    
    return '{}번째, 일을 끝냈다.'.format( id )
cs

         - 5 ln: Celery 라이브러리.

         - 7 ln: 첫번째 인자값은 Celery의 이름, 이후는 옵션임. borker는 RabbitMQ를 사용.

         - 9 ln: Celery의 작업(task)를 설정하는 데코레이터.

         - 10~15 ln: 1~5초의 딜레이를 가지는 Task.

 

   3-2. Celery App 실행.

         > celery -A dochi worker --loglevel=info --pool=solo

         또는 > celery worker --app dochi  --loglevel=info --pool=solo

 

         * pool 옵션은 prefork가 기본인데 solo 또는 threads가 있고, 그 외에 gevent, eventlet이 있음.

         * Windows에서는 prefork가 오류가 발생해서 solo나 threads를 사용.

 

   3-3. Celery App 실행화면.

 

 

4. Celery 비동기 Task 수행.

   4-1. 새로운 명령 프롬프트 실행.

         - 위에서 작성한 파일이 있는 경로로 이동.

         - python 실행.

 

   4-2. Celery 비동기 Task 수행.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import dochi
 
type( dochi.working )
# <class 'celery.local.PromiseProxy'>
 
dochi.working( 1 )
# 1번째, 일을 끝냈다.
 
dochi.working.delay( 1 )
# <AsyncResult: 304d80d7-c256-49c2-af60-14187e7dace4>
 
dochi.working.delay( 2 )
# <AsyncResult: 71bae65d-4cdc-4d7d-b62d-e114e615e4b0>
 
cs

         - 3 ln: dochi.py 에서 작성한 working 함수의 type을 보면 일반 함수가 아닌 것을 알 수 있음.

         - 6 ln: 그래도 일반 함수처럼 사용할 수 있음. 1~5초 사이의 지연시간 뒤에 결과값이 출력됨.

         - 6, 9 ln: 비동기 Task를 수행, 직접 실행해보면 지연시간없이 두개의 라인 실행됨.

 

   4-3. Celery Task 로그 확인.

         - 반복문으로 총 10개의 Task를 실행하였는데, successed된 순서를보면 입력된 순서가 아닌것을 확인할 수 있음.

 

 

 

5. 마치며.

   - 이것으로 간단하게 Celery에 대햏서 알아보고 설치와 실행까지 해보았다. 크게 어려울것은 없어보이는데, 언제나 환경에 맞게 설정을 해주는게 일인것 같다.

   - Celery App을 생성할 때, broker 옵션만 사용하였는데 이런 경우에는 Task의 결과값을 반환받을 수 없다. 그러므로 backend 옵션을 추가하여야 하는데 이는 다음 포스트에서 다루도록 하겠다.

 

 

참고사이트

- https://docs.celeryproject.org/en/latest/userguide/tasks.html#result-backends

댓글