orca.tasks.subband_tasks ======================== .. py:module:: orca.tasks.subband_tasks .. autoapi-nested-parse:: Celery tasks for subband processing on NVMe-local calim servers. Architecture ------------ The processing is split into two phases so that Celery handles the per-MS parallelism in Phase 1, while Phase 2 runs sequentially on the same node after all individual MS files are ready. Phase 1 — per-MS (embarrassingly parallel, one Celery task per MS): copy to NVMe → flag bad antennas → apply calibration → peel (sky + RFI) Phase 2 — per-subband (sequential, runs after all Phase 1 tasks finish): concatenate → fix field ID → chgcentre → AOFlagger → pilot snapshots → snapshot QA → hot baseline removal → science imaging → archive to Lustre Both phases are routed to the **same node-specific queue** (e.g. ``calim08``) so that all I/O stays on the node's local NVMe. Usage (from a submission script or notebook):: from celery import chord from orca.tasks.subband_tasks import prepare_one_ms_task, process_subband_task from orca.resources.subband_config import get_queue_for_subband queue = get_queue_for_subband('73MHz') # Phase 1: parallel per-MS header_tasks = [ prepare_one_ms_task.s( src_ms=ms, nvme_work_dir=work_dir, bp_table=bp, xy_table=xy, peel_sky=True, peel_rfi=True, ) for ms in ms_files ] # Phase 2: runs after all Phase 1 tasks complete; receives list of MS paths pipeline = chord(header_tasks)( process_subband_task.s( work_dir=work_dir, subband='73MHz', lst_label='14h', obs_date='2025-06-15', run_label='Run_20250615', ) ) Attributes ---------- .. autoapisummary:: orca.tasks.subband_tasks.animation orca.tasks.subband_tasks.logger orca.tasks.subband_tasks.DYNAMIC_WORK_QUEUE Functions --------- .. autoapisummary:: orca.tasks.subband_tasks.on_chord_error orca.tasks.subband_tasks.prepare_one_ms_task orca.tasks.subband_tasks.process_subband_task orca.tasks.subband_tasks.submit_subband_pipeline orca.tasks.subband_tasks.submit_subband_pipeline_chained Module Contents --------------- .. py:data:: animation :value: None .. py:data:: logger .. py:data:: DYNAMIC_WORK_QUEUE :value: 'pipeline:dynamic:{run_label}' .. py:function:: on_chord_error(self, task_id=None, exc=None, traceback_str=None, dynamic_run_label: Optional[str] = None, work_dir: Optional[str] = None, cleanup_nvme: bool = False, subband: str = '', lst_label: str = '') Error callback for Phase 1 task failures. Attached via ``link_error`` on each individual Phase 1 task. When all retries are exhausted for *any* MS in the chord, Celery fires this callback. The chord itself may still complete (other MS tasks succeed) and Phase 2 runs normally. But if enough tasks fail that the chord never fires Phase 2, this ensures the dynamic dispatch chain continues. A Redis ``SETNX`` lock (60 min TTL) prevents duplicate pops when multiple Phase 1 tasks in the same chord all fail. .. py:function:: prepare_one_ms_task(self, src_ms: str, nvme_work_dir: str, bp_table: str, xy_table: str, peel_sky: bool = False, peel_rfi: bool = False) -> str Copy one MS to NVMe, flag, calibrate, and optionally peel. This is the Phase 1 workhorse. Each MS file gets its own Celery task so they run in parallel across the worker's concurrency slots. :param src_ms: Source MS path on Lustre. :param nvme_work_dir: NVMe working directory (same for all MS in a subband). :param bp_table: Bandpass calibration table path. :param xy_table: XY-phase calibration table path. :param peel_sky: Run TTCal zest with sky model. :param peel_rfi: Run TTCal zest with RFI model. :returns: Path to the processed MS on NVMe. .. py:function:: process_subband_task(self, ms_paths: List[str], work_dir: str, subband: str, lst_label: str, obs_date: str, run_label: str, hot_baselines: bool = False, skip_cleanup: bool = False, cleanup_nvme: bool = False, targets: Optional[List[str]] = None, catalog: Optional[str] = None, clean_snapshots: bool = False, clean_reduced_pixels: bool = False, reduced_pixels: bool = False, skip_science: bool = False, compress_snapshots: bool = False, remaining_hours: Optional[List[dict]] = None, dynamic_run_label: Optional[str] = None, bp_table: Optional[str] = None, xy_table: Optional[str] = None) -> str Phase 2: concatenate, image, run science, and archive one subband. This task is used as the callback in a ``chord``: it receives the list of NVMe MS paths returned by the Phase 1 tasks. When *remaining_hours* is provided (by ``submit_subband_pipeline_chained``), this task will submit the next hour's chord upon successful completion, ensuring hours are processed **sequentially** on the same node. When *dynamic_run_label* is set, the task operates in **dynamic dispatch** mode: instead of chaining to the next hour of the same subband, it pops the next (subband, hour) work unit from a Redis queue. This lets any free node pick up whatever work is available. :param ms_paths: NVMe paths returned by prepare_one_ms_task (via chord). :param work_dir: NVMe working directory (same as Phase 1). :param subband: Frequency label, e.g. '73MHz'. :param lst_label: LST label, e.g. '14h'. :param obs_date: Observation date string 'YYYY-MM-DD'. :param run_label: Pipeline run label. :param hot_baselines: Whether to run hot-baseline diagnostics. :param skip_cleanup: If True, keep the concat MS on NVMe. :param cleanup_nvme: If True, remove the entire NVMe work_dir after archiving. :param targets: List of target-list file paths for photometry. :param catalog: Path to BDSF catalog for transient search masking. :param clean_snapshots: If True, produce CLEANed Stokes-I snapshots in ``snapshots_clean/`` in addition to the dirty pilots in ``snapshots/``. Always fpack-compressed. :param clean_reduced_pixels: If True, scale clean snapshot pixel count by subband frequency (1024/2048/4096). Only affects clean snapshots, not dirty pilots or science imaging. :param skip_science: If True, skip all science phases (dewarping, photometry, transient search, flux check) after PB correction. Products are still archived to Lustre. :param compress_snapshots: If True, fpack-compress all snapshot FITS to .fs and remove the originals. Deep images are not compressed. :param remaining_hours: List of kwarg dicts for subsequent hours. Each dict contains the arguments for ``submit_subband_pipeline``. The first entry is submitted after this hour completes, with the rest forwarded as its own ``remaining_hours``. :param dynamic_run_label: If set, enables dynamic dispatch mode using this run label as the Redis work-queue key. :returns: Path to the Lustre archive directory with final products. .. py:function:: submit_subband_pipeline(ms_files: List[str], subband: str, bp_table: str, xy_table: str, lst_label: str, obs_date: str, run_label: str, peel_sky: bool = False, peel_rfi: bool = False, hot_baselines: bool = False, skip_cleanup: bool = False, cleanup_nvme: bool = False, nvme_work_dir: Optional[str] = None, queue_override: Optional[str] = None, targets: Optional[List[str]] = None, catalog: Optional[str] = None, clean_snapshots: bool = False, clean_reduced_pixels: bool = False, reduced_pixels: bool = False, skip_science: bool = False, compress_snapshots: bool = False, remaining_hours: Optional[List[dict]] = None, dynamic_run_label: Optional[str] = None) -> celery.result.AsyncResult Submit the full two-phase subband pipeline as a Celery chord. This is the main entry point for the submission script. :param ms_files: List of source MS paths on Lustre. :param subband: Frequency label, e.g. '73MHz'. :param bp_table: Bandpass calibration table. :param xy_table: XY-phase calibration table. :param lst_label: LST label, e.g. '14h'. :param obs_date: Observation date 'YYYY-MM-DD'. :param run_label: Human-readable run identifier. :param peel_sky: Peel astronomical sky sources. :param peel_rfi: Peel RFI sources. :param hot_baselines: Run hot-baseline diagnostics. :param skip_cleanup: Keep intermediate files on NVMe. :param cleanup_nvme: Remove entire NVMe work_dir after archiving to Lustre. :param nvme_work_dir: Override NVMe work directory. :param queue_override: Force routing to this queue instead of the default node. E.g. 'calim08' to run 18MHz on calim08. :param targets: List of target-list file paths for photometry. :param catalog: Path to BDSF catalog for transient search masking. :param clean_snapshots: If True, produce CLEANed Stokes-I snapshots. :param clean_reduced_pixels: Scale clean snapshot pixels by frequency. :param reduced_pixels: If True, scale pixel count by subband frequency. :param skip_science: If True, skip science phases after PB correction. :param compress_snapshots: If True, fpack-compress snapshot FITS. :param dynamic_run_label: If set, enables dynamic dispatch mode. :returns: Celery AsyncResult for the chord (Phase 2 result). .. py:function:: submit_subband_pipeline_chained(hour_specs: List[dict], subband: str, bp_table: str, xy_table: str, run_label: str, peel_sky: bool = False, peel_rfi: bool = False, hot_baselines: bool = False, skip_cleanup: bool = False, cleanup_nvme: bool = False, queue_override: Optional[str] = None, targets: Optional[List[str]] = None, catalog: Optional[str] = None, clean_snapshots: bool = False, clean_reduced_pixels: bool = False, reduced_pixels: bool = False, skip_science: bool = False, compress_snapshots: bool = False) -> celery.result.AsyncResult Submit multiple LST-hours for one subband as a sequential chain. Instead of submitting all hours simultaneously (which floods the worker with Phase 1 tasks and starves Phase 2), this function submits only the **first hour** immediately. Phase 2 of each hour triggers the next hour's chord upon completion, ensuring: - Phase 2 (imaging) runs on an idle node with full CPU/memory - NVMe space is freed before the next hour's data arrives - No resource contention between hours on the same node Different subbands still run in **parallel** on different nodes. :param hour_specs: List of dicts, each with keys: - ``ms_files``: List of source MS paths for that hour. - ``lst_label``: e.g. '14h'. - ``obs_date``: e.g. '2025-06-15'. :param subband: Frequency label, e.g. '73MHz'. :param bp_table: Bandpass calibration table. :param xy_table: XY-phase calibration table. :param run_label: Human-readable run identifier. :param peel_sky: Peel astronomical sky sources. :param peel_rfi: Peel RFI sources. :param hot_baselines: Run hot-baseline diagnostics. :param skip_cleanup: Keep intermediate files on NVMe. :param cleanup_nvme: Remove entire NVMe work_dir after archiving. :param queue_override: Force routing to this queue. :param targets: Target-list file paths for photometry. :param catalog: BDSF catalog for transient search masking. :param clean_snapshots: Produce CLEANed Stokes-I snapshots. :param clean_reduced_pixels: Scale clean snapshot pixels by frequency. :param reduced_pixels: Scale pixel count by subband frequency. :param skip_science: Skip science phases after PB correction. :param compress_snapshots: fpack-compress snapshot FITS. :returns: Celery AsyncResult for the first hour's chord (only the first hour is submitted immediately; subsequent hours are triggered by Phase 2 callbacks).