orca.tasks.subband_tasks

Celery tasks for subband processing on NVMe-local calim servers.

Architecture

The processing is split into two phases so that Celery handles the per-MS parallelism in Phase 1, while Phase 2 runs sequentially on the same node after all individual MS files are ready.

Phase 1 — per-MS (embarrassingly parallel, one Celery task per MS):

copy to NVMe → flag bad antennas → apply calibration → peel (sky + RFI)

Phase 2 — per-subband (sequential, runs after all Phase 1 tasks finish):

concatenate → fix field ID → chgcentre → AOFlagger → pilot snapshots → snapshot QA → hot baseline removal → science imaging → archive to Lustre

Both phases are routed to the same node-specific queue (e.g. calim08) so that all I/O stays on the node’s local NVMe.

Usage (from a submission script or notebook):

from celery import chord
from orca.tasks.subband_tasks import prepare_one_ms_task, process_subband_task
from orca.resources.subband_config import get_queue_for_subband

queue = get_queue_for_subband('73MHz')

# Phase 1: parallel per-MS
header_tasks = [
    prepare_one_ms_task.s(
        src_ms=ms, nvme_work_dir=work_dir,
        bp_table=bp, xy_table=xy,
        peel_sky=True, peel_rfi=True,
    ) for ms in ms_files
]

# Phase 2: runs after all Phase 1 tasks complete; receives list of MS paths
pipeline = chord(header_tasks)(
    process_subband_task.s(
        work_dir=work_dir, subband='73MHz',
        lst_label='14h', obs_date='2025-06-15',
        run_label='Run_20250615',
    )
)

Attributes

animation

logger

DYNAMIC_WORK_QUEUE

Functions

on_chord_error(self[, task_id, exc, traceback_str, ...])

Error callback for Phase 1 task failures.

prepare_one_ms_task(→ str)

Copy one MS to NVMe, flag, calibrate, and optionally peel.

process_subband_task(→ str)

Phase 2: concatenate, image, run science, and archive one subband.

submit_subband_pipeline(→ celery.result.AsyncResult)

Submit the full two-phase subband pipeline as a Celery chord.

submit_subband_pipeline_chained(...)

Submit multiple LST-hours for one subband as a sequential chain.

Module Contents

orca.tasks.subband_tasks.animation = None[source]
orca.tasks.subband_tasks.logger[source]
orca.tasks.subband_tasks.DYNAMIC_WORK_QUEUE = 'pipeline:dynamic:{run_label}'[source]
orca.tasks.subband_tasks.on_chord_error(self, task_id=None, exc=None, traceback_str=None, dynamic_run_label: str | None = None, work_dir: str | None = None, cleanup_nvme: bool = False, subband: str = '', lst_label: str = '')[source]

Error callback for Phase 1 task failures.

Attached via link_error on each individual Phase 1 task. When all retries are exhausted for any MS in the chord, Celery fires this callback. The chord itself may still complete (other MS tasks succeed) and Phase 2 runs normally. But if enough tasks fail that the chord never fires Phase 2, this ensures the dynamic dispatch chain continues.

A Redis SETNX lock (60 min TTL) prevents duplicate pops when multiple Phase 1 tasks in the same chord all fail.

orca.tasks.subband_tasks.prepare_one_ms_task(self, src_ms: str, nvme_work_dir: str, bp_table: str, xy_table: str, peel_sky: bool = False, peel_rfi: bool = False) str[source]

Copy one MS to NVMe, flag, calibrate, and optionally peel.

This is the Phase 1 workhorse. Each MS file gets its own Celery task so they run in parallel across the worker’s concurrency slots.

Parameters:
  • src_ms – Source MS path on Lustre.

  • nvme_work_dir – NVMe working directory (same for all MS in a subband).

  • bp_table – Bandpass calibration table path.

  • xy_table – XY-phase calibration table path.

  • peel_sky – Run TTCal zest with sky model.

  • peel_rfi – Run TTCal zest with RFI model.

Returns:

Path to the processed MS on NVMe.

orca.tasks.subband_tasks.process_subband_task(self, ms_paths: List[str], work_dir: str, subband: str, lst_label: str, obs_date: str, run_label: str, hot_baselines: bool = False, skip_cleanup: bool = False, cleanup_nvme: bool = False, targets: List[str] | None = None, catalog: str | None = None, clean_snapshots: bool = False, clean_reduced_pixels: bool = False, reduced_pixels: bool = False, skip_science: bool = False, compress_snapshots: bool = False, remaining_hours: List[dict] | None = None, dynamic_run_label: str | None = None, bp_table: str | None = None, xy_table: str | None = None) str[source]

Phase 2: concatenate, image, run science, and archive one subband.

This task is used as the callback in a chord: it receives the list of NVMe MS paths returned by the Phase 1 tasks.

When remaining_hours is provided (by submit_subband_pipeline_chained), this task will submit the next hour’s chord upon successful completion, ensuring hours are processed sequentially on the same node.

When dynamic_run_label is set, the task operates in dynamic dispatch mode: instead of chaining to the next hour of the same subband, it pops the next (subband, hour) work unit from a Redis queue. This lets any free node pick up whatever work is available.

Parameters:
  • ms_paths – NVMe paths returned by prepare_one_ms_task (via chord).

  • work_dir – NVMe working directory (same as Phase 1).

  • subband – Frequency label, e.g. ‘73MHz’.

  • lst_label – LST label, e.g. ‘14h’.

  • obs_date – Observation date string ‘YYYY-MM-DD’.

  • run_label – Pipeline run label.

  • hot_baselines – Whether to run hot-baseline diagnostics.

  • skip_cleanup – If True, keep the concat MS on NVMe.

  • cleanup_nvme – If True, remove the entire NVMe work_dir after archiving.

  • targets – List of target-list file paths for photometry.

  • catalog – Path to BDSF catalog for transient search masking.

  • clean_snapshots – If True, produce CLEANed Stokes-I snapshots in snapshots_clean/ in addition to the dirty pilots in snapshots/. Always fpack-compressed.

  • clean_reduced_pixels – If True, scale clean snapshot pixel count by subband frequency (1024/2048/4096). Only affects clean snapshots, not dirty pilots or science imaging.

  • skip_science – If True, skip all science phases (dewarping, photometry, transient search, flux check) after PB correction. Products are still archived to Lustre.

  • compress_snapshots – If True, fpack-compress all snapshot FITS to .fs and remove the originals. Deep images are not compressed.

  • remaining_hours – List of kwarg dicts for subsequent hours. Each dict contains the arguments for submit_subband_pipeline. The first entry is submitted after this hour completes, with the rest forwarded as its own remaining_hours.

  • dynamic_run_label – If set, enables dynamic dispatch mode using this run label as the Redis work-queue key.

Returns:

Path to the Lustre archive directory with final products.

orca.tasks.subband_tasks.submit_subband_pipeline(ms_files: List[str], subband: str, bp_table: str, xy_table: str, lst_label: str, obs_date: str, run_label: str, peel_sky: bool = False, peel_rfi: bool = False, hot_baselines: bool = False, skip_cleanup: bool = False, cleanup_nvme: bool = False, nvme_work_dir: str | None = None, queue_override: str | None = None, targets: List[str] | None = None, catalog: str | None = None, clean_snapshots: bool = False, clean_reduced_pixels: bool = False, reduced_pixels: bool = False, skip_science: bool = False, compress_snapshots: bool = False, remaining_hours: List[dict] | None = None, dynamic_run_label: str | None = None) celery.result.AsyncResult[source]

Submit the full two-phase subband pipeline as a Celery chord.

This is the main entry point for the submission script.

Parameters:
  • ms_files – List of source MS paths on Lustre.

  • subband – Frequency label, e.g. ‘73MHz’.

  • bp_table – Bandpass calibration table.

  • xy_table – XY-phase calibration table.

  • lst_label – LST label, e.g. ‘14h’.

  • obs_date – Observation date ‘YYYY-MM-DD’.

  • run_label – Human-readable run identifier.

  • peel_sky – Peel astronomical sky sources.

  • peel_rfi – Peel RFI sources.

  • hot_baselines – Run hot-baseline diagnostics.

  • skip_cleanup – Keep intermediate files on NVMe.

  • cleanup_nvme – Remove entire NVMe work_dir after archiving to Lustre.

  • nvme_work_dir – Override NVMe work directory.

  • queue_override – Force routing to this queue instead of the default node. E.g. ‘calim08’ to run 18MHz on calim08.

  • targets – List of target-list file paths for photometry.

  • catalog – Path to BDSF catalog for transient search masking.

  • clean_snapshots – If True, produce CLEANed Stokes-I snapshots.

  • clean_reduced_pixels – Scale clean snapshot pixels by frequency.

  • reduced_pixels – If True, scale pixel count by subband frequency.

  • skip_science – If True, skip science phases after PB correction.

  • compress_snapshots – If True, fpack-compress snapshot FITS.

  • dynamic_run_label – If set, enables dynamic dispatch mode.

Returns:

Celery AsyncResult for the chord (Phase 2 result).

orca.tasks.subband_tasks.submit_subband_pipeline_chained(hour_specs: List[dict], subband: str, bp_table: str, xy_table: str, run_label: str, peel_sky: bool = False, peel_rfi: bool = False, hot_baselines: bool = False, skip_cleanup: bool = False, cleanup_nvme: bool = False, queue_override: str | None = None, targets: List[str] | None = None, catalog: str | None = None, clean_snapshots: bool = False, clean_reduced_pixels: bool = False, reduced_pixels: bool = False, skip_science: bool = False, compress_snapshots: bool = False) celery.result.AsyncResult[source]

Submit multiple LST-hours for one subband as a sequential chain.

Instead of submitting all hours simultaneously (which floods the worker with Phase 1 tasks and starves Phase 2), this function submits only the first hour immediately. Phase 2 of each hour triggers the next hour’s chord upon completion, ensuring:

  • Phase 2 (imaging) runs on an idle node with full CPU/memory

  • NVMe space is freed before the next hour’s data arrives

  • No resource contention between hours on the same node

Different subbands still run in parallel on different nodes.

Parameters:
  • hour_specs – List of dicts, each with keys: - ms_files: List of source MS paths for that hour. - lst_label: e.g. ‘14h’. - obs_date: e.g. ‘2025-06-15’.

  • subband – Frequency label, e.g. ‘73MHz’.

  • bp_table – Bandpass calibration table.

  • xy_table – XY-phase calibration table.

  • run_label – Human-readable run identifier.

  • peel_sky – Peel astronomical sky sources.

  • peel_rfi – Peel RFI sources.

  • hot_baselines – Run hot-baseline diagnostics.

  • skip_cleanup – Keep intermediate files on NVMe.

  • cleanup_nvme – Remove entire NVMe work_dir after archiving.

  • queue_override – Force routing to this queue.

  • targets – Target-list file paths for photometry.

  • catalog – BDSF catalog for transient search masking.

  • clean_snapshots – Produce CLEANed Stokes-I snapshots.

  • clean_reduced_pixels – Scale clean snapshot pixels by frequency.

  • reduced_pixels – Scale pixel count by subband frequency.

  • skip_science – Skip science phases after PB correction.

  • compress_snapshots – fpack-compress snapshot FITS.

Returns:

Celery AsyncResult for the first hour’s chord (only the first hour is submitted immediately; subsequent hours are triggered by Phase 2 callbacks).