"""Celery tasks for subband processing on NVMe-local calim servers.
Architecture
------------
The processing is split into two phases so that Celery handles the
per-MS parallelism in Phase 1, while Phase 2 runs sequentially on the
same node after all individual MS files are ready.
Phase 1 — per-MS (embarrassingly parallel, one Celery task per MS):
copy to NVMe → flag bad antennas → apply calibration → peel (sky + RFI)
Phase 2 — per-subband (sequential, runs after all Phase 1 tasks finish):
concatenate → fix field ID → chgcentre → AOFlagger → pilot snapshots →
snapshot QA → hot baseline removal → science imaging → archive to Lustre
Both phases are routed to the **same node-specific queue** (e.g. ``calim08``)
so that all I/O stays on the node's local NVMe.
Usage (from a submission script or notebook)::

    from celery import chord
    from orca.tasks.subband_tasks import prepare_one_ms_task, process_subband_task
    from orca.resources.subband_config import get_queue_for_subband

    queue = get_queue_for_subband('73MHz')

    # Phase 1: parallel per-MS
    header_tasks = [
        prepare_one_ms_task.s(
            src_ms=ms, nvme_work_dir=work_dir,
            bp_table=bp, xy_table=xy,
            peel_sky=True, peel_rfi=True,
        ) for ms in ms_files
    ]

    # Phase 2: runs after all Phase 1 tasks complete; receives list of MS paths
    pipeline = chord(header_tasks)(
        process_subband_task.s(
            work_dir=work_dir, subband='73MHz',
            lst_label='14h', obs_date='2025-06-15',
            run_label='Run_20250615',
        )
    )
"""
import os
import glob
import shutil
import json
import socket
import logging
import subprocess
import time
import traceback
# WSClean (linked against OpenBLAS) refuses to start if OpenBLAS multi-
# threading is enabled. Setting this early ensures every subprocess inherits it.
os.environ.setdefault('OPENBLAS_NUM_THREADS', '1')
from typing import List, Optional
import numpy as np
from celery import chord
from casacore.tables import table
from astropy.io import fits
# Matplotlib is optional — it is only needed for movie generation on the
# headless calim workers, hence the non-interactive Agg backend.
try:
    import matplotlib
    matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    import matplotlib.animation as animation
except ImportError:
    # Movie generation is skipped when matplotlib is unavailable;
    # _generate_local_movies() checks ``animation is None``.
    matplotlib = None
    plt = None
    animation = None
from orca.celery import app
from orca.wrapper.ttcal import zest_with_ttcal
from orca.wrapper.change_phase_centre import change_phase_center
from orca.transform.subband_processing import (
redirect_casa_log,
copy_ms_to_nvme,
apply_calibration,
flag_bad_antennas,
concatenate_ms,
fix_field_id,
analyze_snapshot_quality,
flag_bad_integrations,
add_timestamps_to_images,
plot_snapshot_diagnostics,
archive_results,
run_subprocess,
apply_pb_correction_to_images,
find_deep_image,
extract_sources_to_df,
)
from orca.configmanager import queue_config
from orca.resources.subband_config import (
PEELING_PARAMS,
AOFLAGGER_STRATEGY,
SNAPSHOT_PARAMS,
SNAPSHOT_CLEAN_I_PARAMS,
IMAGING_STEPS,
get_pixel_size,
get_pixel_scale,
NVME_BASE_DIR,
LUSTRE_ARCHIVE_DIR,
VLSSR_CATALOG,
get_queue_for_subband,
get_image_resources,
)
# Module-level logger; handlers are configured by the Celery worker setup.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Dynamic dispatch — Redis-backed work queue
# ---------------------------------------------------------------------------
# Redis key template for the per-run dynamic work queue: a Redis list of
# JSON-encoded kwargs dicts for ``submit_subband_pipeline``.
DYNAMIC_WORK_QUEUE = "pipeline:dynamic:{run_label}"
def _dynamic_queue_length(run_label: str) -> int:
    """Return the number of queued work units for a dynamic run label."""
    import redis as _redis

    client = _redis.Redis.from_url(queue_config.result_backend_uri)
    list_key = DYNAMIC_WORK_QUEUE.format(run_label=run_label)
    return int(client.llen(list_key))
def _push_work_units(run_label: str, work_units: List[dict]) -> int:
    """Push work units to a Redis list for dynamic dispatch.

    Each work unit is a complete kwargs dict for
    :func:`submit_subband_pipeline`.

    Returns:
        Number of work units pushed.
    """
    import redis as _redis

    client = _redis.Redis.from_url(queue_config.result_backend_uri)
    key = DYNAMIC_WORK_QUEUE.format(run_label=run_label)
    for unit in work_units:
        client.rpush(key, json.dumps(unit))
    client.expire(key, 86400 * 7)  # 7-day TTL
    logger.info(f"Pushed {len(work_units)} work units to Redis key {key}")
    return len(work_units)
def _pop_and_submit(run_label: str, queue: str):
    """Pop the next work unit from Redis and submit to *queue*.

    Called during initial seeding (from the submission script) and from
    :func:`_trigger_next_and_cleanup` on the worker after each hour
    completes.

    Args:
        run_label: Dynamic run label keying the Redis work queue.
        queue: Celery queue name (node-specific, e.g. ``'calim08'``).

    Returns:
        Celery ``AsyncResult`` for the submitted chord, or *None*
        if the queue is empty.
    """
    import redis as _redis

    r = _redis.Redis.from_url(queue_config.result_backend_uri)
    key = DYNAMIC_WORK_QUEUE.format(run_label=run_label)
    raw = r.lpop(key)
    if not raw:
        # Fixed: the original issued an extra (unused) LLEN round-trip here.
        logger.info(
            f"Dynamic dispatch: queue empty for {run_label} — node idle"
        )
        return None
    work = json.loads(raw)
    remaining = r.llen(key)
    logger.info(
        f"Dynamic dispatch: popped {work.get('subband')} "
        f"{work.get('lst_label')} → queue {queue} "
        f"({remaining} remaining in queue)"
    )
    # Route to the specified node and propagate dynamic mode
    work['queue_override'] = queue
    work['dynamic_run_label'] = run_label
    return submit_subband_pipeline(remaining_hours=None, **work)
@app.task(
    bind=True,
    name='orca.tasks.subband_tasks.on_chord_error',
    acks_late=True,
)
def on_chord_error(
    self,
    task_id=None,
    exc=None,
    traceback_str=None,
    dynamic_run_label: Optional[str] = None,
    work_dir: Optional[str] = None,
    cleanup_nvme: bool = False,
    subband: str = '',
    lst_label: str = '',
):
    """Error callback for Phase 1 task failures.

    Attached via ``link_error`` on each individual Phase 1 task. When all
    retries are exhausted for *any* MS in the chord, Celery fires this
    callback. The chord itself may still complete (other MS tasks succeed)
    and Phase 2 runs normally. But if enough tasks fail that the chord
    never fires Phase 2, this ensures the dynamic dispatch chain continues.

    A Redis ``SETNX`` lock (60 min TTL) prevents duplicate pops when
    multiple Phase 1 tasks in the same chord all fail.
    """
    import redis as _redis

    node = socket.gethostname()
    logger.error(
        f"Phase 1 task {task_id} FAILED for {subband} {lst_label} on "
        f"{node}: {exc}"
    )
    if not dynamic_run_label:
        # Static-chaining mode: nothing to dispatch from here.
        return
    # Dedup: only one error callback per (subband, lst_label) should pop.
    # Use a Redis lock with 60-min TTL so it auto-expires.
    try:
        r = _redis.Redis.from_url(queue_config.result_backend_uri)
        lock_key = f"pipeline:chord_error_lock:{dynamic_run_label}:{subband}:{lst_label}"
        acquired = r.set(lock_key, "1", nx=True, ex=3600)
        if not acquired:
            logger.info(
                f"Chord error handler already fired for {subband} {lst_label} "
                f"— skipping duplicate pop"
            )
            return
    except Exception as e:
        # Best effort: a duplicate pop is preferable to an idle node.
        logger.warning(f"Could not acquire dedup lock: {e} — popping anyway")
    # Wait briefly: the chord may still succeed if other MS tasks complete.
    # Phase 2's finally block would then handle dispatch normally.
    # 90s is enough for Phase 2 to start if it's going to.
    # (Fixed: the original re-imported ``time`` as ``_time`` although the
    # module already imports it at the top.)
    time.sleep(90)
    # Check if Phase 2 already ran by seeing if the work_dir was cleaned
    # or if archive products exist. If so, dispatch already happened.
    archive_marker = os.path.join(
        LUSTRE_ARCHIVE_DIR, lst_label, '*', '*', subband,
    )
    if work_dir and not os.path.isdir(work_dir) and glob.glob(archive_marker):
        logger.info(
            f"Phase 2 appears to have run for {subband} {lst_label} "
            f"— skipping error-handler dispatch"
        )
        return
    logger.info(
        f"Phase 2 did not fire for {subband} {lst_label} — "
        f"error handler triggering next work unit"
    )
    _trigger_next_and_cleanup(
        remaining_hours=None,
        work_dir=work_dir or '',
        cleanup_nvme=cleanup_nvme,
        subband=subband,
        lst_label=lst_label,
        dynamic_run_label=dynamic_run_label,
    )
def _trigger_next_and_cleanup(
    remaining_hours, work_dir, cleanup_nvme, subband, lst_label,
    dynamic_run_label=None,
):
    """Trigger the next hour in the sequential chain and optionally clean NVMe.

    Called from the ``finally`` block of ``process_subband_task`` so that the
    chain **always** continues even when the current hour fails. Also called
    from :func:`on_chord_error` when Phase 1 fails entirely (all retries
    exhausted), ensuring the node picks up the next work unit instead of
    going idle.

    In **dynamic mode** (``dynamic_run_label`` is set), the next work unit is
    popped from the Redis work queue instead of using ``remaining_hours``.
    This allows any subband/hour to be dispatched to whichever node finishes
    first.
    """
    if dynamic_run_label:
        # Dynamic mode: pop next work unit from Redis
        queue = socket.gethostname().replace('lwa', '')
        try:
            _pop_and_submit(dynamic_run_label, queue)
        except Exception as err:
            logger.error(f"Dynamic dispatch failed: {err}")
            traceback.print_exc()
    elif remaining_hours:
        # Static chaining mode
        next_hour, *later = remaining_hours
        next_label = next_hour.get('lst_label', '?')
        logger.info(
            f"Chain → submitting next hour {next_label} for {subband} "
            f"({len(remaining_hours) - 1} hours remaining after)"
        )
        try:
            submit_subband_pipeline(remaining_hours=later or None, **next_hour)
        except Exception as err:
            logger.error(f"Failed to trigger next hour {next_label}: {err}")
            traceback.print_exc()
    # NVMe cleanup (best-effort)
    if cleanup_nvme and os.path.isdir(work_dir):
        try:
            shutil.rmtree(work_dir)
            logger.info(f"Cleaned up NVMe work_dir: {work_dir}")
        except Exception:
            pass
def _generate_local_movies(work_dir: str, freq_str: str) -> None:
    """Generate raw + filtered MP4 movies from pilot snapshot FITS images.

    Produces up to three movies inside ``<work_dir>/Movies/``:

    * ``<freq>_I_Raw.mp4`` — Stokes I time-lapse (grayscale)
    * ``<freq>_V_Raw.mp4`` — Stokes V time-lapse (grayscale)
    * ``<freq>_I_Filtered.mp4`` — Stokes I median-subtracted (highlights transients)

    Uses ``matplotlib.animation`` + ``ffmpeg``. If either is unavailable the
    step is silently skipped.

    Args:
        work_dir: NVMe working directory (must contain a ``snapshots/`` subfolder).
        freq_str: Frequency label, e.g. ``'73MHz'``.
    """
    # `animation` is None when matplotlib failed to import at module load.
    if animation is None:
        logger.warning("matplotlib.animation not available — skipping movie generation.")
        return
    logger.info("Generating movies from pilot snapshots...")
    movie_dir = os.path.join(work_dir, "Movies")
    os.makedirs(movie_dir, exist_ok=True)
    snap_dir = os.path.join(work_dir, "snapshots")
    for pol in ['I', 'V']:
        files = sorted(glob.glob(os.path.join(snap_dir, f"*{pol}-image*.fits")))
        if len(files) < 10:
            # Too few frames for a meaningful time-lapse.
            logger.info(f"Only {len(files)} {pol} snapshot frames — skipping movie.")
            continue
        frames = []
        # Cap at 150 frames to bound memory use and encode time.
        for f in files[:150]:
            try:
                frames.append(fits.getdata(f).squeeze())
            except Exception:
                # Unreadable/corrupt FITS frames are simply dropped.
                pass
        if not frames:
            continue
        try:
            cube = np.array(frames)
            mid = len(cube) // 2
            # 1. Raw movie (both I and V) — grayscale
            if pol == 'V':
                # Stokes V is noise-like: symmetric ±5σ stretch around zero,
                # estimated from the middle frame.
                rms = np.nanstd(cube[mid])
                vmin, vmax = -5 * rms, 5 * rms
            else:
                # Stokes I: percentile stretch of the middle frame.
                vmin, vmax = np.nanpercentile(cube[mid], [1, 99.5])
            fig = plt.figure(figsize=(8, 8))
            ax = fig.add_axes([0, 0, 1, 1])
            ax.axis('off')
            ims = [[ax.imshow(fr, animated=True, origin='lower', cmap='gray',
                              vmin=vmin, vmax=vmax)] for fr in cube]
            ani = animation.ArtistAnimation(fig, ims, interval=100, blit=True)
            raw_path = os.path.join(movie_dir, f"{freq_str}_{pol}_Raw.mp4")
            # Requires the ffmpeg writer; raises if ffmpeg is missing (caught below).
            ani.save(raw_path, writer='ffmpeg', dpi=150)
            plt.close(fig)
            logger.info(f"Saved {raw_path}")
            # 2. Filtered movie (Stokes I only — median subtraction)
            if pol == 'I':
                med = np.median(cube, axis=0)
                diff = cube - med
                rms = np.std(diff)
                fig = plt.figure(figsize=(8, 8))
                ax = fig.add_axes([0, 0, 1, 1])
                ax.axis('off')
                ims = [[ax.imshow(fr, animated=True, origin='lower', cmap='gray',
                                  vmin=-3 * rms, vmax=5 * rms)] for fr in diff]
                ani = animation.ArtistAnimation(fig, ims, interval=100, blit=True)
                filt_path = os.path.join(movie_dir, f"{freq_str}_{pol}_Filtered.mp4")
                ani.save(filt_path, writer='ffmpeg', dpi=150)
                plt.close(fig)
                logger.info(f"Saved {filt_path}")
        except Exception as e:
            # Movies are a convenience product; never fail the pipeline here.
            logger.error(f"Movie generation failed for {pol}: {e}")
            traceback.print_exc()
def _cleanup_psf_files(work_dir: str) -> int:
"""Delete PSF FITS files from snapshots/, keeping only the first one.
WSClean produces a ``*-psf.fits`` for every snapshot interval. These are
large and rarely needed after QA — keep just the first as a reference and
remove the rest to save space before archiving.
Returns:
Number of PSF files removed.
"""
snap_dir = os.path.join(work_dir, "snapshots")
psf_files = sorted(glob.glob(os.path.join(snap_dir, "*-psf.fits")))
if len(psf_files) <= 1:
return 0
removed = 0
for psf in psf_files[1:]:
try:
os.remove(psf)
removed += 1
except OSError:
pass
logger.info(
f"PSF cleanup: kept {os.path.basename(psf_files[0])}, "
f"removed {removed}/{len(psf_files) - 1}"
)
return removed
def _compress_snapshot_fits(work_dir: str) -> int:
    """Compress all FITS files in ``snapshots/`` using fpack.

    Each ``*.fits`` file is compressed to ``*.fits.fz`` by fpack, then
    renamed to ``*.fits.fs``. The original uncompressed FITS is deleted.
    Only snapshot images are compressed — deep images in ``I/`` and ``V/``
    are left as-is.

    Looks for ``fpack`` in this order:

    1. ``$FPACK_BIN`` environment variable
    2. Direct ``fpack`` on ``$PATH``
    3. ``conda run -n development fpack`` (fallback)

    Returns:
        Number of files successfully compressed.
    """
    snapshot_folder = os.path.join(work_dir, "snapshots")
    return _compress_snapshot_fits_dir(snapshot_folder)
def _compress_snapshot_fits_dir(snap_dir: str) -> int:
    """Compress all ``*.fits`` in *snap_dir* via fpack → ``.fits.fs``.

    Returns:
        Number of files successfully compressed.
    """
    # Resolve the fpack invocation: $FPACK_BIN → PATH → conda fallback.
    env_bin = os.environ.get('FPACK_BIN')
    path_bin = shutil.which('fpack')
    conda = shutil.which('conda')
    if env_bin and os.path.isfile(env_bin):
        fpack_cmd = [env_bin]
    elif path_bin:
        fpack_cmd = [path_bin]
    elif conda:
        # Fall back to running fpack inside the 'development' conda env
        fpack_cmd = [conda, 'run', '-n', 'development', 'fpack']
        logger.info("Using fpack via 'conda run -n development'")
    else:
        logger.warning("fpack not found (PATH, $FPACK_BIN, or conda development env) — skipping compression.")
        return 0
    targets = sorted(glob.glob(os.path.join(snap_dir, "*.fits")))
    if not targets:
        return 0
    compressed = 0
    for fpath in targets:
        fz_path = fpath + ".fz"
        fs_path = fpath + ".fs"
        try:
            subprocess.run(
                fpack_cmd + ["-v", fpath],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                check=True,
            )
            if not os.path.exists(fz_path):
                logger.warning(f"fpack did not produce {fz_path}")
                continue
            # Rename .fz → .fs and drop the uncompressed original.
            os.rename(fz_path, fs_path)
            os.remove(fpath)
            compressed += 1
        except subprocess.CalledProcessError as e:
            logger.error(f"fpack failed on {os.path.basename(fpath)}: {e}")
        except OSError as e:
            logger.error(f"Compress file error {os.path.basename(fpath)}: {e}")
    logger.info(f"Compressed {compressed}/{len(targets)} snapshot FITS → .fs")
    return compressed
def _patch_size_args(args: list, npix: int) -> list:
"""Return a copy of *args* with ``-size W H`` replaced by *npix npix*."""
args = list(args) # don't mutate the config
try:
idx = args.index('-size')
args[idx + 1] = str(npix)
args[idx + 2] = str(npix)
except (ValueError, IndexError):
pass
return args
def _patch_scale_arg(args: list, scale: float) -> list:
"""Return a copy of *args* with ``-scale V`` replaced by *scale*."""
args = list(args)
try:
idx = args.index('-scale')
args[idx + 1] = str(scale)
except (ValueError, IndexError):
pass
return args
# ============================================================================
# PHASE 1 — Per-MS task (runs in parallel via Celery)
# ============================================================================
@app.task(
    bind=True,
    name='orca.tasks.subband_tasks.prepare_one_ms_task',
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_kwargs={'max_retries': 2},
    acks_late=True,
)
def prepare_one_ms_task(
    self,
    src_ms: str,
    nvme_work_dir: str,
    bp_table: str,
    xy_table: str,
    peel_sky: bool = False,
    peel_rfi: bool = False,
) -> str:
    """Copy one MS to NVMe, flag, calibrate, and optionally peel.

    This is the Phase 1 workhorse. Each MS file gets its own Celery task
    so they run in parallel across the worker's concurrency slots.

    Args:
        src_ms: Source MS path on Lustre.
        nvme_work_dir: NVMe working directory (same for all MS in a subband).
        bp_table: Bandpass calibration table path.
        xy_table: XY-phase calibration table path.
        peel_sky: Run TTCal zest with sky model.
        peel_rfi: Run TTCal zest with RFI model.

    Returns:
        Path to the processed MS on NVMe.

    Raises:
        RuntimeError: If calibration fails; the partial NVMe copy is
            removed first so a Celery retry starts clean.
    """
    node = socket.gethostname()
    _p1_t0 = time.time()
    logger.info(
        f"[{self.request.id}] Phase 1 START on {node}: {os.path.basename(src_ms)}"
    )
    os.makedirs(nvme_work_dir, exist_ok=True)
    redirect_casa_log(nvme_work_dir)

    # 1. Copy to NVMe
    _t = time.time()
    nvme_ms = copy_ms_to_nvme(src_ms, nvme_work_dir)
    logger.info(f"[TIMER] copy_to_nvme: {time.time() - _t:.1f}s")

    # 2. Flag bad antennas (via mnc_python)
    _t = time.time()
    flag_bad_antennas(nvme_ms)
    logger.info(f"[TIMER] flag_bad_antennas: {time.time() - _t:.1f}s")

    # 3. Apply calibration (bandpass + XY-phase)
    _t = time.time()
    cal_ok = apply_calibration(nvme_ms, bp_table, xy_table)
    logger.info(f"[TIMER] apply_calibration: {time.time() - _t:.1f}s")
    if not cal_ok:
        logger.error(f"Calibration failed for {os.path.basename(nvme_ms)}; removing")
        shutil.rmtree(nvme_ms, ignore_errors=True)
        raise RuntimeError(f"Calibration failed for {src_ms}")

    # 4. Peeling
    if peel_sky:
        _t = time.time()
        logger.info(f"Peeling sky model on {os.path.basename(nvme_ms)}")
        zest_with_ttcal(
            ms=nvme_ms,
            sources=PEELING_PARAMS['sky_model'],
            beam=PEELING_PARAMS['beam'],
            minuvw=PEELING_PARAMS['minuvw'],
            maxiter=PEELING_PARAMS['maxiter'],
            tolerance=PEELING_PARAMS['tolerance'],
        )
        logger.info(f"[TIMER] peel_sky: {time.time() - _t:.1f}s")
    if peel_rfi:
        _t = time.time()
        logger.info(f"Peeling RFI model on {os.path.basename(nvme_ms)}")
        # RFI peeling may use a different conda env (ttcal_dev) than sky (julia060).
        # The orca wrapper zest_with_ttcal uses julia060 hardcoded.
        # If the envs are different, use shell-based invocation like the
        # Slurm pipeline does.
        rfi_env = PEELING_PARAMS.get('rfi_env', 'julia060')
        sky_env = PEELING_PARAMS.get('sky_env', 'julia060')
        if rfi_env != sky_env:
            # Shell-based invocation matching process_subband.py.
            # NOTE(review): builds a shell string from config values — only
            # safe because PEELING_PARAMS is trusted pipeline config, never
            # user input.
            peel_env = os.environ.copy()
            peel_env["OMP_NUM_THREADS"] = "8"
            cmd = (
                f"source ~/.bashrc && conda activate {rfi_env} && "
                f"ttcal.jl zest {nvme_ms} {PEELING_PARAMS['rfi_model']} "
                f"{PEELING_PARAMS['args']}"
            )
            # Fixed: the original re-imported subprocess here although it is
            # already imported at module level.
            subprocess.run(
                cmd, shell=True, check=True,
                executable='/bin/bash', env=peel_env,
            )
        else:
            zest_with_ttcal(
                ms=nvme_ms,
                sources=PEELING_PARAMS['rfi_model'],
                beam=PEELING_PARAMS['beam'],
                minuvw=PEELING_PARAMS['minuvw'],
                maxiter=PEELING_PARAMS['maxiter'],
                tolerance=PEELING_PARAMS['tolerance'],
            )
        logger.info(f"[TIMER] peel_rfi: {time.time() - _t:.1f}s")

    logger.info(
        f"[{self.request.id}] Phase 1 DONE: {os.path.basename(nvme_ms)}"
    )
    logger.info(f"[TIMER] phase1_total: {time.time() - _p1_t0:.1f}s ({os.path.basename(nvme_ms)})")
    return nvme_ms
# ============================================================================
# PHASE 2 — Per-subband task (runs after all Phase 1 tasks complete)
# ============================================================================
@app.task(
bind=True,
name='orca.tasks.subband_tasks.process_subband_task',
acks_late=True,
time_limit=28800, # 8 hours hard limit
soft_time_limit=27000, # 7h30m soft limit
)
[docs]
def process_subband_task(
self,
ms_paths: List[str],
work_dir: str,
subband: str,
lst_label: str,
obs_date: str,
run_label: str,
hot_baselines: bool = False,
skip_cleanup: bool = False,
cleanup_nvme: bool = False,
targets: Optional[List[str]] = None,
catalog: Optional[str] = None,
clean_snapshots: bool = False,
clean_reduced_pixels: bool = False,
reduced_pixels: bool = False,
skip_science: bool = False,
compress_snapshots: bool = False,
remaining_hours: Optional[List[dict]] = None,
dynamic_run_label: Optional[str] = None,
bp_table: Optional[str] = None,
xy_table: Optional[str] = None,
) -> str:
"""Phase 2: concatenate, image, run science, and archive one subband.
This task is used as the callback in a ``chord``: it receives the list
of NVMe MS paths returned by the Phase 1 tasks.
When *remaining_hours* is provided (by ``submit_subband_pipeline_chained``),
this task will submit the next hour's chord upon successful completion,
ensuring hours are processed **sequentially** on the same node.
When *dynamic_run_label* is set, the task operates in **dynamic dispatch**
mode: instead of chaining to the next hour of the same subband, it pops
the next (subband, hour) work unit from a Redis queue. This lets any
free node pick up whatever work is available.
Args:
ms_paths: NVMe paths returned by prepare_one_ms_task (via chord).
work_dir: NVMe working directory (same as Phase 1).
subband: Frequency label, e.g. '73MHz'.
lst_label: LST label, e.g. '14h'.
obs_date: Observation date string 'YYYY-MM-DD'.
run_label: Pipeline run label.
hot_baselines: Whether to run hot-baseline diagnostics.
skip_cleanup: If True, keep the concat MS on NVMe.
cleanup_nvme: If True, remove the entire NVMe work_dir after archiving.
targets: List of target-list file paths for photometry.
catalog: Path to BDSF catalog for transient search masking.
clean_snapshots: If True, produce CLEANed Stokes-I snapshots in
``snapshots_clean/`` in addition to the dirty pilots in
``snapshots/``. Always fpack-compressed.
clean_reduced_pixels: If True, scale clean snapshot pixel count
by subband frequency (1024/2048/4096). Only affects clean
snapshots, not dirty pilots or science imaging.
skip_science: If True, skip all science phases (dewarping, photometry,
transient search, flux check) after PB correction. Products
are still archived to Lustre.
compress_snapshots: If True, fpack-compress all snapshot FITS to .fs
and remove the originals. Deep images are not compressed.
remaining_hours: List of kwarg dicts for subsequent hours.
Each dict contains the arguments for ``submit_subband_pipeline``.
The first entry is submitted after this hour completes, with
the rest forwarded as its own ``remaining_hours``.
dynamic_run_label: If set, enables dynamic dispatch mode using
this run label as the Redis work-queue key.
Returns:
Path to the Lustre archive directory with final products.
"""
node = socket.gethostname()
_p2_t0 = time.time()
logger.info(
f"[{self.request.id}] Phase 2 START on {node}: "
f"{subband} ({len(ms_paths)} files)"
)
os.chdir(work_dir)
redirect_casa_log(work_dir)
# Filter out any Nones (from failed Phase 1 tasks that were retried and
# still returned nothing — shouldn't happen with autoretry, but be safe).
valid_ms = [p for p in ms_paths if p and os.path.isdir(p)]
# Check if a concat MS already exists from a previous (retried) attempt.
# On retry, individual MS files may have been cleaned up, but the concat
# MS survives — we can resume from it.
existing_concat = os.path.join(work_dir, f"{subband}_concat.ms")
have_concat = os.path.isdir(existing_concat)
if not valid_ms and not have_concat:
# No data at all — still trigger chain + cleanup before failing
logger.error(f"No valid MS files for {subband} in {lst_label}")
_trigger_next_and_cleanup(
remaining_hours, work_dir, cleanup_nvme, subband, lst_label,
dynamic_run_label=dynamic_run_label,
)
raise RuntimeError(f"No valid MS files for {subband}")
# Track whether processing succeeded for the summary log
_phase2_failed = False
archive_base = None
try:
for d in ['I/deep', 'V/deep', 'I/10min', 'V/10min', 'snapshots', 'QA',
'samples', 'detections', 'Dewarp_Diagnostics', 'Movies']:
os.makedirs(os.path.join(work_dir, d), exist_ok=True)
# ------------------------------------------------------------------
# 0. Write provenance metadata
# ------------------------------------------------------------------
try:
import orca as _orca
provenance = {
'pipeline_version': getattr(_orca, '__git_version__', 'unknown'),
'pipeline_branch': getattr(_orca, '__git_branch__', 'unknown'),
'package_version': getattr(_orca, '__version__', 'unknown'),
'task_id': self.request.id,
'worker_node': node,
'started_at': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
'subband': subband,
'obs_date': obs_date,
'lst_label': lst_label,
'run_label': run_label,
'dynamic_run_label': dynamic_run_label,
'bp_table': bp_table,
'xy_table': xy_table,
'n_ms_files': len(ms_paths),
'n_valid_ms': len([p for p in ms_paths if p and os.path.isdir(p)]),
'flags': {
'hot_baselines': hot_baselines,
'skip_cleanup': skip_cleanup,
'cleanup_nvme': cleanup_nvme,
'clean_snapshots': clean_snapshots,
'clean_reduced_pixels': clean_reduced_pixels,
'reduced_pixels': reduced_pixels,
'skip_science': skip_science,
'compress_snapshots': compress_snapshots,
},
'imaging': {
'pixel_size': get_pixel_size(subband) if reduced_pixels else 4096,
'clean_pixel_size': get_pixel_size(subband) if (clean_snapshots and clean_reduced_pixels) else (get_pixel_size(subband) if reduced_pixels else 4096),
'clean_pixel_scale': get_pixel_scale(subband) if (clean_snapshots and clean_reduced_pixels) else 0.03125,
'wsclean_j': get_image_resources(subband)[2],
'wsclean_bin': os.environ.get('WSCLEAN_BIN', '/opt/bin/wsclean'),
'snapshot_dirty': SNAPSHOT_PARAMS,
'snapshot_clean_i': {
'suffix': SNAPSHOT_CLEAN_I_PARAMS['suffix'],
'args': _patch_scale_arg(
_patch_size_args(
SNAPSHOT_CLEAN_I_PARAMS['args'],
get_pixel_size(subband) if clean_reduced_pixels else 4096,
),
get_pixel_scale(subband) if clean_reduced_pixels else 0.03125,
),
} if clean_snapshots else None,
'science_steps': [
{'suffix': s['suffix'], 'pol': s['pol'],
'category': s['category'], 'args': s['args']}
for s in IMAGING_STEPS
],
},
}
prov_path = os.path.join(work_dir, 'provenance.json')
with open(prov_path, 'w') as f:
json.dump(provenance, f, indent=2)
logger.info(f"Provenance written to {prov_path}")
except Exception as e:
logger.warning(f"Failed to write provenance.json: {e}")
# ------------------------------------------------------------------
# 1. Concatenation (skip if concat MS already exists from prior attempt)
# ------------------------------------------------------------------
_t = time.time()
if have_concat:
concat_ms = existing_concat
logger.info(f"Resuming from existing concat MS: {concat_ms}")
else:
logger.info("Concatenating MS files...")
concat_ms = concatenate_ms(valid_ms, work_dir, subband)
if not concat_ms:
raise RuntimeError("Concatenation failed")
# Clean up individual MS files (they are on NVMe, space is precious)
if not skip_cleanup:
for ms in valid_ms:
if os.path.exists(ms):
shutil.rmtree(ms)
logger.info(f"[TIMER] concatenation: {time.time() - _t:.1f}s")
# ------------------------------------------------------------------
# 2. Fix FIELD_ID
# ------------------------------------------------------------------
_t = time.time()
fix_field_id(concat_ms)
logger.info(f"[TIMER] fix_field_id: {time.time() - _t:.1f}s")
# ------------------------------------------------------------------
# 3. Change phase centre
# ------------------------------------------------------------------
_t = time.time()
hour_int = int(lst_label.replace('h', ''))
phase_center = f"{hour_int:02d}h30m00s 37d12m57.057s"
logger.info(f"Changing phase centre → {phase_center}")
change_phase_center(concat_ms, phase_center)
logger.info(f"[TIMER] chgcentre: {time.time() - _t:.1f}s")
# ------------------------------------------------------------------
# 4. AOFlagger
# ------------------------------------------------------------------
_t = time.time()
aoflagger_bin = os.environ.get('AOFLAGGER_BIN', '/opt/bin/aoflagger')
logger.info(f"Running AOFlagger with strategy {AOFLAGGER_STRATEGY}")
run_subprocess(
[aoflagger_bin, '-strategy', AOFLAGGER_STRATEGY, concat_ms],
"AOFlagger (Post-Concat)",
)
logger.info(f"[TIMER] aoflagger: {time.time() - _t:.1f}s")
# ------------------------------------------------------------------
# 5. Pilot snapshots + QA
# ------------------------------------------------------------------
_t = time.time()
try:
t = table(concat_ms, ack=False)
times = t.getcol("TIME")
n_ints = len(np.unique(times))
t.close()
except Exception:
n_ints = 357
pilot_name = f"{subband}-{SNAPSHOT_PARAMS['suffix']}"
pilot_path = os.path.join(work_dir, "snapshots", pilot_name)
wsclean_bin = os.environ.get('WSCLEAN_BIN', '/opt/bin/wsclean')
_, _, wsclean_j = get_image_resources(subband)
npix = get_pixel_size(subband) if reduced_pixels else 4096
logger.info(f"Pixel size for {subband}: {npix}x{npix} (reduced_pixels={reduced_pixels})")
cmd_pilot = (
[wsclean_bin]
+ ['-j', str(wsclean_j)]
+ _patch_size_args(SNAPSHOT_PARAMS['args'], npix)
+ ['-name', pilot_path, '-intervals-out', str(n_ints), concat_ms]
)
run_subprocess(cmd_pilot, "Pilot snapshot imaging")
add_timestamps_to_images(
os.path.join(work_dir, "snapshots"), pilot_name, concat_ms, n_ints,
)
pilot_v = sorted(glob.glob(
os.path.join(work_dir, "snapshots", f"{pilot_name}*-V-image*.fits")
))
bad_idx, stats = analyze_snapshot_quality(pilot_v)
plot_snapshot_diagnostics(stats, bad_idx, work_dir, subband)
if bad_idx:
flag_bad_integrations(concat_ms, bad_idx, n_ints)
logger.info(f"[TIMER] pilot_snapshots_qa: {time.time() - _t:.1f}s")
# ------------------------------------------------------------------
# 6. Hot baseline removal (optional)
# ------------------------------------------------------------------
if hot_baselines:
_t = time.time()
try:
_run_hot_baseline_diagnostics(concat_ms, work_dir)
except Exception as e:
logger.error(f"Hot baseline diagnostics failed: {e}")
traceback.print_exc()
logger.info(f"[TIMER] hot_baselines: {time.time() - _t:.1f}s")
# ------------------------------------------------------------------
# 6b. CLEANed Stokes-I snapshots (optional, in addition to dirty)
# ------------------------------------------------------------------
if clean_snapshots:
_t = time.time()
try:
clean_snap_dir = os.path.join(work_dir, "snapshots_clean")
os.makedirs(clean_snap_dir, exist_ok=True)
# Frequency-dependent pixel scaling for clean snapshots
npix_clean = get_pixel_size(subband) if clean_reduced_pixels else npix
scale_clean = get_pixel_scale(subband) if clean_reduced_pixels else 0.03125
logger.info(
f"Clean snapshot pixels for {subband}: {npix_clean}x{npix_clean}, "
f"scale={scale_clean} deg/px "
f"(clean_reduced_pixels={clean_reduced_pixels})"
)
clean_name = f"{subband}-{SNAPSHOT_CLEAN_I_PARAMS['suffix']}"
clean_path = os.path.join(clean_snap_dir, clean_name)
_clean_args = _patch_size_args(SNAPSHOT_CLEAN_I_PARAMS['args'], npix_clean)
_clean_args = _patch_scale_arg(_clean_args, scale_clean)
cmd_clean_snap = (
[wsclean_bin]
+ ['-j', str(wsclean_j)]
+ _clean_args
+ ['-name', clean_path,
'-intervals-out', str(n_ints), concat_ms]
)
run_subprocess(cmd_clean_snap, "Clean Stokes-I snapshot imaging")
add_timestamps_to_images(
clean_snap_dir, clean_name, concat_ms, n_ints,
)
# Remove PSF, model, and residual files (not needed after CLEAN)
for pattern in ['*-psf.fits', '*-model.fits', '*-residual.fits']:
for f in glob.glob(os.path.join(clean_snap_dir, pattern)):
try:
os.remove(f)
except OSError:
pass
# Always fpack-compress clean snapshots
_compress_snapshot_fits_dir(clean_snap_dir)
except Exception as e:
logger.error(f"Clean snapshot imaging failed: {e}")
traceback.print_exc()
logger.info(f"[TIMER] clean_snapshots: {time.time() - _t:.1f}s")
# ------------------------------------------------------------------
# 7. Science imaging + PB correction
# ------------------------------------------------------------------
_t_imaging_all = time.time()
logger.info(f"Starting Science Imaging for {subband}...")
logger.info(f"wsclean binary: {wsclean_bin}, thread limit: -j {wsclean_j}")
for step in IMAGING_STEPS:
_t_step = time.time()
target_dir = os.path.join(work_dir, step['pol'], step['category'])
base = f"{subband}-{step['suffix']}"
full_path = os.path.join(target_dir, base)
cmd = [wsclean_bin] + ['-j', str(wsclean_j)] + _patch_size_args(step['args'], npix) + ['-name', full_path]
if step.get('per_integration'):
n_out = n_ints
cmd += ['-intervals-out', str(n_ints)]
elif '-intervals-out' in step['args']:
idx = step['args'].index('-intervals-out')
n_out = int(step['args'][idx + 1])
else:
n_out = 1
cmd.append(concat_ms)
run_subprocess(cmd, f"Imaging {step['suffix']}")
add_timestamps_to_images(target_dir, base, concat_ms, n_out)
# Apply Primary Beam Correction to all images from this step
pb_count = apply_pb_correction_to_images(target_dir, base)
if pb_count > 0:
logger.info(f"PB corrected {pb_count} images for {step['suffix']}")
logger.info(f"[TIMER] imaging_{step['suffix']}: {time.time() - _t_step:.1f}s")
logger.info(f"[TIMER] imaging_all: {time.time() - _t_imaging_all:.1f}s")
# ------------------------------------------------------------------
# 7a. Movie generation from pilot snapshots
# ------------------------------------------------------------------
_t = time.time()
try:
_generate_local_movies(work_dir, subband)
except Exception as e:
logger.error(f"Movie generation failed: {e}")
traceback.print_exc()
logger.info(f"[TIMER] movie_generation: {time.time() - _t:.1f}s")
# ------------------------------------------------------------------
# 7b-pre. Lightweight image QA (runs ALWAYS, even with --skip_science)
# ------------------------------------------------------------------
try:
freq_mhz = float(subband.replace('MHz', ''))
except Exception:
freq_mhz = 50.0
# --- i. Per-subband noise RMS (Stokes V deep + Stokes I deep) ---
_t = time.time()
try:
from orca.transform.post_process_science import get_inner_rms
v_deep_dir = os.path.join(work_dir, "V", "deep")
i_deep_dir = os.path.join(work_dir, "I", "deep")
# Stokes V: raw image (not pbcorr)
v_candidates = sorted(glob.glob(
os.path.join(v_deep_dir, f"*V-Taper-Deep*image*.fits")))
v_candidates = [f for f in v_candidates
if "pbcorr" not in f and "dewarped" not in f]
v_rms = float(get_inner_rms(v_candidates[0])) if v_candidates else None
# Stokes I: pbcorr preferred, raw fallback
i_candidates = sorted(glob.glob(
os.path.join(i_deep_dir, f"*I-Deep-Taper-Robust-0.75*pbcorr*.fits")))
i_candidates = [f for f in i_candidates if "dewarped" not in f]
if not i_candidates:
i_candidates = sorted(glob.glob(
os.path.join(i_deep_dir, f"*I-Deep-Taper-Robust-0.75*image*.fits")))
i_candidates = [f for f in i_candidates
if "pbcorr" not in f and "dewarped" not in f]
i_rms = float(get_inner_rms(i_candidates[0])) if i_candidates else None
# Write CSV (append-friendly: one row per subband-hour)
import csv
from datetime import datetime as _dt
qa_csv = os.path.join(work_dir, "QA", "image_noise.csv")
write_header = not os.path.exists(qa_csv)
with open(qa_csv, "a", newline="") as fh:
writer = csv.writer(fh)
if write_header:
writer.writerow([
"subband", "freq_mhz", "lst_label",
"v_deep_rms", "i_deep_rms", "timestamp",
])
writer.writerow([
subband, freq_mhz, lst_label,
f"{v_rms:.6e}" if v_rms else "",
f"{i_rms:.6e}" if i_rms else "",
_dt.utcnow().isoformat(),
])
logger.info(
f"Image noise QA: V_rms={v_rms:.4e}, I_rms={i_rms:.4e}"
if v_rms and i_rms else
f"Image noise QA: V_rms={v_rms}, I_rms={i_rms}"
)
except Exception as e:
logger.warning(f"Image noise QA failed (non-fatal): {e}")
logger.info(f"[TIMER] image_noise_qa: {time.time() - _t:.1f}s")
# --- ii. Flux scale check (runs on PB-corrected images, no dewarping needed) ---
_t = time.time()
try:
from orca.transform.flux_check_cutout import run_flux_check
run_flux_check(work_dir, logger=logger)
except ImportError as e:
logger.warning(f"flux_check_cutout not available — skipping: {e}")
except Exception as e:
logger.warning(f"Flux check failed (non-fatal): {e}")
logger.info(f"[TIMER] image_flux_check_qa: {time.time() - _t:.1f}s")
# ------------------------------------------------------------------
# 7b. SCIENCE PHASES (all on NVMe)
# ------------------------------------------------------------------
if skip_science:
logger.info("--skip_science: skipping dewarping, photometry, transients, flux check")
# --- A. Ionospheric Dewarping (VLSSr cross-match) ---
_t = time.time()
if not skip_science:
logger.info("--- Science A: Ionospheric Dewarping (VLSSr) ---")
try:
from orca.transform.ionospheric_dewarping import (
load_ref_catalog, generate_warp_screens, apply_warp,
)
from astropy.wcs import WCS as _WCS
vlssr = load_ref_catalog(VLSSR_CATALOG, "VLSSr")
# Find all PB-corrected AND raw images to dewarp
files_to_warp = glob.glob(
os.path.join(work_dir, "*", "*", "*pbcorr*.fits"))
files_to_warp = [f for f in files_to_warp
if "_dewarped" not in f]
raw_images = glob.glob(
os.path.join(work_dir, "*", "*", "*image*.fits"))
raw_images = [f for f in raw_images
if "pbcorr" not in f and "_dewarped" not in f]
files_to_warp.extend(raw_images)
calc_img = find_deep_image(work_dir, freq_mhz, 'I')
if calc_img and vlssr:
df = extract_sources_to_df(calc_img)
if not df.empty:
with fits.open(calc_img) as h:
wcs_calc = _WCS(h[0].header).celestial
calc_shape = h[0].data.squeeze().shape
bmaj_deg = h[0].header.get('BMAJ', 5.0 / 60.0)
diag_dir = os.path.join(work_dir, "Dewarp_Diagnostics")
os.makedirs(diag_dir, exist_ok=True)
warp_base = os.path.join(diag_dir, f"{subband}_warp")
prev_cwd = os.getcwd()
os.chdir(diag_dir)
try:
sx, sy, _, _ = generate_warp_screens(
df, vlssr, wcs_calc, calc_shape,
freq_mhz, 74.0,
bmaj_deg, 5.0, base_name=warp_base,
)
finally:
os.chdir(prev_cwd)
if sx is not None:
n_warped = 0
for f in files_to_warp:
out = f.replace('.fits', '_dewarped.fits')
if os.path.exists(out):
continue
try:
with fits.open(f) as hf:
fdata = hf[0].data.squeeze()
if fdata.shape == sx.shape:
warped = apply_warp(fdata, sx, sy)
if warped is not None:
fits.writeto(
out, warped, hf[0].header,
overwrite=True)
n_warped += 1
except Exception:
pass
logger.info(f"Dewarped {n_warped}/{len(files_to_warp)} images")
else:
logger.warning("Warp screen generation failed — skipping dewarping.")
else:
logger.warning("No sources extracted for dewarping.")
else:
logger.warning("No deep I image or VLSSr catalog — skipping dewarping.")
except ImportError as e:
logger.warning(f"Dewarping modules not available — skipping: {e}")
except Exception as e:
logger.error(f"Dewarping failed: {e}")
traceback.print_exc()
logger.info(f"[TIMER] science_dewarping: {time.time() - _t:.1f}s")
# --- B. Target Photometry ---
_t = time.time()
if not skip_science:
logger.info("--- Science B: Target Photometry ---")
if targets:
try:
from orca.transform.cutout import (
load_targets as _load_targets,
process_target as _process_target,
)
local_samples = os.path.join(work_dir, "samples")
local_detects = os.path.join(work_dir, "detections")
os.makedirs(local_samples, exist_ok=True)
os.makedirs(local_detects, exist_ok=True)
for t_file in targets:
if not os.path.exists(t_file):
logger.warning(f"Target file not found: {t_file}")
continue
logger.info(f"Processing target file: {t_file}")
try:
s_name = os.path.splitext(os.path.basename(t_file))[0]
target_list = _load_targets(t_file)
logger.info(
f" Loaded {len(target_list)} targets from "
f"{os.path.basename(t_file)}")
for nm, crd, det_stokes, confusing_sources in target_list:
try:
_process_target(
work_dir, nm, crd, s_name,
local_samples, local_detects,
fallback_dir=work_dir,
detection_stokes=det_stokes,
confusing_sources=confusing_sources,
)
except Exception as e:
logger.error(f" Target '{nm}' failed: {e}")
except Exception as e:
logger.error(f"Failed to process target file {t_file}: {e}")
traceback.print_exc()
except ImportError as e:
logger.warning(f"cutout module not available — skipping target photometry: {e}")
else:
logger.info("No target files specified — skipping target photometry.")
logger.info(f"[TIMER] science_target_photometry: {time.time() - _t:.1f}s")
# --- B2. Solar System Body Photometry ---
_t = time.time()
if not skip_science:
logger.info("--- Science B2: Solar System Photometry ---")
try:
from orca.transform.solar_system_cutout import process_solar_system
local_samples = os.path.join(work_dir, "samples")
local_detects = os.path.join(work_dir, "detections")
os.makedirs(local_samples, exist_ok=True)
os.makedirs(local_detects, exist_ok=True)
process_solar_system(
work_dir, local_samples, local_detects,
fallback_dir=work_dir, logger=logger,
)
except ImportError as e:
logger.warning(f"solar_system_cutout not available — skipping: {e}")
except Exception as e:
logger.error(f"Solar system photometry failed: {e}")
traceback.print_exc()
logger.info(f"[TIMER] science_solar_system: {time.time() - _t:.1f}s")
# --- C. Transient Search ---
_t = time.time()
if not skip_science:
logger.info("--- Science C: Transient Search ---")
if catalog:
try:
from orca.transform.transient_search import run_test as _run_test
local_transient_detections = os.path.join(
work_dir, "detections")
os.makedirs(local_transient_detections, exist_ok=True)
def _find_transient_images(pol, category, suffix_filter=None):
"""Find tapered, optionally dewarped, non-pbcorr images."""
pat = os.path.join(
work_dir, pol, category, "*Taper*_dewarped.fits")
imgs = [f for f in glob.glob(pat)
if "pbcorr" not in f
and "_dewarped_dewarped" not in f]
if not imgs:
pat = os.path.join(
work_dir, pol, category, "*Taper*image*.fits")
imgs = [f for f in glob.glob(pat)
if "pbcorr" not in f and "dewarped" not in f]
if suffix_filter:
filtered = [f for f in imgs
if suffix_filter in os.path.basename(f)
and "NoTaper" not in os.path.basename(f)]
if filtered:
imgs = filtered
return imgs
# Deep I reference (Robust-0) for masking + subtraction
ref_i_imgs = _find_transient_images(
"I", "deep", suffix_filter="Robust-0-")
ref_i_path = ref_i_imgs[0] if ref_i_imgs else None
if ref_i_path:
logger.info(
f"Deep I reference: {os.path.basename(ref_i_path)}")
# Stokes V: blind search (no subtraction)
logger.info("Running Stokes V Blind Search...")
v_deep = _find_transient_images("V", "deep")
v_10min = _find_transient_images("V", "10min")
v_detections = []
for v_img in v_deep + v_10min:
try:
result = _run_test(
None, v_img, ref_i_path, catalog,
output_dir=local_transient_detections)
if result:
v_detections.extend(
result if isinstance(result, list) else [result])
except Exception as e:
logger.error(
f"V transient search failed on "
f"{os.path.basename(v_img)}: {e}")
# Stokes I: subtract deep from 10min snapshots
logger.info("Running Stokes I Subtraction Search...")
i_snaps = _find_transient_images("I", "10min")
i_detections = []
if ref_i_path:
for i_img in i_snaps:
try:
result = _run_test(
ref_i_path, i_img, ref_i_path, catalog,
output_dir=local_transient_detections)
if result:
i_detections.extend(
result if isinstance(result, list)
else [result])
except Exception as e:
logger.error(
f"I transient search failed on "
f"{os.path.basename(i_img)}: {e}")
total_det = len(v_detections) + len(i_detections)
logger.info(
f"Transient candidates: {len(v_detections)} Stokes V, "
f"{len(i_detections)} Stokes I")
if total_det > 10:
logger.warning(
f"QUALITY FLAG: {total_det} candidates — "
f"data quality may be poor.")
except ImportError as e:
logger.warning(
f"transient_search not available — skipping: {e}")
except Exception as e:
logger.error(f"Transient search failed: {e}")
traceback.print_exc()
else:
logger.info("No catalog specified — skipping transient search.")
logger.info(f"[TIMER] science_transient_search: {time.time() - _t:.1f}s")
# --- D. Flux Scale Check ---
# NOTE: flux check now runs unconditionally in step 7b-pre (above),
# so it no longer needs the skip_science guard. The timer tag is
# kept for backwards compatibility with log parsers.
logger.info(f"[TIMER] science_flux_check: 0.0s")
# ------------------------------------------------------------------
# 7c. Snapshot cleanup + optional compression
# ------------------------------------------------------------------
_t = time.time()
_cleanup_psf_files(work_dir)
if compress_snapshots:
_compress_snapshot_fits(work_dir)
logger.info(f"[TIMER] snapshot_cleanup_compress: {time.time() - _t:.1f}s")
# ------------------------------------------------------------------
# 8. Archive to Lustre
# ------------------------------------------------------------------
_t = time.time()
archive_base = os.path.join(
LUSTRE_ARCHIVE_DIR,
lst_label, obs_date, run_label, subband,
)
archive_results(
work_dir, archive_base,
subband=subband,
cleanup_concat=not skip_cleanup,
cleanup_workdir=cleanup_nvme,
)
logger.info(f"[TIMER] archive_to_lustre: {time.time() - _t:.1f}s")
except Exception as exc:
_phase2_failed = True
logger.error(
f"Phase 2 FAILED for {subband} {lst_label}: {exc}"
)
traceback.print_exc()
# Best-effort archive of whatever products exist so far
try:
archive_base = os.path.join(
LUSTRE_ARCHIVE_DIR,
lst_label, obs_date, run_label, subband,
)
archive_results(
work_dir, archive_base,
subband=subband,
cleanup_concat=not skip_cleanup,
cleanup_workdir=False,
)
logger.info(f"Partial archive saved to {archive_base}")
except Exception as archive_exc:
logger.error(f"Partial archive also failed: {archive_exc}")
finally:
# ------------------------------------------------------------------
# 9. Trigger next hour + cleanup — ALWAYS runs
# ------------------------------------------------------------------
_trigger_next_and_cleanup(
remaining_hours, work_dir, cleanup_nvme, subband, lst_label,
dynamic_run_label=dynamic_run_label,
)
logger.info(f"[TIMER] phase2_total: {time.time() - _p2_t0:.1f}s")
if _phase2_failed:
logger.error(
f"[{self.request.id}] Phase 2 FAILED: {subband} {lst_label} "
f"(chain continues)"
)
else:
logger.info(
f"[{self.request.id}] Phase 2 COMPLETE: "
f"{subband} → {archive_base}"
)
return archive_base or ''
# ============================================================================
# Convenience: submit an entire subband as a chord
# ============================================================================
# [docs] — Sphinx "view source" link artifact left over from an HTML scrape; kept as a comment so the module parses.
def submit_subband_pipeline(
    ms_files: List[str],
    subband: str,
    bp_table: str,
    xy_table: str,
    lst_label: str,
    obs_date: str,
    run_label: str,
    peel_sky: bool = False,
    peel_rfi: bool = False,
    hot_baselines: bool = False,
    skip_cleanup: bool = False,
    cleanup_nvme: bool = False,
    nvme_work_dir: Optional[str] = None,
    queue_override: Optional[str] = None,
    targets: Optional[List[str]] = None,
    catalog: Optional[str] = None,
    clean_snapshots: bool = False,
    clean_reduced_pixels: bool = False,
    reduced_pixels: bool = False,
    skip_science: bool = False,
    compress_snapshots: bool = False,
    remaining_hours: Optional[List[dict]] = None,
    dynamic_run_label: Optional[str] = None,
) -> 'celery.result.AsyncResult':
    """Submit the full two-phase subband pipeline as a Celery chord.

    This is the main entry point for the submission script.

    Args:
        ms_files: List of source MS paths on Lustre.
        subband: Frequency label, e.g. '73MHz'.
        bp_table: Bandpass calibration table.
        xy_table: XY-phase calibration table.
        lst_label: LST label, e.g. '14h'.
        obs_date: Observation date 'YYYY-MM-DD'.
        run_label: Human-readable run identifier.
        peel_sky: Peel astronomical sky sources.
        peel_rfi: Peel RFI sources.
        hot_baselines: Run hot-baseline diagnostics.
        skip_cleanup: Keep intermediate files on NVMe.
        cleanup_nvme: Remove entire NVMe work_dir after archiving to Lustre.
        nvme_work_dir: Override NVMe work directory.
        queue_override: Force routing to this queue instead of the
            default node. E.g. 'calim08' to run 18MHz on calim08.
        targets: List of target-list file paths for photometry.
        catalog: Path to BDSF catalog for transient search masking.
        clean_snapshots: If True, produce CLEANed Stokes-I snapshots.
        clean_reduced_pixels: Scale clean snapshot pixels by frequency.
        reduced_pixels: If True, scale pixel count by subband frequency.
        skip_science: If True, skip science phases after PB correction.
        compress_snapshots: If True, fpack-compress snapshot FITS.
        remaining_hours: Hour-spec dicts for LST hours still to be
            processed; passed through to Phase 2 so it can trigger the
            next hour's chord (see ``submit_subband_pipeline_chained``).
        dynamic_run_label: If set, enables dynamic dispatch mode.

    Returns:
        Celery AsyncResult for the chord (Phase 2 result).
    """
    queue = queue_override or get_queue_for_subband(subband)
    if nvme_work_dir is None:
        nvme_work_dir = os.path.join(
            NVME_BASE_DIR, lst_label, obs_date, run_label, subband,
        )
    # Phase 1: one task per MS, all routed to the same node queue so that
    # every task's I/O stays on that node's local NVMe.
    phase1_tasks = [
        prepare_one_ms_task.s(
            src_ms=ms,
            nvme_work_dir=nvme_work_dir,
            bp_table=bp_table,
            xy_table=xy_table,
            peel_sky=peel_sky,
            peel_rfi=peel_rfi,
        ).set(queue=queue)
        for ms in ms_files
    ]
    # Phase 2: runs after all Phase 1 tasks complete
    phase2_callback = process_subband_task.s(
        work_dir=nvme_work_dir,
        subband=subband,
        lst_label=lst_label,
        obs_date=obs_date,
        run_label=run_label,
        hot_baselines=hot_baselines,
        skip_cleanup=skip_cleanup,
        cleanup_nvme=cleanup_nvme,
        targets=targets,
        catalog=catalog,
        clean_snapshots=clean_snapshots,
        clean_reduced_pixels=clean_reduced_pixels,
        reduced_pixels=reduced_pixels,
        skip_science=skip_science,
        compress_snapshots=compress_snapshots,
        remaining_hours=remaining_hours,
        dynamic_run_label=dynamic_run_label,
        bp_table=bp_table,
        xy_table=xy_table,
    ).set(queue=queue)
    # Error handler: if all Phase 1 retries fail the chord never fires
    # Phase 2, so this callback ensures the dispatch chain continues.
    # Attach link_error to each individual Phase 1 task (Celery 4.x
    # compatible — cannot pass link_error to chord() directly).
    error_sig = on_chord_error.si(
        dynamic_run_label=dynamic_run_label,
        work_dir=nvme_work_dir,
        cleanup_nvme=cleanup_nvme,
        subband=subband,
        lst_label=lst_label,
    ).set(queue=queue)
    for t in phase1_tasks:
        # BUG FIX: ``t.link_error = [error_sig]`` merely shadowed the
        # Signature.link_error() method with a plain list attribute and
        # never put the errback into t.options['link_error'] — the only
        # place Celery serializes and honors — so the error callback was
        # silently dropped. on_error() registers it correctly.
        t.on_error(error_sig)
    # chord(Phase1)(Phase2) — Phase2 receives list of Phase1 return values
    pipeline = chord(phase1_tasks)(phase2_callback)
    logger.info(
        f"Submitted {subband} → queue={queue}: "
        f"{len(ms_files)} MS files, work_dir={nvme_work_dir}"
    )
    return pipeline
# ============================================================================
# Internal helpers
# ============================================================================
def _run_hot_baseline_diagnostics(concat_ms: str, work_dir: str) -> None:
    """Run hot-baseline heatmap and UV diagnostics.

    Runs the hot-baseline analysis using orca.transform.hot_baselines,
    with all diagnostic products written into ``<work_dir>/QA``.
    """
    from orca.resources.subband_config import HOT_BASELINE_PARAMS

    try:
        from orca.transform import hot_baselines
    except ImportError:
        logger.warning("orca.transform.hot_baselines not importable; skipping diagnostics")
        return

    params = HOT_BASELINE_PARAMS

    # Argparse-style namespace expected by hot_baselines.run_diagnostics().
    class _DiagnosticArgs:
        ms = concat_ms
        col = "CORRECTED_DATA"
        uv_cut = 0.0
        uv_cut_lambda = params['uv_cut_lambda']
        sigma = params['heatmap_sigma']
        uv_sigma = params['uv_sigma']
        uv_window_size = params.get('uv_window_size', 100)
        threshold = params['bad_antenna_threshold']
        apply_antenna_flags = params['apply_flags']
        apply_baseline_flags = params['apply_flags']
        run_uv = params['run_uv_analysis']
        run_heatmap = params['run_heatmap_analysis']

    qa_dir = os.path.join(work_dir, "QA")
    os.makedirs(qa_dir, exist_ok=True)
    # Temporarily chdir so output files land in QA; restore even on failure.
    previous_dir = os.getcwd()
    os.chdir(qa_dir)
    try:
        hot_baselines.run_diagnostics(_DiagnosticArgs, logger)
    finally:
        os.chdir(previous_dir)
# ============================================================================
# Sequential chaining: process multiple hours one at a time per subband
# ============================================================================
# [docs] — Sphinx "view source" link artifact left over from an HTML scrape; kept as a comment so the module parses.
def submit_subband_pipeline_chained(
    hour_specs: List[dict],
    subband: str,
    bp_table: str,
    xy_table: str,
    run_label: str,
    peel_sky: bool = False,
    peel_rfi: bool = False,
    hot_baselines: bool = False,
    skip_cleanup: bool = False,
    cleanup_nvme: bool = False,
    queue_override: Optional[str] = None,
    targets: Optional[List[str]] = None,
    catalog: Optional[str] = None,
    clean_snapshots: bool = False,
    clean_reduced_pixels: bool = False,
    reduced_pixels: bool = False,
    skip_science: bool = False,
    compress_snapshots: bool = False,
) -> 'celery.result.AsyncResult':
    """Submit multiple LST-hours for one subband as a sequential chain.

    Only the **first hour** is submitted immediately; Phase 2 of each
    hour triggers the next hour's chord when it completes. Compared to
    submitting every hour at once, this keeps the worker from being
    flooded with Phase 1 tasks (which would starve Phase 2) and ensures:

    - Phase 2 (imaging) runs on an idle node with full CPU/memory
    - NVMe space is freed before the next hour's data arrives
    - No resource contention between hours on the same node

    Different subbands still run in **parallel** on different nodes.

    Args:
        hour_specs: List of dicts, each with keys ``ms_files`` (source MS
            paths for that hour), ``lst_label`` (e.g. '14h') and
            ``obs_date`` (e.g. '2025-06-15').
        subband: Frequency label, e.g. '73MHz'.
        bp_table: Bandpass calibration table.
        xy_table: XY-phase calibration table.
        run_label: Human-readable run identifier.
        peel_sky: Peel astronomical sky sources.
        peel_rfi: Peel RFI sources.
        hot_baselines: Run hot-baseline diagnostics.
        skip_cleanup: Keep intermediate files on NVMe.
        cleanup_nvme: Remove entire NVMe work_dir after archiving.
        queue_override: Force routing to this queue.
        targets: Target-list file paths for photometry.
        catalog: BDSF catalog for transient search masking.
        clean_snapshots: Produce CLEANed Stokes-I snapshots.
        clean_reduced_pixels: Scale clean snapshot pixels by frequency.
        reduced_pixels: Scale pixel count by subband frequency.
        skip_science: Skip science phases after PB correction.
        compress_snapshots: fpack-compress snapshot FITS.

    Returns:
        Celery AsyncResult for the first hour's chord; later hours are
        dispatched by the Phase 2 callbacks, not here.

    Raises:
        ValueError: If ``hour_specs`` is empty.
    """
    if not hour_specs:
        raise ValueError("hour_specs must not be empty")
    # Parameters identical for every hour; only ms_files / lst_label /
    # obs_date vary per hour-spec.
    common_kwargs = dict(
        subband=subband,
        bp_table=bp_table,
        xy_table=xy_table,
        run_label=run_label,
        peel_sky=peel_sky,
        peel_rfi=peel_rfi,
        hot_baselines=hot_baselines,
        skip_cleanup=skip_cleanup,
        cleanup_nvme=cleanup_nvme,
        queue_override=queue_override,
        targets=targets,
        catalog=catalog,
        clean_snapshots=clean_snapshots,
        clean_reduced_pixels=clean_reduced_pixels,
        reduced_pixels=reduced_pixels,
        skip_science=skip_science,
        compress_snapshots=compress_snapshots,
    )
    all_hour_kwargs = [
        {
            'ms_files': spec['ms_files'],
            'lst_label': spec['lst_label'],
            'obs_date': spec['obs_date'],
            **common_kwargs,
        }
        for spec in hour_specs
    ]
    # Submit only the first hour now; hand the rest to Phase 2 so it can
    # kick off the next chord on completion.
    first_hour = all_hour_kwargs[0]
    remaining = all_hour_kwargs[1:] or None
    labels = [spec['lst_label'] for spec in hour_specs]
    logger.info(
        f"Chained submission for {subband}: "
        f"{' → '.join(labels)} ({len(hour_specs)} hours)"
    )
    return submit_subband_pipeline(remaining_hours=remaining, **first_hour)