웹사이트 검색

Ubuntu VPS에서 작업을 대기시키기 위해 RabbitMQ와 함께 셀러리를 사용하는 방법


소개

비동기 또는 비 차단 처리는 프로그램의 기본 흐름에서 특정 작업의 실행을 분리하는 방법입니다. 이렇게 하면 사용자 대면 코드를 중단 없이 실행할 수 있는 등 여러 가지 이점이 있습니다.

메시지 전달은 프로그램 구성 요소가 통신하고 정보를 교환하는 데 사용할 수 있는 방법입니다. 동기식 또는 비동기식으로 구현될 수 있으며 개별 프로세스가 문제 없이 통신할 수 있습니다. 메시지 대기열은 종종 추가 기능을 구현하고 향상된 성능을 제공하며 완전히 메모리에 상주할 수 있기 때문에 메시지 전달은 이러한 유형의 사용을 위한 기존 데이터베이스의 대안으로 구현되는 경우가 많습니다.

Celery는 비동기 메시지 전달 시스템에 구축된 작업 대기열입니다. 프로그래밍 작업을 덤프할 수 있는 버킷으로 사용할 수 있습니다. 작업을 통과한 프로그램은 계속해서 실행하고 반응적으로 기능할 수 있으며 나중에 셀러리를 폴링하여 계산이 완료되었는지 확인하고 데이터를 검색할 수 있습니다.

셀러리는 Python으로 작성되지만 프로토콜은 모든 언어로 구현될 수 있습니다. 웹후크를 통해 다른 언어와도 작동할 수 있습니다.

프로그램 환경에 작업 대기열을 구현하면 쉽게 작업을 오프로드하고 사용자의 상호 작용을 계속 처리할 수 있습니다. 이는 응용 프로그램의 응답성을 높이고 장기 실행 계산을 수행하는 동안 잠기지 않는 간단한 방법입니다.

이 가이드에서는 Ubuntu 12.04 VPS에서 RabbitMQ를 메시징 시스템으로 사용하여 셀러리 작업 대기열을 설치하고 구현합니다.

구성 요소 설치

셀러리 설치

Celery는 Python으로 작성되었으므로 일반 Python 패키지를 처리하는 것과 동일한 방식으로 설치하기 쉽습니다.

메시징 시스템을 설치하기 위한 가상 환경을 생성하여 Python 패키지를 처리하기 위한 권장 절차를 따를 것입니다. 이것은 우리가 환경을 안정적으로 유지하고 더 큰 시스템에 영향을 미치지 않도록 도와줍니다.

Ubuntu의 기본 리포지토리에서 Python 가상 환경 패키지를 설치합니다.

sudo apt-get update
sudo apt-get install python-virtualenv

시스템을 구현할 메시징 디렉토리를 생성합니다.

mkdir ~/messaging
cd ~/messaging

이제 다음 명령을 사용하여 셀러리를 설치할 수 있는 가상 환경을 만들 수 있습니다.

virtualenv --no-site-packages venv

가상 환경이 구성되면 다음을 입력하여 활성화할 수 있습니다.

source venv/bin/activate

위에서 만든 가상 환경에서 현재 작업하고 있음을 반영하도록 프롬프트가 변경됩니다. 이렇게 하면 Python 패키지가 전역이 아닌 로컬로 설치됩니다.

언제든지 환경을 비활성화해야 하는 경우(지금은 아님) 다음을 입력할 수 있습니다.

deactivate

이제 환경을 활성화했으므로 pip를 사용하여 셀러리를 설치할 수 있습니다.

pip install celery

RabbitMQ 설치

Celery는 외부 소스의 요청을 처리하기 위해 메시징 에이전트가 필요합니다. 이 에이전트를 "브로커\라고 합니다.

관계형 데이터베이스, NoSQL 데이터베이스, 키-값 저장소 및 실제 메시징 시스템을 포함하여 브로커가 선택할 수 있는 몇 가지 옵션이 있습니다.

강력하고 안정적인 성능을 제공하고 셀러리와 잘 상호 작용하는 RabbitMQ 메시징 시스템을 사용하도록 셀러리를 구성할 것입니다. 우리의 의도된 용도와 잘 맞는 기능을 포함하고 있기 때문에 훌륭한 솔루션입니다.

Ubuntu의 리포지토리를 통해 RabbitMQ를 설치할 수 있습니다.

sudo apt-get install rabbitmq-server

RabbitMQ 서비스는 설치 시 서버에서 자동으로 시작됩니다.

셀러리 인스턴스 생성

셀러리의 작업 대기열 기능을 사용하려면 설치 후 첫 번째 단계는 셀러리 인스턴스를 생성하는 것입니다. 이것은 패키지를 가져오고 "앱\을 만든 다음 셀러리가 백그라운드에서 실행할 수 있는 작업을 설정하는 간단한 프로세스입니다.

작업자가 수행할 수 있는 작업을 정의할 수 있는 tasks.py라는 메시징 디렉터리 내에 Python 스크립트를 생성해 보겠습니다.

sudo nano ~/messaging/tasks.py

가장 먼저 해야 할 일은 celery 패키지에서 Celery 함수를 가져오는 것입니다.

from celery import Celery

그런 다음 기본 RabbitMQ 서비스에 연결하는 셀러리 애플리케이션 인스턴스를 만들 수 있습니다.

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

Celery 함수의 첫 번째 인수는 작업을 식별하기 위해 작업 앞에 추가되는 이름입니다.

backend 매개변수는 백그라운드 작업의 상태를 쿼리하거나 그 결과를 검색하려는 경우에 필요한 선택적 매개변수입니다.

작업이 프로그램에서 사용할 유용한 값을 반환하지 않고 일부 작업을 수행한 다음 종료하는 단순한 함수인 경우 이 매개 변수를 생략할 수 있습니다. 일부 작업에만 이 기능이 필요한 경우 여기에서 활성화하면 나중에 사례별로 비활성화할 수 있습니다.

broker 매개변수는 브로커에 연결하는 데 필요한 URL을 지정합니다. 우리의 경우 이것은 서버에서 실행 중인 RabbitMQ 서비스입니다. RabbitMQ는 "amqp\라는 프로토콜을 사용하여 작동합니다. RabbitMQ가 기본 구성으로 작동하는 경우 셀러리는 amqp:// 스키마 이외의 다른 정보로는 연결할 수 없습니다.

셀러리 작업 빌드

여전히 이 파일에 작업을 추가해야 합니다.

각 셀러리 작업은 데코레이터 @app.task로 시작해야 합니다. 이를 통해 셀러리는 대기열 기능을 추가할 수 있는 기능을 식별할 수 있습니다. 각 데코레이터 다음에 작업자가 실행할 수 있는 함수를 생성하기만 하면 됩니다.

첫 번째 작업은 문자열을 콘솔에 출력하는 간단한 함수입니다.

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task
def print_hello():
    print 'hello there'

이 함수는 유용한 정보를 반환하지 않기 때문에(대신 콘솔에 출력함) 이 작업에 대한 상태 정보를 저장하기 위해 백엔드를 사용하지 않도록 셀러리에게 지시할 수 있습니다. 내부적으로는 덜 복잡하고 더 적은 리소스가 필요합니다.

<예비>

app=Celery('작업', 백엔드='amqp', 브로커='amqp://')

@app.task(ignore_result=True)

다음으로 소수를 생성하는 다른 함수를 추가합니다(RosettaCode에서 가져옴). 이는 장기 실행 프로세스일 수 있으므로 결과를 기다리고 있을 때 비동기 작업자 프로세스를 처리하는 방법에 대한 좋은 예입니다.

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task(ignore_result=True)
def print_hello():
    print 'hello there'

@app.task
def gen_prime(x):
    multiples = []
    results = []
    for i in xrange(2, x+1):
        if i not in multiples:
            results.append(i)
            for j in xrange(i*i, x+1, i):
                multiples.append(j)
    return results

우리는 이 함수의 반환 값에 관심이 있고 언제 완료되었는지 알고 싶기 때문에(결과 등을 사용할 수 있도록) ignore_result 매개변수를 추가하지 않습니다. 이 두 번째 작업.

파일을 저장하고 닫습니다.

셀러리 작업자 프로세스 시작

이제 응용 프로그램의 연결을 수락할 수 있는 작업자 프로세스를 시작할 수 있습니다. 수행할 수 있는 작업에 대해 알아보기 위해 방금 만든 파일을 사용합니다.

작업자 인스턴스를 시작하는 것은 celery 명령으로 애플리케이션 이름을 호출하는 것만큼 쉽습니다. 작업자 프로세스를 백그라운드에 배치하기 위해 문자열 끝에 "&” 문자를 포함합니다.

celery worker -A tasks &

이렇게 하면 응용 프로그램이 시작된 다음 터미널에서 분리되어 다른 작업에 계속 사용할 수 있습니다.

여러 작업자를 시작하려면 -n 인수를 사용하여 각각의 이름을 지정하면 됩니다.

celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &

%h는 작업자 이름이 지정될 때 호스트 이름으로 대체됩니다.

작업자를 중지하려면 kill 명령을 사용할 수 있습니다. 프로세스 ID를 쿼리한 다음 이 정보를 기반으로 작업자를 제거할 수 있습니다.

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill

이렇게 하면 작업자가 종료하기 전에 현재 작업을 완료할 수 있습니다.

작업자가 작업을 완료할 때까지 기다리지 않고 모든 작업자를 종료하려면 다음을 실행할 수 있습니다.

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9

대기열을 사용하여 작업 처리

프로그램의 백그라운드에서 작업을 완료하기 위해 생성한 작업자 프로세스를 사용할 수 있습니다.

이것이 어떻게 작동하는지 보여주기 위해 전체 프로그램을 만드는 대신 Python 인터프리터에서 다양한 옵션을 탐색합니다.

python

프롬프트에서 함수를 환경으로 가져올 수 있습니다.

from tasks import print_hello
from tasks import gen_prime

이러한 기능을 테스트하면 특별한 기능이 없는 것으로 나타납니다. 첫 번째 함수는 예상대로 한 줄을 인쇄합니다.

print_hello()
hello there

두 번째 함수는 소수 목록을 반환합니다.

primes = gen_prime(1000)
print primes

두 번째 함수에 확인할 수 있는 더 큰 범위의 숫자를 지정하면 다음을 계산하는 동안 실행이 중단됩니다.

primes = gen_prime(50000)

"CTRL-C\를 입력하여 실행을 중지합니다. 이 프로세스는 분명히 백그라운드에서 컴퓨팅되지 않습니다.

백그라운드 작업자에 액세스하려면 .delay 메서드를 사용해야 합니다. Celery는 기능을 추가 기능으로 래핑합니다. 이 메서드는 실행할 작업자에게 함수를 전달하는 데 사용됩니다. 즉시 반환되어야 합니다.

primes = gen_prime.delay(50000)

이 작업은 이제 이전에 시작한 작업자에 의해 실행되고 있습니다. 애플리케이션에 대한 backend 매개변수를 구성했기 때문에 계산 상태를 확인하고 결과에 액세스할 수 있습니다.

작업이 완료되었는지 확인하려면 .ready 메서드를 사용할 수 있습니다.

primes.ready()
False

"False\ 값은 작업이 아직 실행 중이고 아직 결과를 사용할 수 없음을 의미합니다. "True\ 값을 얻으면 답으로 무언가를 할 수 있습니다.

primes.ready()
True

.get 메서드를 사용하여 값을 가져올 수 있습니다.

값이 .ready 메서드로 계산되었음을 이미 확인한 경우 다음과 같이 해당 메서드를 사용할 수 있습니다.

print primes.get()
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523,
. . .

그러나 .get을 호출하기 전에 .ready 메서드를 사용하지 않은 경우 프로그램이 실행되지 않도록 "timeout\ 옵션을 추가하는 것이 좋습니다. t는 결과를 기다려야 하므로 구현 목적을 무산시킵니다.

print primes.get(timeout=2)

시간이 초과되면 프로그램에서 처리할 수 있는 예외가 발생합니다.

결론

이것은 프로그램 내에서 셀러리를 사용하기 시작하기에 충분한 정보이지만 이 라이브러리의 전체 기능에 대한 표면적인 정보일 뿐입니다. Celery를 사용하면 백그라운드 작업을 함께 묶고, 작업을 그룹화하고, 흥미로운 방식으로 기능을 결합할 수 있습니다.

셀러리는 Python으로 작성되었지만 webhook을 통해 다른 언어와 함께 사용할 수 있습니다. 따라서 선택한 언어에 관계없이 백그라운드로 작업을 이동하는 데 매우 유연합니다.

저스틴 엘링우드