programing

셀러리 작업을 어떻게 단위 테스트합니까?

nasanasas 2020. 8. 25. 08:10
반응형

셀러리 작업을 어떻게 단위 테스트합니까?


Celery 문서 는 Django 내에서 Celery 테스트를 언급 하지만 Django를 사용 하지 않는 경우 Celery 작업을 테스트하는 방법에 대해서는 설명하지 않습니다. 어떻게하나요?


모든 unittest lib를 사용하여 동기식으로 작업을 테스트 할 수 있습니다. 나는 일반적으로 셀러리 작업을 할 때 두 가지 다른 테스트 세션을 수행합니다. 첫 번째는 (내가 제안한 것처럼) 완전히 동 기적이며 알고리즘이해야 할 일을 수행하는지 확인하는 것이어야합니다. 두 번째 세션은 전체 시스템 (브로커 포함)을 사용하고 직렬화 문제 나 기타 배포, 통신 문제가 없는지 확인합니다.

그래서:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

그리고 테스트 :

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

도움이 되었기를 바랍니다.


나는 이것을 사용한다 :

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

문서 : http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER를 사용하면 작업을 동기식으로 실행할 수 있으며 셀러리 서버가 필요하지 않습니다.


정확히 무엇을 테스트하고 싶은지에 따라 다릅니다.

  • 태스크 코드를 직접 테스트하십시오. "task.delay (...)"를 호출하지 말고 단위 테스트에서 "task (...)"를 호출하십시오.
  • CELERY_ALWAYS_EAGER를 사용하십시오 . 이렇게하면 "task.delay (...)"라고 말하는 지점에서 작업이 즉시 호출되므로 전체 경로를 테스트 할 수 있습니다 (비동기 동작은 아님).

unittest

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.test 픽스쳐

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

부록 : send_task를 열심히 존중하도록

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task

Celery 4 사용자의 경우 :

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

설정 이름이 변경되었으며 업그레이드를 선택한 경우 업데이트가 필요하므로

http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names


As of Celery 3.0, one way to set CELERY_ALWAYS_EAGER in Django is:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())

Since Celery v4.0, py.test fixtures are provided to start a celery worker just for the test and are shut down when done:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: gen93553@gnpill.local (running)>
    assert myfunc.delay().wait(3)

Among other fixtures described on http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test, you can change the celery default options by redefining the celery_config fixture this way:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

By default, the test worker uses an in-memory broker and result backend. No need to use a local Redis or RabbitMQ if not testing specific features.


In my case (and I assume many others), all I wanted was to test the inner logic of a task using pytest.

TL;DR; ended up mocking everything away (OPTION 2)


Example Use Case:

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

but, since shared_task decorator does a lot of celery internal logic, it isn't really a unit tests.

So, for me, there were 2 options:

OPTION 1: Separate internal logic

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

This looks very odd, and other than making it less readable, it requires to manually extract and pass attributes that are part of the request, for instance the task_id in case you need it, which make the logic less pure.

OPTION 2: mocks
mocking away celery internals

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

which then allows me to mock the request object (again, in case you need things from the request, like the id, or the retries counter.

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

This solution is much more manual, but, it gives me the control I need to actually unit test, without repeating myself, and without losing the celery scope.

참고URL : https://stackoverflow.com/questions/12078667/how-do-you-unit-test-a-celery-task

반응형