# Subband Processing Pipeline — Architecture & File Reference
This document covers the Celery-based subband processing pipeline for OVRO-LWA. It processes per-subband data through calibration, flagging, peeling, imaging, science extraction (dewarping, photometry, transient search, flux check), and archival — all orchestrated by Celery with NVMe-local execution on the calim cluster.
## Overview
The pipeline uses the following Celery chord pattern:

**Phase 1** (parallel, one Celery task per MS file):

```
copy to NVMe → flag bad antennas → apply calibration → peel sky → peel RFI
```

**Phase 2** (sequential, runs once all Phase 1 tasks finish):

```
concatenate → fix field IDs → change phase centre → AOFlagger →
pilot snapshots → snapshot QA → optional clean I snapshots →
hot-baseline removal → science imaging →
PB correction → ionospheric dewarping → target photometry →
solar system photometry → transient search → flux scale check →
archive to Lustre (per-run + centralized samples/detections)
```

**Phase 3** (post-run, after all subbands complete):

```
inverse-variance wideband stacking (Red/Green/Blue bands) →
3-colour PNG composites → wideband transient search →
wideband solar system photometry → detection gathering →
email summary report
```
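The chord pattern (a parallel "header" of tasks whose collected results feed a single "callback") can be illustrated with a plain-Python stand-in. The function names below are hypothetical; the real tasks are Celery-decorated and run across nodes, but the fan-out/fan-in semantics are the same:

```python
from concurrent.futures import ThreadPoolExecutor

def prepare_one_ms(ms_file):
    # Phase 1 stand-in: copy to NVMe, flag, calibrate, peel (one task per MS)
    return f"{ms_file}.prepared"

def process_subband(prepared):
    # Phase 2 stand-in: runs exactly once, after every Phase 1 task finishes
    return f"concatenated {len(prepared)} files"

ms_files = ["a.ms", "b.ms", "c.ms"]
with ThreadPoolExecutor() as pool:
    prepared = list(pool.map(prepare_one_ms, ms_files))  # parallel "header"
result = process_subband(prepared)                       # single "callback"
print(result)  # concatenated 3 files
```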
Phases 1 and 2 of a given subband are routed to the same per-node queue (e.g. `calim08`) so that all I/O stays on that node's local NVMe.
Every step emits `[TIMER]` log lines (e.g. `[TIMER] aoflagger: 42.3s`) for post-run performance analysis. Extract them with `grep '\[TIMER\]' worker.log`.
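A minimal sketch of the emitting side, assuming a simple context manager produces these lines (the pipeline's actual helper is not shown in this document):

```python
import time
from contextlib import contextmanager

@contextmanager
def timer(tag, log=print):
    # Emits a "[TIMER] <tag>: <seconds>s" line when the wrapped step finishes
    start = time.monotonic()
    try:
        yield
    finally:
        log(f"[TIMER] {tag}: {time.monotonic() - start:.1f}s")

with timer("aoflagger"):
    time.sleep(0.05)  # stand-in for the real flagging step
```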
## File Map

### Submission & Orchestration

| File | Purpose |
|---|---|
| `pipeline/subband_celery.py` | CLI entry point. Discovers MS files, computes LST segments, submits one chord per (subband, LST-hour) to the correct Celery queue. Key flags: … |
| `orca/tasks/subband_tasks.py` | Celery task definitions. Contains … |
| | Celery app configuration. Defines broker/backend, all queues (…) |
### Processing Logic (no Celery decorators — pure functions, testable locally)

| File | Purpose |
|---|---|
| `orca/transform/subband_processing.py` | Core subband steps. File discovery, NVMe copy, SPW map calculation, calibration application, bad-antenna flagging (via MNC subprocess), concatenation, field ID fix, snapshot QA, scan-based integration flagging, timestamp injection, PB correction dispatch, … |
| `hot_baselines.py` | Hot-baseline & amplitude-vs-UV diagnostics. Identifies bad antennas/baselines from a concatenated MS, produces heatmap and UV diagnostic plots, optionally flags in place. |
| `pb_correction.py` | Primary beam correction. Applies the OVRO-LWA beam model to FITS images using … |
| | OVRO-LWA beam model. … |
### Science Extraction Modules (Phase 2, steps 7b A–D)

| File | Purpose |
|---|---|
| `ionospheric_dewarping.py` | Ionospheric dewarping. Parses the VLSSr catalog, cross-matches extracted sources, builds 2-D pixel warp screens via … |
| `cutout.py` | Target photometry. Loads CSV target lists, extracts Stokes I+V cutouts (deep + 10-min difference), measures flux with an ionospheric-aware search radius and confusing-source masking, optionally fits with CASA … |
| `solar_system_cutout.py` | Solar system photometry. Computes ephemerides for the Moon and Mercury–Neptune via … |
| `transient_search.py` | Transient search. Stokes V blind search + Stokes I deep-subtracted search. Adaptive bright-source masking, local RMS maps, bi-lobed artifact rejection, catalog cross-match, cutout generation. Quality gate: >10 candidates triggers a warning flag. |
| `flux_check_cutout.py` | Flux scale validation. Fits calibrators (3C48, 3C147, 3C196, 3C286, 3C295, 3C380, 3C123) with CASA … |
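Several of these modules reduce to matching a position against a catalog within a search radius. A minimal flat-sky sketch of that step (the `nearest_within` helper is hypothetical; real code would use proper spherical separations and the actual catalogs):

```python
import math

def nearest_within(ra, dec, catalog, radius_deg):
    """Nearest catalog source within radius_deg of (ra, dec), flat-sky approx."""
    best, best_sep = None, radius_deg
    for name, cra, cdec in catalog:
        # Scale RA offset by cos(dec) so separations are roughly isotropic
        dra = (cra - ra) * math.cos(math.radians(dec))
        sep = math.hypot(dra, cdec - dec)
        if sep <= best_sep:
            best, best_sep = (name, sep), sep
    return best

cat = [("3C286", 202.784, 30.509), ("3C295", 212.836, 52.203)]
print(nearest_within(202.80, 30.51, cat, radius_deg=0.1))
```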
### Phase 3 — Wideband Aggregation (post_process_science)

| File | Purpose |
|---|---|
| `post_process_science.py` | Wideband stacking & reporting. Inverse-variance weighted co-adds across subbands in 3 colour bands (Red 18–41 MHz, Green 41–64 MHz, Blue 64–85 MHz). Generates 3-colour PNGs, runs the wideband transient search, gathers target/transient/solar-system CSVs from all subbands, and sends an HTML email report with attachments. |
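Inverse-variance stacking weights each subband by w_i = 1/σ_i², with σ_i taken from each subband's Stokes V RMS. A minimal sketch, with 1-D lists standing in for images (`inverse_variance_stack` is illustrative, not the pipeline's function):

```python
def inverse_variance_stack(images, rms_values):
    # Pixel-wise co-add with weights w_i = 1 / sigma_i^2
    weights = [1.0 / r**2 for r in rms_values]
    wsum = sum(weights)
    return [sum(w * img[p] for w, img in zip(weights, images)) / wsum
            for p in range(len(images[0]))]

# Two flat "images" with per-subband Stokes V RMS of 1.0 and 2.0:
print(inverse_variance_stack([[4.0, 4.0], [8.0, 8.0]], [1.0, 2.0]))  # [4.8, 4.8]
```

The deeper (lower-RMS) subband carries four times the weight of the noisier one, which is why per-subband RMS is recorded in `thermal_noise.csv`.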
### Configuration

| File | Purpose |
|---|---|
| `orca/resources/subband_config.py` | All pipeline configuration in one place. Node↔subband mapping (…) |
| | Hardware mapping. 353-entry … |
| | Orca-wide config singleton. Reads … |
| | Default config template. Copy to … |
| | Nearby stars target list. Columns: … |
| | Exoplanet host stars target list (hot/warm Jupiter hosts). Used by … |
| | Local volume targets catalog. Used by … |
### Utilities & Wrappers

| File | Purpose |
|---|---|
| `orca/utils/mnc_antennas.py` | Standalone script for querying bad antennas from MNC antenna health. Runs as a subprocess in the … |
| `orca/wrapper/ttcal.py` | Wrapper around TTCal.jl for peeling (both …) |
| `orca/wrapper/change_phase_centre.py` | Wrapper around … |
## Node ↔ Subband ↔ Queue Mapping

Each subband is pinned to a specific calim node. The node’s local NVMe (`/fast/pipeline/`) is used for all intermediate I/O. Final products are archived to shared Lustre (`/lustre/pipeline/`).
| Subband(s) | Node | Celery Queue | NVMe |
|---|---|---|---|
| 18 MHz, 23 MHz | lwacalim00 | `calim00` | Shared (half resources each) |
| 27 MHz, 32 MHz | lwacalim01 | `calim01` | Shared |
| 36 MHz, 41 MHz | lwacalim02 | `calim02` | Shared |
| 46 MHz, 50 MHz | lwacalim03 | `calim03` | Shared |
| 55 MHz | lwacalim04 | `calim04` | Full node |
| 59 MHz | lwacalim05 | `calim05` | Full node |
| 64 MHz | lwacalim06 | `calim06` | Full node |
| 69 MHz | lwacalim07 | `calim07` | Full node |
| 73 MHz | lwacalim08 | `calim08` | Full node |
| 78 MHz | lwacalim09 | `calim09` | Full node |
| 82 MHz | lwacalim00 | `calim00` | Shared with 18/23 MHz |
Note: calim02 and calim10 are currently inactive. Subbands that map to inactive nodes must use `--remap` (static mode) or `--dynamic` mode.
All nodes get 44 CPUs / 120 GB / 44 wsclean threads (`-j 44`). The old dual-subband halving (22 threads) was removed — in dynamic mode any subband can land on any node, so full resources are always allocated.
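A sketch of how the static mapping and a `--remap SUBBAND=NODE` override might resolve. The dictionary is abbreviated from the table above, and `node_for_subband` is a hypothetical helper; the real mapping lives in `NODE_SUBBAND_MAP` in `orca.resources.subband_config`:

```python
# Abbreviated, hypothetical shape of the static node <-> subband mapping
NODE_SUBBAND_MAP = {
    "lwacalim00": ["18MHz", "23MHz", "82MHz"],
    "lwacalim04": ["55MHz"],
}

def node_for_subband(subband, remap=None):
    """Resolve the target node, honouring --remap SUBBAND=NODE overrides."""
    if remap and subband in remap:
        return remap[subband]
    for node, subbands in NODE_SUBBAND_MAP.items():
        if subband in subbands:
            return node
    raise KeyError(f"no node pinned for {subband}")

print(node_for_subband("55MHz"))                        # lwacalim04
print(node_for_subband("55MHz", {"55MHz": "calim08"}))  # calim08
```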
## Dynamic Scheduling Mode

By default, each subband is pinned to a fixed node (static mode). The `--dynamic` flag enables a work-queue scheduler where any free node picks up the next (subband, hour) work unit automatically.
### How it works

1. The submission script builds all (subband × hour) work units.
2. All work units are pushed to a Redis list (FIFO queue).
3. One work unit is popped per node to seed initial execution.
4. When a node finishes Phase 2, it pops the next work unit from Redis and submits it to itself.
5. This continues until the Redis queue is empty, at which point nodes go idle.
This eliminates idle nodes when subbands have uneven processing times. Any subband can run on any node — data is copied from Lustre to local NVMe at the start of Phase 1 regardless.
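The seed-then-chain behaviour can be simulated with a `deque` standing in for the Redis list. This is illustrative only: real nodes pop asynchronously as they finish Phase 2, whereas here "finished" is approximated by which node has the fewest units so far:

```python
from collections import deque

# Redis-list stand-in: FIFO of (subband, hour) work units
work = deque((sb, h) for sb in ("73MHz", "78MHz") for h in range(3))

assignments = {"calim01": [], "calim03": []}
while work:
    # The "free" node (fewest units so far) pops the next unit, FIFO order
    node = min(assignments, key=lambda n: len(assignments[n]))
    assignments[node].append(work.popleft())

print({n: len(u) for n, u in assignments.items()})  # {'calim01': 3, 'calim03': 3}
```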
### Usage

```bash
# All subbands, all hours, dynamically distributed across active nodes
python pipeline/subband_celery.py \
  --range 00-24 --date 2024-12-21 \
  --bp_table /path/to/bandpass.B.flagged \
  --xy_table /path/to/xyphase.Xf \
  --peel_sky --peel_rfi --hot_baselines \
  --cleanup_nvme --compress_snapshots \
  --dynamic

# Custom node pool
--dynamic --nodes calim01 calim03 calim08 calim09

# Exclude specific nodes
--dynamic --exclude_nodes calim07

# Shared dynamic queue across multiple submissions (different dates/ranges)
--dynamic --dynamic_queue_label Dec21_Backlog

# Append-only: push to existing shared queue without seeding new node tasks
# (use only while nodes are already actively running from that queue)
--dynamic --dynamic_queue_label Dec21_Backlog --dynamic_append_only

# Dry run — shows work units and node pool without submitting
--dynamic --dry_run
```
### Monitoring the Redis work queue

```bash
# From any node with access to Redis (10.41.0.85)
python -c "
import redis, json
r = redis.Redis(host='10.41.0.85', port=6379, db=0)
key = 'pipeline:dynamic:Run_YYYYMMDD_HHMMSS'  # replace with actual run label
n = r.llen(key)
print(f'Remaining: {n}')
for i, raw in enumerate(r.lrange(key, 0, -1)):
    wu = json.loads(raw)
    print(f'  [{i}] {wu[\"subband\"]} {wu[\"lst_label\"]} ({len(wu[\"ms_files\"])} files)')
"
```
Or use `redis-cli` if available:

```bash
redis-cli -h 10.41.0.85 LLEN pipeline:dynamic:Run_YYYYMMDD_HHMMSS
redis-cli -h 10.41.0.85 LRANGE pipeline:dynamic:Run_YYYYMMDD_HHMMSS 0 -1
```
### Static vs Dynamic comparison

| Aspect | Static (default) | Dynamic (`--dynamic`) |
|---|---|---|
| Node assignment | Fixed per subband | Any free node |
| Idle nodes | Possible (uneven load) | Minimized |
| NVMe locality | Guaranteed same node | Guaranteed (copy at Phase 1 start) |
| Subband ordering | Priority order | FIFO from Redis queue |
| Remapping inactive nodes | Manual | Automatic (excluded from pool) |
| Sequential chaining | Per-subband on same node | Work units are independent |
## Directory Layout

NVMe (per-node, not shared):

```
/fast/pipeline/<lst>/<date>/<run_label>/<subband>/
├── provenance.json             # Run metadata (git version, cal tables, flags)
├── *.ms                        # Individual MS files (Phase 1)
├── <subband>_concat.ms         # Concatenated MS (Phase 2)
├── I/deep/                     # Stokes I deep images (+pbcorr, +dewarped)
├── I/10min/                    # Stokes I 10-min interval images
├── V/deep/                     # Stokes V deep images
├── V/10min/                    # Stokes V 10-min interval images
├── snapshots/                  # Pilot snapshot images + QA
├── snapshots_clean/            # CLEANed Stokes-I snapshots (optional)
├── QA/                         # Hot-baseline plots, flux check CSV+plot
├── samples/<sample>/<target>/  # Target photometry cutouts + CSVs
├── detections/                 # Transient cutouts, solar system detections
├── Dewarp_Diagnostics/         # Warp screens, quiver plots
└── logs/                       # CASA log, subprocess logs
```
Lustre (shared, archived products):

```
/lustre/pipeline/images/<lst>/<date>/<run_label>/<subband>/
└── (same structure as above, minus intermediate MS files)
```

Lustre (Phase 3 wideband products):

```
/lustre/pipeline/images/<lst>/<date>/<run_label>/Wideband/
├── Wideband_Red_I_deep_*.fits    # Red-band co-adds (18–41 MHz)
├── Wideband_Green_I_deep_*.fits  # Green-band co-adds (41–64 MHz)
├── Wideband_Blue_I_deep_*.fits   # Blue-band co-adds (64–85 MHz)
├── Wideband_*_3color.png         # 3-colour PNG composites
└── thermal_noise.csv             # Per-subband Stokes V RMS
```

Lustre (centralized cross-run aggregation):

```
/lustre/pipeline/images/samples/<sample>/<target>/<subband>/
/lustre/pipeline/images/detections/transients/{I,V}/<J-name>/<subband>/
/lustre/pipeline/images/detections/SolarSystem/<Body>/<subband>/
```
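The NVMe and per-run Lustre trees share the same `<lst>/<date>/<run_label>/<subband>` tail, so a single helper can compose both. A hypothetical sketch (the `14h`/`run01` values are made-up placeholders for the real LST and run labels):

```python
from pathlib import PurePosixPath

def run_dir(root, lst, date, run_label, subband):
    # Compose the <root>/<lst>/<date>/<run_label>/<subband>/ layout shown above
    return PurePosixPath(root) / lst / date / run_label / subband

nvme = run_dir("/fast/pipeline", "14h", "2024-12-18", "run01", "55MHz")
lustre = run_dir("/lustre/pipeline/images", "14h", "2024-12-18", "run01", "55MHz")
print(nvme)    # /fast/pipeline/14h/2024-12-18/run01/55MHz
print(lustre)  # /lustre/pipeline/images/14h/2024-12-18/run01/55MHz
```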
## Imaging Steps
The pipeline produces 7 image products per subband-hour:
| # | Stokes | Category | Suffix | Key Features |
|---|---|---|---|---|
| 1 | I | deep | | Multiscale, inner taper, robust −0.75 |
| 2 | I | deep | | Multiscale, inner taper, robust 0 |
| 3 | I | deep | | Multiscale, no taper, robust −0.75 |
| 4 | I | deep | | Multiscale, no taper, robust 0 |
| 5 | I | 10min | | Multiscale, taper, 6 intervals |
| 6 | V | deep | | Dirty image, taper |
| 7 | V | 10min | | Dirty image, taper, 6 intervals |
All images are 4096×4096 at 0.03125°/pixel with primary beam correction applied. With `--clean_reduced_pixels`, clean snapshots use frequency-dependent resolution (the ~128° FoV is preserved by scaling both pixel count and pixel scale):
| Tier | Subbands | Size | Scale (deg/px) |
|---|---|---|---|
| Lower | 18–36 MHz | 1024×1024 | 0.125 |
| Middle | 41–59 MHz | 2048×2048 | 0.0625 |
| Upper | 64–82 MHz | 4096×4096 | 0.03125 |
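The tiers keep size × scale constant at 128°. A sketch of the frequency-to-geometry mapping (hypothetical helper that just reproduces the table; the real selection logic is not shown in this document):

```python
def clean_snapshot_geometry(freq_mhz):
    """Frequency-tiered (size, scale) that keeps size * scale = 128 deg FoV."""
    if freq_mhz <= 36:
        size, scale = 1024, 0.125
    elif freq_mhz <= 59:
        size, scale = 2048, 0.0625
    else:
        size, scale = 4096, 0.03125
    assert size * scale == 128.0  # FoV preserved across tiers
    return size, scale

print(clean_snapshot_geometry(55))  # (2048, 0.0625)
```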
## External Dependencies
These must be available on the calim worker nodes:
| Tool | Purpose | Default Path |
|---|---|---|
| WSClean | Imaging | |
| AOFlagger | RFI flagging | |
| chgcentre | Phase centre rotation | |
| TTCal.jl | Source peeling | Via conda envs |
| mnc_python | Antenna health queries | In … |
| extractor_pb_75 | Primary beam model | Embedded in … |
| RabbitMQ | Celery broker | |
| Redis | Celery result backend | |
## How It Connects
```
pipeline/subband_celery.py   # User runs this
 │ Flags: --targets, --catalog, --clean_snapshots, --skip_science, --remap SUBBAND=NODE
 │
 ├── orca.resources.subband_config   # Reads NODE_SUBBAND_MAP, queue routing
 ├── orca.transform.subband_processing.find_archive_files_for_subband()
 │
 └── orca.tasks.subband_tasks.submit_subband_pipeline()
      │
      ├── chord([prepare_one_ms_task.s(...) × N])   ← Phase 1 (parallel)
      │    │
      │    ├── subband_processing.copy_ms_to_nvme()
      │    ├── subband_processing.flag_bad_antennas()
      │    │    └── subprocess → orca/utils/mnc_antennas.py
      │    ├── subband_processing.apply_calibration()
      │    └── orca.wrapper.ttcal.zest_with_ttcal()   (sky + RFI)
      │
      └── process_subband_task.s(...)   ← Phase 2 (callback)
           │
           ├── subband_processing.concatenate_ms()
           ├── subband_processing.fix_field_id()
           ├── orca.wrapper.change_phase_centre.change_phase_center()
           ├── AOFlagger (subprocess)
           ├── WSClean pilot snapshots (dirty)
           ├── subband_processing.analyze_snapshot_quality()
           ├── subband_processing.flag_bad_integrations()
           ├── hot_baselines.run_diagnostics()   (optional)
           ├── WSClean clean I snapshots   (optional, --clean_snapshots)
           ├── WSClean science imaging ×7 (subprocess)
           ├── pb_correction.apply_pb_correction()
           ├── write provenance.json
           │
           │   --- Science Phases (all on NVMe) ---
           ├── A.  ionospheric_dewarping   (VLSSr cross-match)
           ├── B.  cutout.process_target()   (--targets)
           ├── B2. solar_system_cutout.process_solar_system()
           ├── C.  transient_search.run_test()   (--catalog)
           ├── D.  flux_check_cutout.run_flux_check()
           │
           └── subband_processing.archive_results()
                ├── per-run archive to Lustre
                ├── centralized samples/ tree
                └── centralized detections/ tree

Phase 3 (after all subbands archived):

post_process_science.run_post_processing(run_dir)
 │
 ├── flux_check_cutout.run_flux_check()   (wideband)
 ├── run_wideband_stacking()
 │    ├── get_inner_rms() on Stokes V images
 │    ├── _stack_images() per colour band (Red/Green/Blue)
 │    ├── _make_3color_png() composites
 │    └── transient_search.run_test() on wideband images
 ├── solar_system_cutout.process_wideband_solar_system()
 ├── gather_detections()   (CSV collation)
 └── send_email_report()   (SMTP + attachments)
```
## Timing Instrumentation

Every step logs `[TIMER] <tag>: <seconds>s`. Extract with:

```bash
grep '\[TIMER\]' /path/to/worker.log
```
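To aggregate per-step totals instead of eyeballing the grep output, a small parser works (illustrative; the tag format is taken from the example above, and `collect_timings` is not a pipeline function):

```python
import re

TIMER_RE = re.compile(r"\[TIMER\] (\S+): ([\d.]+)s")

def collect_timings(lines):
    # Sum seconds per step tag across all [TIMER] lines
    totals = {}
    for line in lines:
        m = TIMER_RE.search(line)
        if m:
            totals[m.group(1)] = totals.get(m.group(1), 0.0) + float(m.group(2))
    return totals

log = ["[TIMER] aoflagger: 42.5s", "[TIMER] wsclean: 600.0s",
       "[TIMER] aoflagger: 40.5s", "unrelated line"]
print(collect_timings(log))  # {'aoflagger': 83.0, 'wsclean': 600.0}
```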
## CLI Examples
Run 55 MHz on calim08 (dirty snapshots):
```bash
python pipeline/subband_celery.py \
  --range 14-15 --date 2024-12-18 \
  --bp_table /lustre/gh/bandpass_tables/2024-12-18/calibration_2024-12-18_01h.B.flagged \
  --xy_table /lustre/gh/polcal/xyphase_delay_pos_3.8643ns.Xf \
  --subbands 55MHz \
  --remap 55MHz=calim08 \
  --peel_sky --peel_rfi --hot_baselines
```
Same, with additional CLEANed Stokes-I snapshots (`snapshots_clean/`):
```bash
python pipeline/subband_celery.py \
  --range 14-15 --date 2024-12-18 \
  --bp_table /lustre/gh/bandpass_tables/2024-12-18/calibration_2024-12-18_01h.B.flagged \
  --xy_table /lustre/gh/polcal/xyphase_delay_pos_3.8643ns.Xf \
  --subbands 55MHz \
  --remap 55MHz=calim08 \
  --peel_sky --peel_rfi --hot_baselines \
  --clean_snapshots
```
All subbands with frequency-tiered clean snapshot resolution:
```bash
python pipeline/subband_celery.py \
  --range 01-02 --date 2024-12-26 \
  --bp_table /lustre/pipeline/calibration/results/2024-12-26/02h/successful/20251225_055451/tables/calibration_2024-12-26_02h.B.flagged \
  --xy_table /lustre/gh/polcal/xyphase_delay_pos_3.8643ns.Xf \
  --subbands 18MHz 23MHz 27MHz 32MHz 36MHz 41MHz 46MHz 50MHz 55MHz 59MHz 64MHz 69MHz 73MHz 78MHz 82MHz \
  --peel_sky --peel_rfi --hot_baselines \
  --skip_science --cleanup_nvme --compress_snapshots \
  --clean_snapshots --clean_reduced_pixels \
  --dynamic --exclude_nodes calim02 calim08 calim10 \
  --dynamic_queue_label Dec26_all
```
Full observation with science extraction:
```bash
python pipeline/subband_celery.py \
  --range 13-19 --date 2025-06-15 \
  --bp_table /lustre/gh/calibration/pipeline/bandpass/latest.bandpass \
  --xy_table /lustre/gh/calibration/pipeline/xy/latest.X \
  --peel_sky --peel_rfi --hot_baselines \
  --targets /lustre/gh/targets/exoplanets.csv /lustre/gh/targets/pulsars.csv \
  --catalog /lustre/gh/catalogs/bdsf_73MHz.csv \
  --clean_snapshots
```
Add `--dry_run` to any command to preview without submitting.
Dynamic mode — 3-hour test on 2 nodes:
```bash
python pipeline/subband_celery.py \
  --range 03-06 --date 2024-12-21 \
  --bp_table /lustre/pipeline/calibration/results/2024-12-21/02h/successful/20251224_131951/tables/calibration_2024-12-21_02h.B.flagged \
  --xy_table /lustre/gh/polcal/xyphase_delay_pos_3.8643ns.Xf \
  --subbands 73MHz \
  --peel_sky --peel_rfi --hot_baselines \
  --skip_science --cleanup_nvme --compress_snapshots \
  --dynamic --nodes calim01 calim03
```
Dynamic mode — full observation across all active nodes:
```bash
python pipeline/subband_celery.py \
  --range 00-24 --date 2024-12-21 \
  --bp_table /lustre/pipeline/calibration/results/2024-12-21/02h/successful/20251224_131951/tables/calibration_2024-12-21_02h.B.flagged \
  --xy_table /lustre/gh/polcal/xyphase_delay_pos_3.8643ns.Xf \
  --peel_sky --peel_rfi --hot_baselines \
  --cleanup_nvme --compress_snapshots \
  --dynamic
```