Source code for orca.celery

"""Celery application configuration for the ORCA distributed pipeline.

This module initializes the Celery application with broker and backend
settings, configures task queues for different workload types (default,
cosmology, bandpass, imaging), and sets up task routing to direct specific
tasks to appropriate queues.

The Celery app is configured to:
- Use JSON serialization for tasks
- Limit worker prefetch to 1 task at a time
- Restart workers after 20 tasks to prevent memory leaks
- Expire results after 2 hours

Queues
------
default
    General-purpose queue for most tasks.
cosmology
    Queue for cosmology-specific processing tasks.
bandpass
    Queue for bandpass calibration tasks.
imaging
    Queue for imaging pipeline tasks.
calim00 .. calim10
    Per-node queues for NVMe-local subband processing.  Each lwacalimNN
    worker listens on its own ``calimNN`` queue so that subband data stays
    on the node's local NVMe.  Tasks are routed dynamically at submit time
    via ``task.apply_async(queue='calim08')``.

Example
-------
Start a worker for the imaging queue::

    celery -A orca.celery worker -Q imaging -c 4

Start a worker on lwacalim08 for subband processing::

    celery -A orca.celery worker -Q calim08 --hostname=calim08@lwacalim08 -c 4

"""
# orca/celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery
from orca.configmanager import queue_config

# You'll need these if you define custom Queues/Exchanges
from kombu import Queue, Exchange

[docs] CELERY_APP_NAME = 'orca'
[docs] app = Celery( CELERY_APP_NAME, broker=queue_config.broker_uri, backend=queue_config.result_backend_uri, include=[ 'orca.transform.calibration', 'orca.transform.qa', 'orca.tasks.fortests', 'orca.transform.spectrum', 'orca.transform.spectrum_v2', 'orca.transform.spectrum_v3', 'orca.transform.imaging', 'orca.transform.photometry', 'orca.tasks.pipeline_tasks', 'orca.tasks.imaging_tasks', 'orca.tasks.subband_tasks', #'orca.tasks.peel_stage1_tasks', ] )
# Basic configs app.conf.update( result_expires=7200, worker_prefetch_multiplier=1, worker_max_tasks_per_child=20, task_serializer='json', broker_connection_retry=True, broker_connection_retry_on_startup=True, ) ###################### # Define your QUEUES ###################### # Per-node queues for NVMe-local subband processing. # Each lwacalimNN worker listens on its own calimNN queue so that # subband data stays on the node's local NVMe. _calim_queues = tuple( Queue(f'calim{i:02d}', Exchange(f'calim{i:02d}'), routing_key=f'calim{i:02d}') for i in range(11) # calim00 .. calim10 ) app.conf.task_queues = ( Queue('default', Exchange('default'), routing_key='default'), Queue('cosmology', Exchange('cosmology'), routing_key='cosmology'), Queue('bandpass', Exchange('bandpass'), routing_key='bandpass'), Queue('imaging', Exchange('imaging'), routing_key='imaging'), ) + _calim_queues # If you still want "default" to be the fallback for any tasks not explicitly routed app.conf.task_default_queue = 'default' app.conf.task_default_exchange = 'default' app.conf.task_default_routing_key = 'default' ################### # TASK ROUTING ################### app.conf.task_routes = { # All pipeline tasks can stay on default, *except* the special one(s): # # Example: route the new cosmology tasks to "cosmology" queue 'orca.tasks.pipeline_tasks.split_2pol_task': {'queue': 'cosmology'}, # If you have other tasks you want, list them here # # e.g. 'orca.tasks.pipeline_tasks.flag_foo_task': {'queue': 'cosmology'}, 'orca.tasks.pipeline_tasks.bandpass_nvme_task': {'queue': 'bandpass'}, 'orca.tasks.imaging_tasks.imaging_pipeline_task': {'queue': 'imaging'}, 'orca.tasks.imaging_tasks.imaging_shared_pipeline_task': {'queue': 'imaging'}, # Subband tasks: queue is set dynamically at submit time via .set(queue=...) # so they are NOT statically routed here. The submission script uses # orca.resources.subband_config.get_queue_for_subband() to pick the right # calimNN queue. See orca/tasks/subband_tasks.py for details. } if __name__ == '__main__': app.start()