orca.tasks.subband_tasks
Celery tasks for subband processing on NVMe-local calim servers.
Architecture
The processing is split into two phases so that Celery handles the per-MS parallelism in Phase 1, while Phase 2 runs sequentially on the same node after all individual MS files are ready.
- Phase 1 — per-MS (embarrassingly parallel, one Celery task per MS):
copy to NVMe → flag bad antennas → apply calibration → peel (sky + RFI)
- Phase 2 — per-subband (sequential, runs after all Phase 1 tasks finish):
concatenate → fix field ID → chgcentre → AOFlagger → pilot snapshots → snapshot QA → hot baseline removal → science imaging → archive to Lustre
Both phases are routed to the same node-specific queue (e.g. calim08)
so that all I/O stays on the node’s local NVMe.
Usage (from a submission script or notebook):
    from celery import chord
    from orca.tasks.subband_tasks import prepare_one_ms_task, process_subband_task
    from orca.resources.subband_config import get_queue_for_subband

    queue = get_queue_for_subband('73MHz')

    # Phase 1: parallel per-MS
    header_tasks = [
        prepare_one_ms_task.s(
            src_ms=ms, nvme_work_dir=work_dir,
            bp_table=bp, xy_table=xy,
            peel_sky=True, peel_rfi=True,
        ) for ms in ms_files
    ]

    # Phase 2: runs after all Phase 1 tasks complete; receives list of MS paths
    pipeline = chord(header_tasks)(
        process_subband_task.s(
            work_dir=work_dir, subband='73MHz',
            lst_label='14h', obs_date='2025-06-15',
            run_label='Run_20250615',
        )
    )
Functions
- on_chord_error – Error callback for Phase 1 task failures.
- prepare_one_ms_task – Copy one MS to NVMe, flag, calibrate, and optionally peel.
- process_subband_task – Phase 2: concatenate, image, run science, and archive one subband.
- submit_subband_pipeline – Submit the full two-phase subband pipeline as a Celery chord.
- submit_subband_pipeline_chained – Submit multiple LST-hours for one subband as a sequential chain.
Module Contents
- orca.tasks.subband_tasks.on_chord_error(self, task_id=None, exc=None, traceback_str=None, dynamic_run_label: str | None = None, work_dir: str | None = None, cleanup_nvme: bool = False, subband: str = '', lst_label: str = '')
Error callback for Phase 1 task failures.
Attached via link_error on each individual Phase 1 task. When all retries are exhausted for any MS in the chord, Celery fires this callback. The chord itself may still complete (other MS tasks succeed) and Phase 2 runs normally, but if enough tasks fail that the chord never fires Phase 2, this callback ensures the dynamic dispatch chain continues.

A Redis SETNX lock (60 min TTL) prevents duplicate pops when multiple Phase 1 tasks in the same chord all fail.
- orca.tasks.subband_tasks.prepare_one_ms_task(self, src_ms: str, nvme_work_dir: str, bp_table: str, xy_table: str, peel_sky: bool = False, peel_rfi: bool = False) → str
Copy one MS to NVMe, flag, calibrate, and optionally peel.
This is the Phase 1 workhorse. Each MS file gets its own Celery task so they run in parallel across the worker’s concurrency slots.
- Parameters:
src_ms – Source MS path on Lustre.
nvme_work_dir – NVMe working directory (same for all MS in a subband).
bp_table – Bandpass calibration table path.
xy_table – XY-phase calibration table path.
peel_sky – Run TTCal zest with sky model.
peel_rfi – Run TTCal zest with RFI model.
- Returns:
Path to the processed MS on NVMe.
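The Phase 1 control flow can be sketched as follows; the flag/calibrate/peel helpers are hypothetical injected callables standing in for the external tools, so only the ordering and the NVMe staging step are illustrated:

```python
import shutil
from pathlib import Path


def prepare_one_ms(src_ms, nvme_work_dir, bp_table, xy_table,
                   peel_sky=False, peel_rfi=False,
                   flag=None, applycal=None, zest=None):
    """Hypothetical sketch of the per-MS Phase 1 sequence."""
    dest = Path(nvme_work_dir) / Path(src_ms).name
    shutil.copytree(src_ms, dest)            # 1. copy MS (a directory) to NVMe
    if flag is not None:
        flag(dest)                           # 2. flag bad antennas
    if applycal is not None:
        applycal(dest, bp_table, xy_table)   # 3. apply bandpass + XY-phase tables
    if zest is not None and peel_sky:
        zest(dest, model="sky")              # 4a. TTCal zest with sky model
    if zest is not None and peel_rfi:
        zest(dest, model="rfi")              # 4b. TTCal zest with RFI model
    return str(dest)                         # processed MS path on NVMe
```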
- orca.tasks.subband_tasks.process_subband_task(self, ms_paths: List[str], work_dir: str, subband: str, lst_label: str, obs_date: str, run_label: str, hot_baselines: bool = False, skip_cleanup: bool = False, cleanup_nvme: bool = False, targets: List[str] | None = None, catalog: str | None = None, clean_snapshots: bool = False, clean_reduced_pixels: bool = False, reduced_pixels: bool = False, skip_science: bool = False, compress_snapshots: bool = False, remaining_hours: List[dict] | None = None, dynamic_run_label: str | None = None, bp_table: str | None = None, xy_table: str | None = None) → str
Phase 2: concatenate, image, run science, and archive one subband.
This task is used as the callback in a chord: it receives the list of NVMe MS paths returned by the Phase 1 tasks.

When remaining_hours is provided (by submit_subband_pipeline_chained), this task submits the next hour's chord upon successful completion, ensuring hours are processed sequentially on the same node.

When dynamic_run_label is set, the task operates in dynamic dispatch mode: instead of chaining to the next hour of the same subband, it pops the next (subband, hour) work unit from a Redis queue. This lets any free node pick up whatever work is available.
- Parameters:
ms_paths – NVMe paths returned by prepare_one_ms_task (via chord).
work_dir – NVMe working directory (same as Phase 1).
subband – Frequency label, e.g. ‘73MHz’.
lst_label – LST label, e.g. ‘14h’.
obs_date – Observation date string ‘YYYY-MM-DD’.
run_label – Pipeline run label.
hot_baselines – Whether to run hot-baseline diagnostics.
skip_cleanup – If True, keep the concat MS on NVMe.
cleanup_nvme – If True, remove the entire NVMe work_dir after archiving.
targets – List of target-list file paths for photometry.
catalog – Path to BDSF catalog for transient search masking.
clean_snapshots – If True, produce CLEANed Stokes-I snapshots in snapshots_clean/ in addition to the dirty pilots in snapshots/. Always fpack-compressed.
clean_reduced_pixels – If True, scale clean snapshot pixel count by subband frequency (1024/2048/4096). Only affects clean snapshots, not dirty pilots or science imaging.
skip_science – If True, skip all science phases (dewarping, photometry, transient search, flux check) after PB correction. Products are still archived to Lustre.
compress_snapshots – If True, fpack-compress all snapshot FITS to .fs and remove the originals. Deep images are not compressed.
remaining_hours – List of kwarg dicts for subsequent hours. Each dict contains the arguments for submit_subband_pipeline. The first entry is submitted after this hour completes, with the rest forwarded as its own remaining_hours.
dynamic_run_label – If set, enables dynamic dispatch mode using this run label as the Redis work-queue key.
- Returns:
Path to the Lustre archive directory with final products.
- orca.tasks.subband_tasks.submit_subband_pipeline(ms_files: List[str], subband: str, bp_table: str, xy_table: str, lst_label: str, obs_date: str, run_label: str, peel_sky: bool = False, peel_rfi: bool = False, hot_baselines: bool = False, skip_cleanup: bool = False, cleanup_nvme: bool = False, nvme_work_dir: str | None = None, queue_override: str | None = None, targets: List[str] | None = None, catalog: str | None = None, clean_snapshots: bool = False, clean_reduced_pixels: bool = False, reduced_pixels: bool = False, skip_science: bool = False, compress_snapshots: bool = False, remaining_hours: List[dict] | None = None, dynamic_run_label: str | None = None) → celery.result.AsyncResult
Submit the full two-phase subband pipeline as a Celery chord.
This is the main entry point for the submission script.
- Parameters:
ms_files – List of source MS paths on Lustre.
subband – Frequency label, e.g. ‘73MHz’.
bp_table – Bandpass calibration table.
xy_table – XY-phase calibration table.
lst_label – LST label, e.g. ‘14h’.
obs_date – Observation date ‘YYYY-MM-DD’.
run_label – Human-readable run identifier.
peel_sky – Peel astronomical sky sources.
peel_rfi – Peel RFI sources.
hot_baselines – Run hot-baseline diagnostics.
skip_cleanup – Keep intermediate files on NVMe.
cleanup_nvme – Remove entire NVMe work_dir after archiving to Lustre.
nvme_work_dir – Override NVMe work directory.
queue_override – Force routing to this queue instead of the default node. E.g. ‘calim08’ to run 18MHz on calim08.
targets – List of target-list file paths for photometry.
catalog – Path to BDSF catalog for transient search masking.
clean_snapshots – If True, produce CLEANed Stokes-I snapshots.
clean_reduced_pixels – Scale clean snapshot pixels by frequency.
reduced_pixels – If True, scale pixel count by subband frequency.
skip_science – If True, skip science phases after PB correction.
compress_snapshots – If True, fpack-compress snapshot FITS.
dynamic_run_label – If set, enables dynamic dispatch mode.
- Returns:
Celery AsyncResult for the chord (Phase 2 result).
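Default routing resolves the subband to its node-specific queue unless queue_override is given. A minimal sketch of that resolution (the real mapping lives in orca.resources.subband_config.get_queue_for_subband; the assignments below are made-up placeholders):

```python
# Hypothetical subband -> calim node assignment; the real table lives in
# orca.resources.subband_config.
_DEFAULT_QUEUES = {"18MHz": "calim05", "41MHz": "calim06", "73MHz": "calim08"}


def resolve_queue(subband, queue_override=None):
    """Pick the node-specific Celery queue for a subband, honoring an
    explicit override (e.g. 'calim08' to run 18MHz on calim08)."""
    return queue_override if queue_override is not None else _DEFAULT_QUEUES[subband]
```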
- orca.tasks.subband_tasks.submit_subband_pipeline_chained(hour_specs: List[dict], subband: str, bp_table: str, xy_table: str, run_label: str, peel_sky: bool = False, peel_rfi: bool = False, hot_baselines: bool = False, skip_cleanup: bool = False, cleanup_nvme: bool = False, queue_override: str | None = None, targets: List[str] | None = None, catalog: str | None = None, clean_snapshots: bool = False, clean_reduced_pixels: bool = False, reduced_pixels: bool = False, skip_science: bool = False, compress_snapshots: bool = False) → celery.result.AsyncResult
Submit multiple LST-hours for one subband as a sequential chain.
Instead of submitting all hours simultaneously (which floods the worker with Phase 1 tasks and starves Phase 2), this function submits only the first hour immediately. Phase 2 of each hour triggers the next hour's chord upon completion, ensuring:
- Phase 2 (imaging) runs on an idle node with full CPU/memory
- NVMe space is freed before the next hour's data arrives
- No resource contention between hours on the same node

Different subbands still run in parallel on different nodes.
- Parameters:
hour_specs – List of dicts, each with keys ms_files (list of source MS paths for that hour), lst_label (e.g. '14h'), and obs_date (e.g. '2025-06-15').
subband – Frequency label, e.g. '73MHz'.
bp_table – Bandpass calibration table.
xy_table – XY-phase calibration table.
run_label – Human-readable run identifier.
peel_sky – Peel astronomical sky sources.
peel_rfi – Peel RFI sources.
hot_baselines – Run hot-baseline diagnostics.
skip_cleanup – Keep intermediate files on NVMe.
cleanup_nvme – Remove entire NVMe work_dir after archiving.
queue_override – Force routing to this queue.
targets – Target-list file paths for photometry.
catalog – BDSF catalog for transient search masking.
clean_snapshots – Produce CLEANed Stokes-I snapshots.
clean_reduced_pixels – Scale clean snapshot pixels by frequency.
reduced_pixels – Scale pixel count by subband frequency.
skip_science – Skip science phases after PB correction.
compress_snapshots – fpack-compress snapshot FITS.
- Returns:
Celery AsyncResult for the first hour’s chord (only the first hour is submitted immediately; subsequent hours are triggered by Phase 2 callbacks).
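Assembling hour_specs for a multi-hour run can look like this (the helper name, paths, and dates are illustrative, not part of the module):

```python
def build_hour_specs(lst_hours, ms_files_by_hour, obs_date):
    """Assemble hour_specs in the given LST order; each dict carries the
    per-hour keys documented above (ms_files, lst_label, obs_date)."""
    return [
        {"ms_files": ms_files_by_hour[h], "lst_label": h, "obs_date": obs_date}
        for h in lst_hours
    ]
```

The result is passed as hour_specs to submit_subband_pipeline_chained together with the shared arguments (subband, bp_table, xy_table, run_label, flags); only the first hour is submitted immediately.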