# Subband Processing Pipeline — Architecture & File Reference

> This document covers the Celery-based subband processing pipeline for OVRO-LWA.
> It processes per-subband data through calibration, flagging, peeling, imaging,
> science extraction (dewarping, photometry, transient search, flux check),
> and archival — all orchestrated by Celery with NVMe-local execution on the calim
> cluster.

---

## Overview

The pipeline uses the following **Celery chord pattern**:

```
Phase 1 (parallel, one Celery task per MS file):
  copy to NVMe → flag bad antennas → apply calibration → peel sky → peel RFI

Phase 2 (sequential, runs once all Phase 1 tasks finish):
  concatenate → fix field IDs → change phase centre → AOFlagger →
  pilot snapshots → snapshot QA → optional clean I snapshots →
  hot-baseline removal → science imaging → PB correction →
  ionospheric dewarping → target photometry → solar system photometry →
  transient search → flux scale check →
  archive to Lustre (per-run + centralized samples/detections)

Phase 3 (post-run, after all subbands complete):
  inverse-variance wideband stacking (Red/Green/Blue bands) →
  3-colour PNG composites → wideband transient search →
  wideband solar system photometry → detection gathering →
  email summary report
```

Phases 1 and 2 are routed to the **same per-node queue** (e.g. `calim08`) so that all I/O stays on the node's local NVMe.

Every step emits `[TIMER]` log lines (e.g. `[TIMER] aoflagger: 42.3s`) for post-run performance analysis. Grep with `grep '\[TIMER\]' worker.log`.
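For orientation, here is a minimal sketch of how `submit_subband_pipeline()` could wire this pattern together with Celery primitives. The task and module names come from the file map below; the argument lists and queue handling are assumptions for illustration, not the pipeline's actual code:

```python
from celery import chord

# Task names as listed in the File Map below; exact signatures are assumed here.
from orca.tasks.subband_tasks import prepare_one_ms_task, process_subband_task

def submit_subband_sketch(ms_files, subband, queue="calim08"):
    # Phase 1: one prepare task per MS file, all pinned to the same per-node queue.
    header = [prepare_one_ms_task.s(ms, subband).set(queue=queue) for ms in ms_files]
    # Phase 2: the chord callback runs once every header task has finished;
    # Celery passes the list of Phase 1 results as its first argument.
    callback = process_subband_task.s(subband).set(queue=queue)
    return chord(header)(callback)
```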
---

## File Map

### Submission & Orchestration

| File | Purpose |
|------|---------|
| `pipeline/subband_celery.py` | **CLI entry point.** Discovers MS files, computes LST segments, submits one chord per (subband, LST-hour) to the correct Celery queue. Key flags: `--targets`, `--catalog`, `--clean_snapshots`, `--clean_reduced_pixels`, `--reduced_pixels`, `--skip_science`, `--remap SUBBAND=NODE`, `--dynamic`, `--nodes`, `--exclude_nodes`, `--dynamic_queue_label`, `--dynamic_append_only`, `--compress_snapshots`. |
| `orca/tasks/subband_tasks.py` | **Celery task definitions.** Contains `prepare_one_ms_task` (Phase 1), `process_subband_task` (Phase 2 including science phases A–D), and `submit_subband_pipeline()`, which wires them into a chord. Writes `provenance.json` per work unit and emits `[TIMER]` instrumentation. |
| `orca/celery.py` | **Celery app configuration.** Defines broker/backend, all queues (`default`, `cosmology`, `bandpass`, `imaging`, `calim00`–`calim10`), and the task include list. |

### Processing Logic (no Celery decorators — pure functions, testable locally)

| File | Purpose |
|------|---------|
| `orca/transform/subband_processing.py` | **Core subband steps.** File discovery, NVMe copy, SPW map calculation, calibration application, bad-antenna flagging (via MNC subprocess), concatenation, field ID fix, snapshot QA, scan-based integration flagging, timestamp injection, PB correction dispatch, `find_deep_image()`, `extract_sources_to_df()` (BDSF), archival (including centralized `samples/` and `detections/` trees), subprocess helpers. |
| `orca/transform/hot_baselines.py` | **Hot-baseline & amplitude-vs-UV diagnostics.** Identifies bad antennas/baselines from a concatenated MS, produces heatmap and UV diagnostic plots, optionally flags in-place. |
| `orca/transform/pb_correction.py` | **Primary beam correction.** Applies the OVRO-LWA beam model to FITS images using `extractor_pb_75`. |
| `orca/transform/extractor_pb_75.py` | **OVRO-LWA beam model.** `BeamModel` class (loads H5 beam response, interpolates to image grid), `generate_warp_screens()`, `apply_warp()`, VLSSr/NVSS catalog helpers. Ported from ExoPipe — standalone, no external imports. |

### Science Extraction Modules (Phase 2, steps 7b A–D)

| File | Purpose |
|------|---------|
| `orca/transform/ionospheric_dewarping.py` | **Ionospheric dewarping.** Parses the VLSSr catalog, cross-matches extracted sources, builds 2-D pixel warp screens via `griddata`, applies a `map_coordinates` correction to all FITS images. Produces diagnostic quiver/distortion plots in `Dewarp_Diagnostics/`. |
| `orca/transform/cutout.py` | **Target photometry.** Loads CSV target lists, extracts Stokes I+V cutouts (deep + 10-min difference), measures flux with an ionospheric-aware search radius + confusing-source masking, optionally fits with CASA `imfit`. Outputs per-target CSVs, diagnostic PNGs, and detection flags. |
| `orca/transform/solar_system_cutout.py` | **Solar system photometry.** Computes ephemerides for the Moon and Mercury–Neptune via `astropy.coordinates.get_body`, extracts I+V cutouts from deep and 10-min images, writes per-body CSVs with angular diameters and distances. Also contains `process_wideband_solar_system()` for Phase 3 wideband stacked images. |
| `orca/transform/transient_search.py` | **Transient search.** Stokes V blind search + Stokes I deep-subtracted search. Adaptive bright-source masking, local RMS maps, bi-lobed artifact rejection, catalog cross-match, cutout generation. Quality gate: >10 candidates triggers a warning flag. |
| `orca/transform/flux_check_cutout.py` | **Flux scale validation.** Fits calibrators (3C48, 3C147, 3C196, 3C286, 3C295, 3C380, 3C123) with CASA `imfit`, compares to Scaife & Heald 2012 / Perley-Butler 2017 models. Outputs `flux_check_hybrid.csv` and a diagnostic ratio plot in `QA/`. |

### Phase 3 — Wideband Aggregation (post_process_science)

| File | Purpose |
|------|---------|
| `orca/transform/post_process_science.py` | **Wideband stacking & reporting.** Inverse-variance weighted co-adds across subbands in 3 colour bands (Red 18–41 MHz, Green 41–64 MHz, Blue 64–85 MHz). Generates 3-colour PNGs, runs the wideband transient search, gathers target/transient/solar system CSVs from all subbands, and sends an HTML email report with attachments. |
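The wideband co-add is a standard inverse-variance weighted average. A minimal sketch of the weighting, assuming per-subband images on a common grid and a per-subband noise estimate (e.g. the Stokes V inner RMS that ends up in `thermal_noise.csv`); the module's actual logic lives in `_stack_images()`:

```python
import numpy as np

def inverse_variance_stack(images, rms_values):
    """Co-add same-shape images with weights 1 / rms**2 (illustrative sketch only)."""
    weights = np.array([1.0 / rms**2 for rms in rms_values])
    stacked = np.zeros_like(images[0], dtype=float)
    for image, weight in zip(images, weights):
        stacked += weight * image
    return stacked / weights.sum()
```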
### Configuration

| File | Purpose |
|------|---------|
| `orca/resources/subband_config.py` | **All pipeline configuration in one place.** Node↔subband mapping (`NODE_SUBBAND_MAP`), NVMe/Lustre directory layout, peeling parameters, AOFlagger strategy path, hot-baseline params (with `uv_window_size`), `SNAPSHOT_PARAMS` (dirty pilot snapshots) and `SNAPSHOT_CLEAN_I_PARAMS` (Stokes-I clean snapshots in `snapshots_clean/`), per-subband pixel scaling (`get_pixel_size()`, `get_pixel_scale()`, `_SUBBAND_PIXEL_SCALE`) for `--clean_reduced_pixels`, all 7 imaging steps, resource allocation per node, `CALIB_DATA` (SH12/PB17 flux models for 7 calibrators), `VLSSR_CATALOG` and `BEAM_MODEL_H5` paths. |
| `orca/resources/system_config.py` | **Hardware mapping.** 353-entry `SYSTEM_CONFIG` dict mapping LWA antenna numbers to correlator numbers, ARX boards, SNAP2 boards, and channel assignments. Used by `hot_baselines.py`. |
| `orca/configmanager.py` | **Orca-wide config singleton.** Reads `~/orca-conf.yml` (or `default-orca-conf.yml`) for the broker URI, backend URI, telescope params, and executable paths. |
| `orca/default-orca-conf.yml` | Default config template. Copy to `~/orca-conf.yml` and fill in credentials. |
| `orca/resources/10pc_sample.csv` | **Nearby stars target list.** Columns: `common_name`, `ra_current`, `dec_current`, `coords_sexagesimal`, `distance`. Used by `--targets`. |
| `orca/resources/OVRO_LWA_Hot_Warm_Jupiters_2026.csv` | **Exoplanet host stars target list.** Hot/warm Jupiter hosts. Used by `--targets`. |
| `orca/resources/OVRO_LWA_Local_Volume_Targets.csv` | **Local volume targets catalog.** Used by `--catalog` for transient search masking. |

### Utilities & Wrappers

| File | Purpose |
|------|---------|
| `orca/utils/mnc_antennas.py` | **Standalone script** for querying bad antennas from MNC antenna health. Runs as a subprocess in the `development` conda env (because `mnc_python` is not in the pipeline env). Outputs JSON to stdout. |
| `orca/wrapper/ttcal.py` | Wrapper around TTCal.jl for peeling (both `peel` and `zest` modes). |
| `orca/wrapper/change_phase_centre.py` | Wrapper around `chgcentre` for re-phasing visibilities. |

---

## Node ↔ Subband ↔ Queue Mapping

Each subband is pinned to a specific calim node. The node's local NVMe (`/fast/pipeline/`) is used for all intermediate I/O. Final products are archived to shared Lustre (`/lustre/pipeline/`).

| Subband(s) | Node | Celery Queue | NVMe |
|------------|------|--------------|------|
| 18 MHz, 23 MHz | lwacalim00 | `calim00` | Shared (half resources each) |
| 27 MHz, 32 MHz | lwacalim01 | `calim01` | Shared |
| 36 MHz, 41 MHz | lwacalim02 | `calim02` | Shared |
| 46 MHz, 50 MHz | lwacalim03 | `calim03` | Shared |
| 55 MHz | lwacalim04 | `calim04` | Full node |
| 59 MHz | lwacalim05 | `calim05` | Full node |
| 64 MHz | lwacalim06 | `calim06` | Full node |
| 69 MHz | lwacalim07 | `calim07` | Full node |
| **73 MHz** | lwacalim08 | `calim08` | Full node |
| 78 MHz | lwacalim09 | `calim09` | Full node |
| 82 MHz | lwacalim00 | `calim00` | Shared with 18/23 MHz |

> **Note:** calim02 and calim10 are currently inactive. Subbands that map to
> inactive nodes must use `--remap` (static mode) or `--dynamic` mode.

All nodes get 44 CPUs / 120 GB / 44 wsclean threads (`-j 44`). The old dual-subband halving (22 threads) was removed — in dynamic mode any subband can land on any node, so full resources are always allocated.

---

## Dynamic Scheduling Mode

By default, each subband is pinned to a fixed node (static mode). The **`--dynamic`** flag enables a work-queue scheduler where any free node picks up the next (subband, hour) work unit automatically.

### How it works

```
1. Submission script builds all (subband × hour) work units
2. All work units are pushed to a Redis list (FIFO queue)
3. One work unit is popped per node to seed initial execution
4. When a node finishes Phase 2, it pops the next work unit from Redis
   and submits it to itself
5. Continues until the Redis queue is empty → nodes go idle
```

This eliminates idle nodes when subbands have uneven processing times. Any subband can run on any node — data is copied from Lustre to local NVMe at the start of Phase 1 regardless.
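The node-side half of steps 3–4 is essentially a pop-and-resubmit loop against that Redis list. A rough sketch, reusing the connection details and work-unit fields from the monitoring example below (the actual pop/resubmit code lives in the tasks module, so treat this purely as an illustration):

```python
import json
import redis

r = redis.Redis(host="10.41.0.85", port=6379, db=0)
key = "pipeline:dynamic:Run_YYYYMMDD_HHMMSS"  # run label of the active submission

raw = r.lpop(key)  # take the next work unit; returns None once the queue is drained
if raw is not None:
    wu = json.loads(raw)  # e.g. {"subband": ..., "lst_label": ..., "ms_files": [...]}
    # ...submit a new chord for (wu["subband"], wu["lst_label"]) on this node's queue...
```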
### Usage

```bash
# All subbands, all hours, dynamically distributed across active nodes
python pipeline/subband_celery.py \
  --range 00-24 --date 2024-12-21 \
  --bp_table /path/to/bandpass.B.flagged \
  --xy_table /path/to/xyphase.Xf \
  --peel_sky --peel_rfi --hot_baselines \
  --cleanup_nvme --compress_snapshots \
  --dynamic

# Custom node pool
--dynamic --nodes calim01 calim03 calim08 calim09

# Exclude specific nodes
--dynamic --exclude_nodes calim07

# Shared dynamic queue across multiple submissions (different dates/ranges)
--dynamic --dynamic_queue_label Dec21_Backlog

# Append-only: push to existing shared queue without seeding new node tasks
# (use only while nodes are already actively running from that queue)
--dynamic --dynamic_queue_label Dec21_Backlog --dynamic_append_only

# Dry run — shows work units and node pool without submitting
--dynamic --dry_run
```

### Monitoring the Redis work queue

```python
# From any node with access to Redis (10.41.0.85)
python -c "
import redis, json
r = redis.Redis(host='10.41.0.85', port=6379, db=0)
key = 'pipeline:dynamic:Run_YYYYMMDD_HHMMSS'  # replace with actual run label
n = r.llen(key)
print(f'Remaining: {n}')
for i, raw in enumerate(r.lrange(key, 0, -1)):
    wu = json.loads(raw)
    print(f' [{i}] {wu[\"subband\"]} {wu[\"lst_label\"]} ({len(wu[\"ms_files\"])} files)')
"
```

Or use `redis-cli` if available:

```bash
redis-cli -h 10.41.0.85 LLEN pipeline:dynamic:Run_YYYYMMDD_HHMMSS
redis-cli -h 10.41.0.85 LRANGE pipeline:dynamic:Run_YYYYMMDD_HHMMSS 0 -1
```

### Static vs Dynamic comparison

| Aspect | Static (`--remap`) | Dynamic (`--dynamic`) |
|--------|--------------------|-----------------------|
| Node assignment | Fixed per subband | Any free node |
| Idle nodes | Possible (uneven load) | Minimized |
| NVMe locality | Guaranteed same node | Guaranteed (copy at Phase 1 start) |
| Subband ordering | Priority order | FIFO from Redis queue |
| Remapping inactive nodes | Manual `--remap` | Automatic (excluded from pool) |
| Sequential chaining | Per-subband on same node | Work units are independent |

---

## Directory Layout

```
NVMe (per-node, not shared):
/fast/pipeline/<...>/<...>/<...>/<...>/
├── provenance.json        # Run metadata (git version, cal tables, flags)
├── *.ms                   # Individual MS files (Phase 1)
├── <...>_concat.ms        # Concatenated MS (Phase 2)
├── I/deep/                # Stokes I deep images (+pbcorr, +dewarped)
├── I/10min/               # Stokes I 10-min interval images
├── V/deep/                # Stokes V deep images
├── V/10min/               # Stokes V 10-min interval images
├── snapshots/             # Pilot snapshot images + QA
├── snapshots_clean/       # CLEANed Stokes-I snapshots (optional)
├── QA/                    # Hot-baseline plots, flux check CSV+plot
├── samples/<...>/<...>/   # Target photometry cutouts + CSVs
├── detections/            # Transient cutouts, solar system detections
├── Dewarp_Diagnostics/    # Warp screens, quiver plots
└── logs/                  # CASA log, subprocess logs

Lustre (shared, archived products):
/lustre/pipeline/images/<...>/<...>/<...>/<...>/
└── (same structure as above, minus intermediate MS files)

Lustre (Phase 3 wideband products):
/lustre/pipeline/images/<...>/<...>/<...>/Wideband/
├── Wideband_Red_I_deep_*.fits     # Red-band co-adds (18–41 MHz)
├── Wideband_Green_I_deep_*.fits   # Green-band co-adds (41–64 MHz)
├── Wideband_Blue_I_deep_*.fits    # Blue-band co-adds (64–85 MHz)
├── Wideband_*_3color.png          # 3-colour PNG composites
└── thermal_noise.csv              # Per-subband Stokes V RMS

Lustre (centralized cross-run aggregation):
/lustre/pipeline/images/samples/<...>/<...>/<...>/
/lustre/pipeline/images/detections/transients/{I,V}/<...>/<...>/
/lustre/pipeline/images/detections/SolarSystem/<...>/<...>/
```
---

## Imaging Steps

The pipeline produces 7 image products per subband-hour:

| # | Stokes | Category | Suffix | Key Features |
|---|--------|----------|--------|--------------|
| 1 | I | deep | `I-Deep-Taper-Robust-0.75` | Multiscale, inner taper, robust −0.75 |
| 2 | I | deep | `I-Deep-Taper-Robust-0` | Multiscale, inner taper, robust 0 |
| 3 | I | deep | `I-Deep-NoTaper-Robust-0.75` | Multiscale, no taper, robust −0.75 |
| 4 | I | deep | `I-Deep-NoTaper-Robust-0` | Multiscale, no taper, robust 0 |
| 5 | I | 10min | `I-Taper-10min` | Multiscale, taper, 6 intervals |
| 6 | V | deep | `V-Taper-Deep` | Dirty image, taper |
| 7 | V | 10min | `V-Taper-10min` | Dirty image, taper, 6 intervals |

All images are 4096×4096 at 0.03125° scale with primary beam correction applied.

With `--clean_reduced_pixels`, clean snapshots use frequency-dependent resolution (the FoV is preserved at ~128° by scaling both pixel count and pixel scale):

| Tier | Subbands | Size | Scale (deg/px) |
|------|----------|------|----------------|
| Lower | 18–36 MHz | 1024×1024 | 0.125 |
| Middle | 41–59 MHz | 2048×2048 | 0.0625 |
| Upper | 64–82 MHz | 4096×4096 | 0.03125 |
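As a quick check, all three tiers cover the same field of view: 1024 × 0.125° = 2048 × 0.0625° = 4096 × 0.03125° = 128° on a side.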
---

## External Dependencies

These must be available on the calim worker nodes:

| Tool | Purpose | Default Path |
|------|---------|--------------|
| **WSClean** | Imaging | `/opt/bin/wsclean` (or `$WSCLEAN_BIN`) |
| **AOFlagger** | RFI flagging | `/opt/bin/aoflagger` |
| **chgcentre** | Phase centre rotation | `/opt/bin/chgcentre` |
| **TTCal.jl** | Source peeling | Via conda envs `julia060` / `ttcal_dev` |
| **mnc_python** | Antenna health queries | In `development` conda env |
| **extractor_pb_75** | Primary beam model | Embedded in `orca/transform/extractor_pb_75.py` (beam H5 at `/lustre/gh/calibration/pipeline/reference/beams/OVRO-LWA_MROsoil_updatedheight.h5`) |
| **RabbitMQ** | Celery broker | `rabbitmq.calim.mcs.pvt:5672` |
| **Redis** | Celery result backend | `10.41.0.85:6379` |

---

## How It Connects

```
pipeline/subband_celery.py                       # User runs this
│   Flags: --targets, --catalog, --clean_snapshots, --skip_science, --remap SUBBAND=NODE
│
├── orca.resources.subband_config                # Reads NODE_SUBBAND_MAP, queue routing
├── orca.transform.subband_processing.find_archive_files_for_subband()
│
└── orca.tasks.subband_tasks.submit_subband_pipeline()
    │
    ├── chord([prepare_one_ms_task.s(...) × N])  ← Phase 1 (parallel)
    │   │
    │   ├── subband_processing.copy_ms_to_nvme()
    │   ├── subband_processing.flag_bad_antennas()
    │   │   └── subprocess → orca/utils/mnc_antennas.py
    │   ├── subband_processing.apply_calibration()
    │   └── orca.wrapper.ttcal.zest_with_ttcal()  (sky + RFI)
    │
    └── process_subband_task.s(...)               ← Phase 2 (callback)
        │
        ├── subband_processing.concatenate_ms()
        ├── subband_processing.fix_field_id()
        ├── orca.wrapper.change_phase_centre.change_phase_center()
        ├── AOFlagger (subprocess)
        ├── WSClean pilot snapshots (dirty)
        ├── subband_processing.analyze_snapshot_quality()
        ├── subband_processing.flag_bad_integrations()
        ├── hot_baselines.run_diagnostics()  (optional)
        ├── WSClean clean I snapshots  (optional, --clean_snapshots)
        ├── WSClean science imaging ×7 (subprocess)
        ├── pb_correction.apply_pb_correction()
        ├── write provenance.json
        │
        │   --- Science Phases (all on NVMe) ---
        ├── A.  ionospheric_dewarping (VLSSr cross-match)
        ├── B.  cutout.process_target()  (--targets)
        ├── B2. solar_system_cutout.process_solar_system()
        ├── C.  transient_search.run_test()  (--catalog)
        ├── D.  flux_check_cutout.run_flux_check()
        │
        └── subband_processing.archive_results()
            ├── per-run archive to Lustre
            ├── centralized samples/ tree
            └── centralized detections/ tree

Phase 3 (after all subbands archived):
post_process_science.run_post_processing(run_dir)
    │
    ├── flux_check_cutout.run_flux_check()  (wideband)
    ├── run_wideband_stacking()
    │   ├── get_inner_rms() on Stokes V images
    │   ├── _stack_images() per colour band (Red/Green/Blue)
    │   ├── _make_3color_png() composites
    │   └── transient_search.run_test() on wideband images
    ├── solar_system_cutout.process_wideband_solar_system()
    ├── gather_detections()  (CSV collation)
    └── send_email_report()  (SMTP + attachments)
```

---

## Timing Instrumentation

Every step logs `[TIMER] <step>: <elapsed>s`. Extract with:

```bash
grep '\[TIMER\]' /path/to/worker.log
```

### Phase 1 tags (per MS)

| Tag | Step |
|-----|------|
| `copy_to_nvme` | rsync MS to NVMe |
| `flag_bad_antennas` | MNC antenna flagging |
| `apply_calibration` | bandpass + XY table |
| `peel_sky` | TTCal sky model |
| `peel_rfi` | TTCal RFI model |
| `phase1_total` | Total per-MS wall time |

### Phase 2 tags (per subband)

| Tag | Step |
|-----|------|
| `concatenation` | virtualconcat / concat |
| `fix_field_id` | FIELD table fix |
| `chgcentre` | Phase centre rotation |
| `aoflagger` | Post-concat flagging |
| `pilot_snapshots_qa` | Snapshot imaging + QA |
| `hot_baselines` | Hot baseline removal |
| `clean_snapshots` | CLEANed Stokes-I snapshots (optional) |
| `imaging_<step>` | Each of the 7 imaging steps |
| `imaging_all` | All imaging combined |
| `science_dewarping` | Ionospheric dewarping |
| `science_target_photometry` | Target cutouts |
| `science_solar_system` | Solar system photometry |
| `science_transient_search` | Transient detection |
| `science_flux_check` | Flux scale validation |
| `archive_to_lustre` | Copy to Lustre |
| `phase2_total` | Total Phase 2 wall time |

### Phase 3 tags (wideband, post-run)

| Tag | Step |
|-----|------|
| `wideband_noise_analysis` | Stokes V inner-RMS measurement across subbands |
| `wideband_stacking` | Inverse-variance co-add per colour band |
| `wideband_3color_png` | 3-colour composite generation |
| `wideband_transient_search` | Transient search on wideband images |
| `wideband_solar_system` | Solar system photometry on wideband images |
| `gather_detections` | Collate target/transient/solar system CSVs |
| `send_email_report` | Email summary report |
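To turn these lines into a per-step profile, one simple approach is to sum the elapsed time per tag. A minimal sketch, assuming the `[TIMER] <step>: <elapsed>s` format shown above:

```python
import re
from collections import defaultdict

# Sum wall time per [TIMER] tag across a worker log, largest first.
totals = defaultdict(float)
with open("worker.log") as log:
    for line in log:
        match = re.search(r"\[TIMER\] (\S+): ([\d.]+)s", line)
        if match:
            totals[match.group(1)] += float(match.group(2))

for tag, seconds in sorted(totals.items(), key=lambda item: -item[1]):
    print(f"{seconds:10.1f}s  {tag}")
```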
---

## CLI Examples

Run 55 MHz on calim08 (dirty snapshots):

```bash
python pipeline/subband_celery.py \
  --range 14-15 --date 2024-12-18 \
  --bp_table /lustre/gh/bandpass_tables/2024-12-18/calibration_2024-12-18_01h.B.flagged \
  --xy_table /lustre/gh/polcal/xyphase_delay_pos_3.8643ns.Xf \
  --subbands 55MHz \
  --remap 55MHz=calim08 \
  --peel_sky --peel_rfi --hot_baselines
```

Same, with additional CLEANed Stokes-I snapshots (`snapshots_clean/`):

```bash
python pipeline/subband_celery.py \
  --range 14-15 --date 2024-12-18 \
  --bp_table /lustre/gh/bandpass_tables/2024-12-18/calibration_2024-12-18_01h.B.flagged \
  --xy_table /lustre/gh/polcal/xyphase_delay_pos_3.8643ns.Xf \
  --subbands 55MHz \
  --remap 55MHz=calim08 \
  --peel_sky --peel_rfi --hot_baselines \
  --clean_snapshots
```

All subbands with frequency-tiered clean snapshot resolution:

```bash
python pipeline/subband_celery.py \
  --range 01-02 --date 2024-12-26 \
  --bp_table /lustre/pipeline/calibration/results/2024-12-26/02h/successful/20251225_055451/tables/calibration_2024-12-26_02h.B.flagged \
  --xy_table /lustre/gh/polcal/xyphase_delay_pos_3.8643ns.Xf \
  --subbands 18MHz 23MHz 27MHz 32MHz 36MHz 41MHz 46MHz 50MHz 55MHz 59MHz 64MHz 69MHz 73MHz 78MHz 82MHz \
  --peel_sky --peel_rfi --hot_baselines \
  --skip_science --cleanup_nvme --compress_snapshots \
  --clean_snapshots --clean_reduced_pixels \
  --dynamic --exclude_nodes calim02 calim08 calim10 \
  --dynamic_queue_label Dec26_all
```

Full observation with science extraction:

```bash
python pipeline/subband_celery.py \
  --range 13-19 --date 2025-06-15 \
  --bp_table /lustre/gh/calibration/pipeline/bandpass/latest.bandpass \
  --xy_table /lustre/gh/calibration/pipeline/xy/latest.X \
  --peel_sky --peel_rfi --hot_baselines \
  --targets /lustre/gh/targets/exoplanets.csv /lustre/gh/targets/pulsars.csv \
  --catalog /lustre/gh/catalogs/bdsf_73MHz.csv \
  --clean_snapshots
```

Add `--dry_run` to any command to preview without submitting.

Dynamic mode — 3-hour test on 2 nodes:

```bash
python pipeline/subband_celery.py \
  --range 03-06 --date 2024-12-21 \
  --bp_table /lustre/pipeline/calibration/results/2024-12-21/02h/successful/20251224_131951/tables/calibration_2024-12-21_02h.B.flagged \
  --xy_table /lustre/gh/polcal/xyphase_delay_pos_3.8643ns.Xf \
  --subbands 73MHz \
  --peel_sky --peel_rfi --hot_baselines \
  --skip_science --cleanup_nvme --compress_snapshots \
  --dynamic --nodes calim01 calim03
```

Dynamic mode — full observation across all active nodes:

```bash
python pipeline/subband_celery.py \
  --range 00-24 --date 2024-12-21 \
  --bp_table /lustre/pipeline/calibration/results/2024-12-21/02h/successful/20251224_131951/tables/calibration_2024-12-21_02h.B.flagged \
  --xy_table /lustre/gh/polcal/xyphase_delay_pos_3.8643ns.Xf \
  --peel_sky --peel_rfi --hot_baselines \
  --cleanup_nvme --compress_snapshots \
  --dynamic
```