# Source code for orca.tasks.subband_tasks

"""Celery tasks for subband processing on NVMe-local calim servers.

Architecture
------------
The processing is split into two phases so that Celery handles the
per-MS parallelism in Phase 1, while Phase 2 runs sequentially on the
same node after all individual MS files are ready.

Phase 1 — per-MS  (embarrassingly parallel, one Celery task per MS):
    copy to NVMe → flag bad antennas → apply calibration → peel (sky + RFI)

Phase 2 — per-subband  (sequential, runs after all Phase 1 tasks finish):
    concatenate → fix field ID → chgcentre → AOFlagger → pilot snapshots →
    snapshot QA → hot baseline removal → science imaging → archive to Lustre

Both phases are routed to the **same node-specific queue** (e.g. ``calim08``)
so that all I/O stays on the node's local NVMe.

Usage (from a submission script or notebook)::

    from celery import chord
    from orca.tasks.subband_tasks import prepare_one_ms_task, process_subband_task
    from orca.resources.subband_config import get_queue_for_subband

    queue = get_queue_for_subband('73MHz')

    # Phase 1: parallel per-MS
    header_tasks = [
        prepare_one_ms_task.s(
            src_ms=ms, nvme_work_dir=work_dir,
            bp_table=bp, xy_table=xy,
            peel_sky=True, peel_rfi=True,
        ) for ms in ms_files
    ]

    # Phase 2: runs after all Phase 1 tasks complete; receives list of MS paths
    pipeline = chord(header_tasks)(
        process_subband_task.s(
            work_dir=work_dir, subband='73MHz',
            lst_label='14h', obs_date='2025-06-15',
            run_label='Run_20250615',
        )
    )
"""
import os
import glob
import shutil
import json
import socket
import logging
import subprocess
import time
import traceback

# WSClean (linked against OpenBLAS) refuses to start if OpenBLAS multi-
# threading is enabled.  Setting this early ensures every subprocess inherits it.
os.environ.setdefault('OPENBLAS_NUM_THREADS', '1')
from typing import List, Optional

import numpy as np
from celery import chord
from casacore.tables import table
from astropy.io import fits

# matplotlib is optional: it is only needed for movie generation
# (_generate_local_movies).  Force the non-interactive Agg backend so the
# headless Celery workers never require a display.
try:
    import matplotlib
    matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    import matplotlib.animation as animation
except ImportError:
    # Bind sentinels so downstream code can detect the missing optional
    # dependency (movie generation checks ``animation is None``).  Binding
    # ``plt`` too avoids a NameError if it is ever referenced outside that
    # guard — the original only bound ``animation``.
    plt = None
    animation = None
from orca.celery import app from orca.wrapper.ttcal import zest_with_ttcal from orca.wrapper.change_phase_centre import change_phase_center from orca.transform.subband_processing import ( redirect_casa_log, copy_ms_to_nvme, apply_calibration, flag_bad_antennas, concatenate_ms, fix_field_id, analyze_snapshot_quality, flag_bad_integrations, add_timestamps_to_images, plot_snapshot_diagnostics, archive_results, run_subprocess, apply_pb_correction_to_images, find_deep_image, extract_sources_to_df, ) from orca.configmanager import queue_config from orca.resources.subband_config import ( PEELING_PARAMS, AOFLAGGER_STRATEGY, SNAPSHOT_PARAMS, SNAPSHOT_CLEAN_I_PARAMS, IMAGING_STEPS, get_pixel_size, get_pixel_scale, NVME_BASE_DIR, LUSTRE_ARCHIVE_DIR, VLSSR_CATALOG, get_queue_for_subband, get_image_resources, )
# Module-level logger; Celery worker processes route this through the
# worker's logging configuration.
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Dynamic dispatch — Redis-backed work queue
# ---------------------------------------------------------------------------

# Redis list key template holding JSON-encoded work units for one run label.
# Formatted with ``run_label`` by the dispatch helpers below.
DYNAMIC_WORK_QUEUE = "pipeline:dynamic:{run_label}"
def _dynamic_queue_length(run_label: str) -> int:
    """Return the number of queued work units for a dynamic run label."""
    # Local import: redis is only needed in dynamic-dispatch mode.
    import redis as _redis
    r = _redis.Redis.from_url(queue_config.result_backend_uri)
    key = DYNAMIC_WORK_QUEUE.format(run_label=run_label)
    return int(r.llen(key))


def _push_work_units(run_label: str, work_units: List[dict]) -> int:
    """Push work units to a Redis list for dynamic dispatch.

    Each work unit is a complete kwargs dict for
    :func:`submit_subband_pipeline`.

    Args:
        run_label: Run label used to derive the Redis list key.
        work_units: Kwargs dicts, one per (subband, hour) to process.

    Returns:
        Number of work units pushed.
    """
    import redis as _redis
    r = _redis.Redis.from_url(queue_config.result_backend_uri)
    key = DYNAMIC_WORK_QUEUE.format(run_label=run_label)
    # RPUSH preserves submission order; LPOP in _pop_and_submit makes
    # the list behave as a FIFO queue.
    for wu in work_units:
        r.rpush(key, json.dumps(wu))
    r.expire(key, 86400 * 7)  # 7-day TTL
    logger.info(f"Pushed {len(work_units)} work units to Redis key {key}")
    return len(work_units)


def _pop_and_submit(run_label: str, queue: str):
    """Pop the next work unit from Redis and submit to *queue*.

    Called during initial seeding (from the submission script) and from
    :func:`_trigger_next_and_cleanup` on the worker after each hour
    completes.

    Args:
        run_label: Run label identifying the Redis work queue.
        queue: Celery queue name (node-specific, e.g. ``calim08``).

    Returns:
        Celery ``AsyncResult`` for the submitted chord, or *None* if the
        queue is empty.
    """
    import redis as _redis
    r = _redis.Redis.from_url(queue_config.result_backend_uri)
    key = DYNAMIC_WORK_QUEUE.format(run_label=run_label)
    raw = r.lpop(key)
    if not raw:
        # NOTE(review): `remaining` is unused in this branch — the llen
        # call could be dropped; kept for byte-identical behavior.
        remaining = r.llen(key)
        logger.info(
            f"Dynamic dispatch: queue empty for {run_label} — node idle"
        )
        return None
    work = json.loads(raw)
    remaining = r.llen(key)
    logger.info(
        f"Dynamic dispatch: popped {work.get('subband')} "
        f"{work.get('lst_label')} → queue {queue} "
        f"({remaining} remaining in queue)"
    )
    # Route to the specified node and propagate dynamic mode
    work['queue_override'] = queue
    work['dynamic_run_label'] = run_label
    return submit_subband_pipeline(remaining_hours=None, **work)


@app.task(
    bind=True,
    name='orca.tasks.subband_tasks.on_chord_error',
    acks_late=True,
)
def on_chord_error(
    self,
    task_id=None,
    exc=None,
    traceback_str=None,
    dynamic_run_label: Optional[str] = None,
    work_dir: Optional[str] = None,
    cleanup_nvme: bool = False,
    subband: str = '',
    lst_label: str = '',
):
    """Error callback for Phase 1 task failures.

    Attached via ``link_error`` on each individual Phase 1 task. When all
    retries are exhausted for *any* MS in the chord, Celery fires this
    callback. The chord itself may still complete (other MS tasks succeed)
    and Phase 2 runs normally. But if enough tasks fail that the chord
    never fires Phase 2, this ensures the dynamic dispatch chain continues.

    A Redis ``SETNX`` lock (60 min TTL) prevents duplicate pops when
    multiple Phase 1 tasks in the same chord all fail.

    Args:
        task_id: Celery id of the failed Phase 1 task.
        exc: Exception that caused the failure (for logging only).
        traceback_str: Formatted traceback (unused here; Celery supplies it).
        dynamic_run_label: Dynamic-dispatch run label; if *None*, this
            handler only logs and returns.
        work_dir: NVMe work dir of the failed hour (used to detect whether
            Phase 2 already ran).
        cleanup_nvme: Forwarded to :func:`_trigger_next_and_cleanup`.
        subband: Frequency label for logging/dedup, e.g. ``'73MHz'``.
        lst_label: LST label for logging/dedup, e.g. ``'14h'``.
    """
    import redis as _redis
    node = socket.gethostname()
    logger.error(
        f"Phase 1 task {task_id} FAILED for {subband} {lst_label} on "
        f"{node}: {exc}"
    )
    if not dynamic_run_label:
        return
    # Dedup: only one error callback per (subband, lst_label) should pop.
    # Use a Redis lock with 60-min TTL so it auto-expires.
    try:
        r = _redis.Redis.from_url(queue_config.result_backend_uri)
        lock_key = f"pipeline:chord_error_lock:{dynamic_run_label}:{subband}:{lst_label}"
        acquired = r.set(lock_key, "1", nx=True, ex=3600)
        if not acquired:
            logger.info(
                f"Chord error handler already fired for {subband} {lst_label} "
                f"— skipping duplicate pop"
            )
            return
    except Exception as e:
        # Best-effort dedup: if Redis is unreachable we still dispatch,
        # accepting a possible duplicate pop.
        logger.warning(f"Could not acquire dedup lock: {e} — popping anyway")
    # Wait briefly: the chord may still succeed if other MS tasks complete.
    # Phase 2's finally block would then handle dispatch normally.
    # 90s is enough for Phase 2 to start if it's going to.
    import time as _time
    _time.sleep(90)
    # Check if Phase 2 already ran by seeing if the work_dir was cleaned
    # or if archive products exist. If so, dispatch already happened.
    archive_marker = os.path.join(
        LUSTRE_ARCHIVE_DIR, lst_label, '*', '*', subband,
    )
    if work_dir and not os.path.isdir(work_dir) and glob.glob(archive_marker):
        logger.info(
            f"Phase 2 appears to have run for {subband} {lst_label} "
            f"— skipping error-handler dispatch"
        )
        return
    logger.info(
        f"Phase 2 did not fire for {subband} {lst_label} — "
        f"error handler triggering next work unit"
    )
    _trigger_next_and_cleanup(
        remaining_hours=None,
        work_dir=work_dir or '',
        cleanup_nvme=cleanup_nvme,
        subband=subband,
        lst_label=lst_label,
        dynamic_run_label=dynamic_run_label,
    )
def _trigger_next_and_cleanup(
    remaining_hours,
    work_dir,
    cleanup_nvme,
    subband,
    lst_label,
    dynamic_run_label=None,
):
    """Trigger the next hour in the sequential chain and optionally clean NVMe.

    Called from the ``finally`` block of ``process_subband_task`` so that
    the chain **always** continues even when the current hour fails.

    Also called from :func:`on_chord_error` when Phase 1 fails entirely
    (all retries exhausted), ensuring the node picks up the next work unit
    instead of going idle.

    In **dynamic mode** (``dynamic_run_label`` is set), the next work unit
    is popped from the Redis work queue instead of using
    ``remaining_hours``. This allows any subband/hour to be dispatched to
    whichever node finishes first.

    Args:
        remaining_hours: Static-mode list of kwargs dicts for later hours
            (first entry is submitted next); ignored in dynamic mode.
        work_dir: NVMe working directory of the just-finished hour.
        cleanup_nvme: If True, remove *work_dir* (best-effort).
        subband: Frequency label, for logging.
        lst_label: LST label, for logging.
        dynamic_run_label: If set, pop the next work unit from Redis.
    """
    if dynamic_run_label:
        # Dynamic mode: pop next work unit from Redis.
        # Queue name convention: hostname minus the 'lwa' prefix
        # (e.g. 'lwacalim08' → 'calim08').
        queue = socket.gethostname().replace('lwa', '')
        try:
            _pop_and_submit(dynamic_run_label, queue)
        except Exception as e:
            logger.error(f"Dynamic dispatch failed: {e}")
            traceback.print_exc()
    elif remaining_hours:
        # Static chaining mode
        next_hour = remaining_hours[0]
        rest = remaining_hours[1:] or None
        next_label = next_hour.get('lst_label', '?')
        logger.info(
            f"Chain → submitting next hour {next_label} for {subband} "
            f"({len(remaining_hours) - 1} hours remaining after)"
        )
        try:
            submit_subband_pipeline(remaining_hours=rest, **next_hour)
        except Exception as e:
            logger.error(f"Failed to trigger next hour {next_label}: {e}")
            traceback.print_exc()
    # NVMe cleanup (best-effort: never let cleanup failure break the chain)
    if cleanup_nvme and os.path.isdir(work_dir):
        try:
            shutil.rmtree(work_dir)
            logger.info(f"Cleaned up NVMe work_dir: {work_dir}")
        except Exception:
            pass


def _generate_local_movies(work_dir: str, freq_str: str) -> None:
    """Generate raw + filtered MP4 movies from pilot snapshot FITS images.

    Produces up to three movies inside ``<work_dir>/Movies/``:

    * ``<freq>_I_Raw.mp4`` — Stokes I time-lapse (grayscale)
    * ``<freq>_V_Raw.mp4`` — Stokes V time-lapse (grayscale)
    * ``<freq>_I_Filtered.mp4`` — Stokes I median-subtracted
      (highlights transients)

    Uses ``matplotlib.animation`` + ``ffmpeg``. If either is unavailable
    the step is silently skipped.

    Args:
        work_dir: NVMe working directory (must contain a ``snapshots/``
            subfolder).
        freq_str: Frequency label, e.g. ``'73MHz'``.
    """
    if animation is None:
        logger.warning("matplotlib.animation not available — skipping movie generation.")
        return
    logger.info("Generating movies from pilot snapshots...")
    movie_dir = os.path.join(work_dir, "Movies")
    os.makedirs(movie_dir, exist_ok=True)
    snap_dir = os.path.join(work_dir, "snapshots")
    for pol in ['I', 'V']:
        files = sorted(glob.glob(os.path.join(snap_dir, f"*{pol}-image*.fits")))
        if len(files) < 10:
            logger.info(f"Only {len(files)} {pol} snapshot frames — skipping movie.")
            continue
        frames = []
        # Cap at 150 frames to bound memory and encode time; unreadable
        # frames are skipped silently.
        for f in files[:150]:
            try:
                frames.append(fits.getdata(f).squeeze())
            except Exception:
                pass
        if not frames:
            continue
        try:
            cube = np.array(frames)
            mid = len(cube) // 2
            # 1. Raw movie (both I and V) — grayscale.
            # Stokes V is noise-like around zero → symmetric ±5σ stretch;
            # Stokes I has bright sources → percentile stretch.
            if pol == 'V':
                rms = np.nanstd(cube[mid])
                vmin, vmax = -5 * rms, 5 * rms
            else:
                vmin, vmax = np.nanpercentile(cube[mid], [1, 99.5])
            fig = plt.figure(figsize=(8, 8))
            ax = fig.add_axes([0, 0, 1, 1])
            ax.axis('off')
            ims = [[ax.imshow(fr, animated=True, origin='lower', cmap='gray',
                              vmin=vmin, vmax=vmax)] for fr in cube]
            ani = animation.ArtistAnimation(fig, ims, interval=100, blit=True)
            raw_path = os.path.join(movie_dir, f"{freq_str}_{pol}_Raw.mp4")
            ani.save(raw_path, writer='ffmpeg', dpi=150)
            plt.close(fig)
            logger.info(f"Saved {raw_path}")
            # 2. Filtered movie (Stokes I only — median subtraction)
            if pol == 'I':
                med = np.median(cube, axis=0)
                diff = cube - med
                rms = np.std(diff)
                fig = plt.figure(figsize=(8, 8))
                ax = fig.add_axes([0, 0, 1, 1])
                ax.axis('off')
                ims = [[ax.imshow(fr, animated=True, origin='lower', cmap='gray',
                                  vmin=-3 * rms, vmax=5 * rms)] for fr in diff]
                ani = animation.ArtistAnimation(fig, ims, interval=100, blit=True)
                filt_path = os.path.join(movie_dir, f"{freq_str}_{pol}_Filtered.mp4")
                ani.save(filt_path, writer='ffmpeg', dpi=150)
                plt.close(fig)
                logger.info(f"Saved {filt_path}")
        except Exception as e:
            # Movies are diagnostics only — never fail the pipeline on them.
            logger.error(f"Movie generation failed for {pol}: {e}")
            traceback.print_exc()


def _cleanup_psf_files(work_dir: str) -> int:
    """Delete PSF FITS files from snapshots/, keeping only the first one.

    WSClean produces a ``*-psf.fits`` for every snapshot interval. These
    are large and rarely needed after QA — keep just the first as a
    reference and remove the rest to save space before archiving.

    Returns:
        Number of PSF files removed.
    """
    snap_dir = os.path.join(work_dir, "snapshots")
    psf_files = sorted(glob.glob(os.path.join(snap_dir, "*-psf.fits")))
    if len(psf_files) <= 1:
        return 0
    removed = 0
    for psf in psf_files[1:]:
        try:
            os.remove(psf)
            removed += 1
        except OSError:
            # Best-effort: a stuck file is not worth failing the hour.
            pass
    logger.info(
        f"PSF cleanup: kept {os.path.basename(psf_files[0])}, "
        f"removed {removed}/{len(psf_files) - 1}"
    )
    return removed


def _compress_snapshot_fits(work_dir: str) -> int:
    """Compress all FITS files in ``snapshots/`` using fpack.

    Each ``*.fits`` file is compressed to ``*.fits.fz`` by fpack, then
    renamed to ``*.fits.fs``. The original uncompressed FITS is deleted.
    Only snapshot images are compressed — deep images in ``I/`` and ``V/``
    are left as-is.

    Looks for ``fpack`` in this order:

    1. ``$FPACK_BIN`` environment variable
    2. Direct ``fpack`` on ``$PATH``
    3. ``conda run -n development fpack`` (fallback)

    Returns:
        Number of files successfully compressed.
    """
    return _compress_snapshot_fits_dir(os.path.join(work_dir, "snapshots"))


def _compress_snapshot_fits_dir(snap_dir: str) -> int:
    """Compress all ``*.fits`` in *snap_dir* via fpack → ``.fits.fs``.

    Returns:
        Number of files successfully compressed.
    """
    # Resolve fpack binary / command prefix
    fpack_env = os.environ.get('FPACK_BIN')
    fpack_direct = shutil.which('fpack')
    conda_bin = shutil.which('conda')
    if fpack_env and os.path.isfile(fpack_env):
        fpack_cmd = [fpack_env]
    elif fpack_direct:
        fpack_cmd = [fpack_direct]
    elif conda_bin:
        # Fall back to running fpack inside the 'development' conda env
        fpack_cmd = [conda_bin, 'run', '-n', 'development', 'fpack']
        logger.info("Using fpack via 'conda run -n development'")
    else:
        logger.warning("fpack not found (PATH, $FPACK_BIN, or conda development env) — skipping compression.")
        return 0
    fits_files = sorted(glob.glob(os.path.join(snap_dir, "*.fits")))
    if not fits_files:
        return 0
    compressed = 0
    for fpath in fits_files:
        fz_path = fpath + ".fz"   # what fpack writes
        fs_path = fpath + ".fs"   # our final archive name
        try:
            subprocess.run(
                fpack_cmd + ["-v", fpath],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
            )
            if os.path.exists(fz_path):
                os.rename(fz_path, fs_path)
                os.remove(fpath)
                compressed += 1
            else:
                logger.warning(f"fpack did not produce {fz_path}")
        except subprocess.CalledProcessError as e:
            logger.error(f"fpack failed on {os.path.basename(fpath)}: {e}")
        except OSError as e:
            logger.error(f"Compress file error {os.path.basename(fpath)}: {e}")
    logger.info(f"Compressed {compressed}/{len(fits_files)} snapshot FITS → .fs")
    return compressed


def _patch_size_args(args: list, npix: int) -> list:
    """Return a copy of *args* with ``-size W H`` replaced by *npix npix*."""
    args = list(args)  # don't mutate the config
    try:
        idx = args.index('-size')
        args[idx + 1] = str(npix)
        args[idx + 2] = str(npix)
    except (ValueError, IndexError):
        # No '-size' (or malformed tail): leave args unchanged.
        pass
    return args


def _patch_scale_arg(args: list, scale: float) -> list:
    """Return a copy of *args* with ``-scale V`` replaced by *scale*."""
    args = list(args)
    try:
        idx = args.index('-scale')
        args[idx + 1] = str(scale)
    except (ValueError, IndexError):
        pass
    return args


# ============================================================================
# PHASE 1 — Per-MS task (runs in parallel via Celery)
# ============================================================================
@app.task(
    bind=True,
    name='orca.tasks.subband_tasks.prepare_one_ms_task',
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_kwargs={'max_retries': 2},
    acks_late=True,
)
def prepare_one_ms_task(
    self,
    src_ms: str,
    nvme_work_dir: str,
    bp_table: str,
    xy_table: str,
    peel_sky: bool = False,
    peel_rfi: bool = False,
) -> str:
    """Copy one MS to NVMe, flag, calibrate, and optionally peel.

    This is the Phase 1 workhorse. Each MS file gets its own Celery task
    so they run in parallel across the worker's concurrency slots.

    Args:
        src_ms: Source MS path on Lustre.
        nvme_work_dir: NVMe working directory (same for all MS in a subband).
        bp_table: Bandpass calibration table path.
        xy_table: XY-phase calibration table path.
        peel_sky: Run TTCal zest with sky model.
        peel_rfi: Run TTCal zest with RFI model.

    Returns:
        Path to the processed MS on NVMe.

    Raises:
        RuntimeError: If calibration fails; the partially-processed MS is
            removed from NVMe first so a Celery retry starts clean.
    """
    node = socket.gethostname()
    _p1_t0 = time.time()
    logger.info(
        f"[{self.request.id}] Phase 1 START on {node}: {os.path.basename(src_ms)}"
    )
    os.makedirs(nvme_work_dir, exist_ok=True)
    redirect_casa_log(nvme_work_dir)

    # 1. Copy to NVMe
    _t = time.time()
    nvme_ms = copy_ms_to_nvme(src_ms, nvme_work_dir)
    logger.info(f"[TIMER] copy_to_nvme: {time.time() - _t:.1f}s")

    # 2. Flag bad antennas (via mnc_python)
    _t = time.time()
    flag_bad_antennas(nvme_ms)
    logger.info(f"[TIMER] flag_bad_antennas: {time.time() - _t:.1f}s")

    # 3. Apply calibration (bandpass + XY-phase)
    _t = time.time()
    cal_ok = apply_calibration(nvme_ms, bp_table, xy_table)
    logger.info(f"[TIMER] apply_calibration: {time.time() - _t:.1f}s")
    if not cal_ok:
        logger.error(f"Calibration failed for {os.path.basename(nvme_ms)}; removing")
        shutil.rmtree(nvme_ms, ignore_errors=True)
        raise RuntimeError(f"Calibration failed for {src_ms}")

    # 4. Peeling
    if peel_sky:
        _t = time.time()
        logger.info(f"Peeling sky model on {os.path.basename(nvme_ms)}")
        zest_with_ttcal(
            ms=nvme_ms,
            sources=PEELING_PARAMS['sky_model'],
            beam=PEELING_PARAMS['beam'],
            minuvw=PEELING_PARAMS['minuvw'],
            maxiter=PEELING_PARAMS['maxiter'],
            tolerance=PEELING_PARAMS['tolerance'],
        )
        logger.info(f"[TIMER] peel_sky: {time.time() - _t:.1f}s")

    if peel_rfi:
        _t = time.time()
        logger.info(f"Peeling RFI model on {os.path.basename(nvme_ms)}")
        # RFI peeling may use a different conda env (ttcal_dev) than sky
        # (julia060). The orca wrapper zest_with_ttcal uses julia060
        # hardcoded. If the envs are different, use shell-based invocation
        # like the Slurm pipeline does.
        rfi_env = PEELING_PARAMS.get('rfi_env', 'julia060')
        sky_env = PEELING_PARAMS.get('sky_env', 'julia060')
        if rfi_env != sky_env:
            # Shell-based invocation matching process_subband.py.
            # shell=True is required because `conda activate` is a shell
            # function, not an executable.  Uses the module-level
            # `subprocess` import (the original re-imported it locally).
            peel_env = os.environ.copy()
            peel_env["OMP_NUM_THREADS"] = "8"
            cmd = (
                f"source ~/.bashrc && conda activate {rfi_env} && "
                f"ttcal.jl zest {nvme_ms} {PEELING_PARAMS['rfi_model']} "
                f"{PEELING_PARAMS['args']}"
            )
            subprocess.run(
                cmd, shell=True, check=True, executable='/bin/bash',
                env=peel_env,
            )
        else:
            zest_with_ttcal(
                ms=nvme_ms,
                sources=PEELING_PARAMS['rfi_model'],
                beam=PEELING_PARAMS['beam'],
                minuvw=PEELING_PARAMS['minuvw'],
                maxiter=PEELING_PARAMS['maxiter'],
                tolerance=PEELING_PARAMS['tolerance'],
            )
        logger.info(f"[TIMER] peel_rfi: {time.time() - _t:.1f}s")

    logger.info(
        f"[{self.request.id}] Phase 1 DONE: {os.path.basename(nvme_ms)}"
    )
    logger.info(f"[TIMER] phase1_total: {time.time() - _p1_t0:.1f}s ({os.path.basename(nvme_ms)})")
    return nvme_ms
# ============================================================================ # PHASE 2 — Per-subband task (runs after all Phase 1 tasks complete) # ============================================================================ @app.task( bind=True, name='orca.tasks.subband_tasks.process_subband_task', acks_late=True, time_limit=28800, # 8 hours hard limit soft_time_limit=27000, # 7h30m soft limit )
[docs] def process_subband_task( self, ms_paths: List[str], work_dir: str, subband: str, lst_label: str, obs_date: str, run_label: str, hot_baselines: bool = False, skip_cleanup: bool = False, cleanup_nvme: bool = False, targets: Optional[List[str]] = None, catalog: Optional[str] = None, clean_snapshots: bool = False, clean_reduced_pixels: bool = False, reduced_pixels: bool = False, skip_science: bool = False, compress_snapshots: bool = False, remaining_hours: Optional[List[dict]] = None, dynamic_run_label: Optional[str] = None, bp_table: Optional[str] = None, xy_table: Optional[str] = None, ) -> str: """Phase 2: concatenate, image, run science, and archive one subband. This task is used as the callback in a ``chord``: it receives the list of NVMe MS paths returned by the Phase 1 tasks. When *remaining_hours* is provided (by ``submit_subband_pipeline_chained``), this task will submit the next hour's chord upon successful completion, ensuring hours are processed **sequentially** on the same node. When *dynamic_run_label* is set, the task operates in **dynamic dispatch** mode: instead of chaining to the next hour of the same subband, it pops the next (subband, hour) work unit from a Redis queue. This lets any free node pick up whatever work is available. Args: ms_paths: NVMe paths returned by prepare_one_ms_task (via chord). work_dir: NVMe working directory (same as Phase 1). subband: Frequency label, e.g. '73MHz'. lst_label: LST label, e.g. '14h'. obs_date: Observation date string 'YYYY-MM-DD'. run_label: Pipeline run label. hot_baselines: Whether to run hot-baseline diagnostics. skip_cleanup: If True, keep the concat MS on NVMe. cleanup_nvme: If True, remove the entire NVMe work_dir after archiving. targets: List of target-list file paths for photometry. catalog: Path to BDSF catalog for transient search masking. clean_snapshots: If True, produce CLEANed Stokes-I snapshots in ``snapshots_clean/`` in addition to the dirty pilots in ``snapshots/``. 
Always fpack-compressed. clean_reduced_pixels: If True, scale clean snapshot pixel count by subband frequency (1024/2048/4096). Only affects clean snapshots, not dirty pilots or science imaging. skip_science: If True, skip all science phases (dewarping, photometry, transient search, flux check) after PB correction. Products are still archived to Lustre. compress_snapshots: If True, fpack-compress all snapshot FITS to .fs and remove the originals. Deep images are not compressed. remaining_hours: List of kwarg dicts for subsequent hours. Each dict contains the arguments for ``submit_subband_pipeline``. The first entry is submitted after this hour completes, with the rest forwarded as its own ``remaining_hours``. dynamic_run_label: If set, enables dynamic dispatch mode using this run label as the Redis work-queue key. Returns: Path to the Lustre archive directory with final products. """ node = socket.gethostname() _p2_t0 = time.time() logger.info( f"[{self.request.id}] Phase 2 START on {node}: " f"{subband} ({len(ms_paths)} files)" ) os.chdir(work_dir) redirect_casa_log(work_dir) # Filter out any Nones (from failed Phase 1 tasks that were retried and # still returned nothing — shouldn't happen with autoretry, but be safe). valid_ms = [p for p in ms_paths if p and os.path.isdir(p)] # Check if a concat MS already exists from a previous (retried) attempt. # On retry, individual MS files may have been cleaned up, but the concat # MS survives — we can resume from it. 
existing_concat = os.path.join(work_dir, f"{subband}_concat.ms") have_concat = os.path.isdir(existing_concat) if not valid_ms and not have_concat: # No data at all — still trigger chain + cleanup before failing logger.error(f"No valid MS files for {subband} in {lst_label}") _trigger_next_and_cleanup( remaining_hours, work_dir, cleanup_nvme, subband, lst_label, dynamic_run_label=dynamic_run_label, ) raise RuntimeError(f"No valid MS files for {subband}") # Track whether processing succeeded for the summary log _phase2_failed = False archive_base = None try: for d in ['I/deep', 'V/deep', 'I/10min', 'V/10min', 'snapshots', 'QA', 'samples', 'detections', 'Dewarp_Diagnostics', 'Movies']: os.makedirs(os.path.join(work_dir, d), exist_ok=True) # ------------------------------------------------------------------ # 0. Write provenance metadata # ------------------------------------------------------------------ try: import orca as _orca provenance = { 'pipeline_version': getattr(_orca, '__git_version__', 'unknown'), 'pipeline_branch': getattr(_orca, '__git_branch__', 'unknown'), 'package_version': getattr(_orca, '__version__', 'unknown'), 'task_id': self.request.id, 'worker_node': node, 'started_at': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), 'subband': subband, 'obs_date': obs_date, 'lst_label': lst_label, 'run_label': run_label, 'dynamic_run_label': dynamic_run_label, 'bp_table': bp_table, 'xy_table': xy_table, 'n_ms_files': len(ms_paths), 'n_valid_ms': len([p for p in ms_paths if p and os.path.isdir(p)]), 'flags': { 'hot_baselines': hot_baselines, 'skip_cleanup': skip_cleanup, 'cleanup_nvme': cleanup_nvme, 'clean_snapshots': clean_snapshots, 'clean_reduced_pixels': clean_reduced_pixels, 'reduced_pixels': reduced_pixels, 'skip_science': skip_science, 'compress_snapshots': compress_snapshots, }, 'imaging': { 'pixel_size': get_pixel_size(subband) if reduced_pixels else 4096, 'clean_pixel_size': get_pixel_size(subband) if (clean_snapshots and clean_reduced_pixels) 
else (get_pixel_size(subband) if reduced_pixels else 4096), 'clean_pixel_scale': get_pixel_scale(subband) if (clean_snapshots and clean_reduced_pixels) else 0.03125, 'wsclean_j': get_image_resources(subband)[2], 'wsclean_bin': os.environ.get('WSCLEAN_BIN', '/opt/bin/wsclean'), 'snapshot_dirty': SNAPSHOT_PARAMS, 'snapshot_clean_i': { 'suffix': SNAPSHOT_CLEAN_I_PARAMS['suffix'], 'args': _patch_scale_arg( _patch_size_args( SNAPSHOT_CLEAN_I_PARAMS['args'], get_pixel_size(subband) if clean_reduced_pixels else 4096, ), get_pixel_scale(subband) if clean_reduced_pixels else 0.03125, ), } if clean_snapshots else None, 'science_steps': [ {'suffix': s['suffix'], 'pol': s['pol'], 'category': s['category'], 'args': s['args']} for s in IMAGING_STEPS ], }, } prov_path = os.path.join(work_dir, 'provenance.json') with open(prov_path, 'w') as f: json.dump(provenance, f, indent=2) logger.info(f"Provenance written to {prov_path}") except Exception as e: logger.warning(f"Failed to write provenance.json: {e}") # ------------------------------------------------------------------ # 1. Concatenation (skip if concat MS already exists from prior attempt) # ------------------------------------------------------------------ _t = time.time() if have_concat: concat_ms = existing_concat logger.info(f"Resuming from existing concat MS: {concat_ms}") else: logger.info("Concatenating MS files...") concat_ms = concatenate_ms(valid_ms, work_dir, subband) if not concat_ms: raise RuntimeError("Concatenation failed") # Clean up individual MS files (they are on NVMe, space is precious) if not skip_cleanup: for ms in valid_ms: if os.path.exists(ms): shutil.rmtree(ms) logger.info(f"[TIMER] concatenation: {time.time() - _t:.1f}s") # ------------------------------------------------------------------ # 2. 
Fix FIELD_ID # ------------------------------------------------------------------ _t = time.time() fix_field_id(concat_ms) logger.info(f"[TIMER] fix_field_id: {time.time() - _t:.1f}s") # ------------------------------------------------------------------ # 3. Change phase centre # ------------------------------------------------------------------ _t = time.time() hour_int = int(lst_label.replace('h', '')) phase_center = f"{hour_int:02d}h30m00s 37d12m57.057s" logger.info(f"Changing phase centre → {phase_center}") change_phase_center(concat_ms, phase_center) logger.info(f"[TIMER] chgcentre: {time.time() - _t:.1f}s") # ------------------------------------------------------------------ # 4. AOFlagger # ------------------------------------------------------------------ _t = time.time() aoflagger_bin = os.environ.get('AOFLAGGER_BIN', '/opt/bin/aoflagger') logger.info(f"Running AOFlagger with strategy {AOFLAGGER_STRATEGY}") run_subprocess( [aoflagger_bin, '-strategy', AOFLAGGER_STRATEGY, concat_ms], "AOFlagger (Post-Concat)", ) logger.info(f"[TIMER] aoflagger: {time.time() - _t:.1f}s") # ------------------------------------------------------------------ # 5. 
Pilot snapshots + QA # ------------------------------------------------------------------ _t = time.time() try: t = table(concat_ms, ack=False) times = t.getcol("TIME") n_ints = len(np.unique(times)) t.close() except Exception: n_ints = 357 pilot_name = f"{subband}-{SNAPSHOT_PARAMS['suffix']}" pilot_path = os.path.join(work_dir, "snapshots", pilot_name) wsclean_bin = os.environ.get('WSCLEAN_BIN', '/opt/bin/wsclean') _, _, wsclean_j = get_image_resources(subband) npix = get_pixel_size(subband) if reduced_pixels else 4096 logger.info(f"Pixel size for {subband}: {npix}x{npix} (reduced_pixels={reduced_pixels})") cmd_pilot = ( [wsclean_bin] + ['-j', str(wsclean_j)] + _patch_size_args(SNAPSHOT_PARAMS['args'], npix) + ['-name', pilot_path, '-intervals-out', str(n_ints), concat_ms] ) run_subprocess(cmd_pilot, "Pilot snapshot imaging") add_timestamps_to_images( os.path.join(work_dir, "snapshots"), pilot_name, concat_ms, n_ints, ) pilot_v = sorted(glob.glob( os.path.join(work_dir, "snapshots", f"{pilot_name}*-V-image*.fits") )) bad_idx, stats = analyze_snapshot_quality(pilot_v) plot_snapshot_diagnostics(stats, bad_idx, work_dir, subband) if bad_idx: flag_bad_integrations(concat_ms, bad_idx, n_ints) logger.info(f"[TIMER] pilot_snapshots_qa: {time.time() - _t:.1f}s") # ------------------------------------------------------------------ # 6. Hot baseline removal (optional) # ------------------------------------------------------------------ if hot_baselines: _t = time.time() try: _run_hot_baseline_diagnostics(concat_ms, work_dir) except Exception as e: logger.error(f"Hot baseline diagnostics failed: {e}") traceback.print_exc() logger.info(f"[TIMER] hot_baselines: {time.time() - _t:.1f}s") # ------------------------------------------------------------------ # 6b. 
CLEANed Stokes-I snapshots (optional, in addition to dirty) # ------------------------------------------------------------------ if clean_snapshots: _t = time.time() try: clean_snap_dir = os.path.join(work_dir, "snapshots_clean") os.makedirs(clean_snap_dir, exist_ok=True) # Frequency-dependent pixel scaling for clean snapshots npix_clean = get_pixel_size(subband) if clean_reduced_pixels else npix scale_clean = get_pixel_scale(subband) if clean_reduced_pixels else 0.03125 logger.info( f"Clean snapshot pixels for {subband}: {npix_clean}x{npix_clean}, " f"scale={scale_clean} deg/px " f"(clean_reduced_pixels={clean_reduced_pixels})" ) clean_name = f"{subband}-{SNAPSHOT_CLEAN_I_PARAMS['suffix']}" clean_path = os.path.join(clean_snap_dir, clean_name) _clean_args = _patch_size_args(SNAPSHOT_CLEAN_I_PARAMS['args'], npix_clean) _clean_args = _patch_scale_arg(_clean_args, scale_clean) cmd_clean_snap = ( [wsclean_bin] + ['-j', str(wsclean_j)] + _clean_args + ['-name', clean_path, '-intervals-out', str(n_ints), concat_ms] ) run_subprocess(cmd_clean_snap, "Clean Stokes-I snapshot imaging") add_timestamps_to_images( clean_snap_dir, clean_name, concat_ms, n_ints, ) # Remove PSF, model, and residual files (not needed after CLEAN) for pattern in ['*-psf.fits', '*-model.fits', '*-residual.fits']: for f in glob.glob(os.path.join(clean_snap_dir, pattern)): try: os.remove(f) except OSError: pass # Always fpack-compress clean snapshots _compress_snapshot_fits_dir(clean_snap_dir) except Exception as e: logger.error(f"Clean snapshot imaging failed: {e}") traceback.print_exc() logger.info(f"[TIMER] clean_snapshots: {time.time() - _t:.1f}s") # ------------------------------------------------------------------ # 7. 
Science imaging + PB correction # ------------------------------------------------------------------ _t_imaging_all = time.time() logger.info(f"Starting Science Imaging for {subband}...") logger.info(f"wsclean binary: {wsclean_bin}, thread limit: -j {wsclean_j}") for step in IMAGING_STEPS: _t_step = time.time() target_dir = os.path.join(work_dir, step['pol'], step['category']) base = f"{subband}-{step['suffix']}" full_path = os.path.join(target_dir, base) cmd = [wsclean_bin] + ['-j', str(wsclean_j)] + _patch_size_args(step['args'], npix) + ['-name', full_path] if step.get('per_integration'): n_out = n_ints cmd += ['-intervals-out', str(n_ints)] elif '-intervals-out' in step['args']: idx = step['args'].index('-intervals-out') n_out = int(step['args'][idx + 1]) else: n_out = 1 cmd.append(concat_ms) run_subprocess(cmd, f"Imaging {step['suffix']}") add_timestamps_to_images(target_dir, base, concat_ms, n_out) # Apply Primary Beam Correction to all images from this step pb_count = apply_pb_correction_to_images(target_dir, base) if pb_count > 0: logger.info(f"PB corrected {pb_count} images for {step['suffix']}") logger.info(f"[TIMER] imaging_{step['suffix']}: {time.time() - _t_step:.1f}s") logger.info(f"[TIMER] imaging_all: {time.time() - _t_imaging_all:.1f}s") # ------------------------------------------------------------------ # 7a. Movie generation from pilot snapshots # ------------------------------------------------------------------ _t = time.time() try: _generate_local_movies(work_dir, subband) except Exception as e: logger.error(f"Movie generation failed: {e}") traceback.print_exc() logger.info(f"[TIMER] movie_generation: {time.time() - _t:.1f}s") # ------------------------------------------------------------------ # 7b-pre. Lightweight image QA (runs ALWAYS, even with --skip_science) # ------------------------------------------------------------------ try: freq_mhz = float(subband.replace('MHz', '')) except Exception: freq_mhz = 50.0 # --- i. 
Per-subband noise RMS (Stokes V deep + Stokes I deep) --- _t = time.time() try: from orca.transform.post_process_science import get_inner_rms v_deep_dir = os.path.join(work_dir, "V", "deep") i_deep_dir = os.path.join(work_dir, "I", "deep") # Stokes V: raw image (not pbcorr) v_candidates = sorted(glob.glob( os.path.join(v_deep_dir, f"*V-Taper-Deep*image*.fits"))) v_candidates = [f for f in v_candidates if "pbcorr" not in f and "dewarped" not in f] v_rms = float(get_inner_rms(v_candidates[0])) if v_candidates else None # Stokes I: pbcorr preferred, raw fallback i_candidates = sorted(glob.glob( os.path.join(i_deep_dir, f"*I-Deep-Taper-Robust-0.75*pbcorr*.fits"))) i_candidates = [f for f in i_candidates if "dewarped" not in f] if not i_candidates: i_candidates = sorted(glob.glob( os.path.join(i_deep_dir, f"*I-Deep-Taper-Robust-0.75*image*.fits"))) i_candidates = [f for f in i_candidates if "pbcorr" not in f and "dewarped" not in f] i_rms = float(get_inner_rms(i_candidates[0])) if i_candidates else None # Write CSV (append-friendly: one row per subband-hour) import csv from datetime import datetime as _dt qa_csv = os.path.join(work_dir, "QA", "image_noise.csv") write_header = not os.path.exists(qa_csv) with open(qa_csv, "a", newline="") as fh: writer = csv.writer(fh) if write_header: writer.writerow([ "subband", "freq_mhz", "lst_label", "v_deep_rms", "i_deep_rms", "timestamp", ]) writer.writerow([ subband, freq_mhz, lst_label, f"{v_rms:.6e}" if v_rms else "", f"{i_rms:.6e}" if i_rms else "", _dt.utcnow().isoformat(), ]) logger.info( f"Image noise QA: V_rms={v_rms:.4e}, I_rms={i_rms:.4e}" if v_rms and i_rms else f"Image noise QA: V_rms={v_rms}, I_rms={i_rms}" ) except Exception as e: logger.warning(f"Image noise QA failed (non-fatal): {e}") logger.info(f"[TIMER] image_noise_qa: {time.time() - _t:.1f}s") # --- ii. 
Flux scale check (runs on PB-corrected images, no dewarping needed) --- _t = time.time() try: from orca.transform.flux_check_cutout import run_flux_check run_flux_check(work_dir, logger=logger) except ImportError as e: logger.warning(f"flux_check_cutout not available — skipping: {e}") except Exception as e: logger.warning(f"Flux check failed (non-fatal): {e}") logger.info(f"[TIMER] image_flux_check_qa: {time.time() - _t:.1f}s") # ------------------------------------------------------------------ # 7b. SCIENCE PHASES (all on NVMe) # ------------------------------------------------------------------ if skip_science: logger.info("--skip_science: skipping dewarping, photometry, transients, flux check") # --- A. Ionospheric Dewarping (VLSSr cross-match) --- _t = time.time() if not skip_science: logger.info("--- Science A: Ionospheric Dewarping (VLSSr) ---") try: from orca.transform.ionospheric_dewarping import ( load_ref_catalog, generate_warp_screens, apply_warp, ) from astropy.wcs import WCS as _WCS vlssr = load_ref_catalog(VLSSR_CATALOG, "VLSSr") # Find all PB-corrected AND raw images to dewarp files_to_warp = glob.glob( os.path.join(work_dir, "*", "*", "*pbcorr*.fits")) files_to_warp = [f for f in files_to_warp if "_dewarped" not in f] raw_images = glob.glob( os.path.join(work_dir, "*", "*", "*image*.fits")) raw_images = [f for f in raw_images if "pbcorr" not in f and "_dewarped" not in f] files_to_warp.extend(raw_images) calc_img = find_deep_image(work_dir, freq_mhz, 'I') if calc_img and vlssr: df = extract_sources_to_df(calc_img) if not df.empty: with fits.open(calc_img) as h: wcs_calc = _WCS(h[0].header).celestial calc_shape = h[0].data.squeeze().shape bmaj_deg = h[0].header.get('BMAJ', 5.0 / 60.0) diag_dir = os.path.join(work_dir, "Dewarp_Diagnostics") os.makedirs(diag_dir, exist_ok=True) warp_base = os.path.join(diag_dir, f"{subband}_warp") prev_cwd = os.getcwd() os.chdir(diag_dir) try: sx, sy, _, _ = generate_warp_screens( df, vlssr, wcs_calc, calc_shape, 
freq_mhz, 74.0, bmaj_deg, 5.0, base_name=warp_base, ) finally: os.chdir(prev_cwd) if sx is not None: n_warped = 0 for f in files_to_warp: out = f.replace('.fits', '_dewarped.fits') if os.path.exists(out): continue try: with fits.open(f) as hf: fdata = hf[0].data.squeeze() if fdata.shape == sx.shape: warped = apply_warp(fdata, sx, sy) if warped is not None: fits.writeto( out, warped, hf[0].header, overwrite=True) n_warped += 1 except Exception: pass logger.info(f"Dewarped {n_warped}/{len(files_to_warp)} images") else: logger.warning("Warp screen generation failed — skipping dewarping.") else: logger.warning("No sources extracted for dewarping.") else: logger.warning("No deep I image or VLSSr catalog — skipping dewarping.") except ImportError as e: logger.warning(f"Dewarping modules not available — skipping: {e}") except Exception as e: logger.error(f"Dewarping failed: {e}") traceback.print_exc() logger.info(f"[TIMER] science_dewarping: {time.time() - _t:.1f}s") # --- B. Target Photometry --- _t = time.time() if not skip_science: logger.info("--- Science B: Target Photometry ---") if targets: try: from orca.transform.cutout import ( load_targets as _load_targets, process_target as _process_target, ) local_samples = os.path.join(work_dir, "samples") local_detects = os.path.join(work_dir, "detections") os.makedirs(local_samples, exist_ok=True) os.makedirs(local_detects, exist_ok=True) for t_file in targets: if not os.path.exists(t_file): logger.warning(f"Target file not found: {t_file}") continue logger.info(f"Processing target file: {t_file}") try: s_name = os.path.splitext(os.path.basename(t_file))[0] target_list = _load_targets(t_file) logger.info( f" Loaded {len(target_list)} targets from " f"{os.path.basename(t_file)}") for nm, crd, det_stokes, confusing_sources in target_list: try: _process_target( work_dir, nm, crd, s_name, local_samples, local_detects, fallback_dir=work_dir, detection_stokes=det_stokes, confusing_sources=confusing_sources, ) except Exception as 
e: logger.error(f" Target '{nm}' failed: {e}") except Exception as e: logger.error(f"Failed to process target file {t_file}: {e}") traceback.print_exc() except ImportError as e: logger.warning(f"cutout module not available — skipping target photometry: {e}") else: logger.info("No target files specified — skipping target photometry.") logger.info(f"[TIMER] science_target_photometry: {time.time() - _t:.1f}s") # --- B2. Solar System Body Photometry --- _t = time.time() if not skip_science: logger.info("--- Science B2: Solar System Photometry ---") try: from orca.transform.solar_system_cutout import process_solar_system local_samples = os.path.join(work_dir, "samples") local_detects = os.path.join(work_dir, "detections") os.makedirs(local_samples, exist_ok=True) os.makedirs(local_detects, exist_ok=True) process_solar_system( work_dir, local_samples, local_detects, fallback_dir=work_dir, logger=logger, ) except ImportError as e: logger.warning(f"solar_system_cutout not available — skipping: {e}") except Exception as e: logger.error(f"Solar system photometry failed: {e}") traceback.print_exc() logger.info(f"[TIMER] science_solar_system: {time.time() - _t:.1f}s") # --- C. 
Transient Search --- _t = time.time() if not skip_science: logger.info("--- Science C: Transient Search ---") if catalog: try: from orca.transform.transient_search import run_test as _run_test local_transient_detections = os.path.join( work_dir, "detections") os.makedirs(local_transient_detections, exist_ok=True) def _find_transient_images(pol, category, suffix_filter=None): """Find tapered, optionally dewarped, non-pbcorr images.""" pat = os.path.join( work_dir, pol, category, "*Taper*_dewarped.fits") imgs = [f for f in glob.glob(pat) if "pbcorr" not in f and "_dewarped_dewarped" not in f] if not imgs: pat = os.path.join( work_dir, pol, category, "*Taper*image*.fits") imgs = [f for f in glob.glob(pat) if "pbcorr" not in f and "dewarped" not in f] if suffix_filter: filtered = [f for f in imgs if suffix_filter in os.path.basename(f) and "NoTaper" not in os.path.basename(f)] if filtered: imgs = filtered return imgs # Deep I reference (Robust-0) for masking + subtraction ref_i_imgs = _find_transient_images( "I", "deep", suffix_filter="Robust-0-") ref_i_path = ref_i_imgs[0] if ref_i_imgs else None if ref_i_path: logger.info( f"Deep I reference: {os.path.basename(ref_i_path)}") # Stokes V: blind search (no subtraction) logger.info("Running Stokes V Blind Search...") v_deep = _find_transient_images("V", "deep") v_10min = _find_transient_images("V", "10min") v_detections = [] for v_img in v_deep + v_10min: try: result = _run_test( None, v_img, ref_i_path, catalog, output_dir=local_transient_detections) if result: v_detections.extend( result if isinstance(result, list) else [result]) except Exception as e: logger.error( f"V transient search failed on " f"{os.path.basename(v_img)}: {e}") # Stokes I: subtract deep from 10min snapshots logger.info("Running Stokes I Subtraction Search...") i_snaps = _find_transient_images("I", "10min") i_detections = [] if ref_i_path: for i_img in i_snaps: try: result = _run_test( ref_i_path, i_img, ref_i_path, catalog, 
output_dir=local_transient_detections) if result: i_detections.extend( result if isinstance(result, list) else [result]) except Exception as e: logger.error( f"I transient search failed on " f"{os.path.basename(i_img)}: {e}") total_det = len(v_detections) + len(i_detections) logger.info( f"Transient candidates: {len(v_detections)} Stokes V, " f"{len(i_detections)} Stokes I") if total_det > 10: logger.warning( f"QUALITY FLAG: {total_det} candidates — " f"data quality may be poor.") except ImportError as e: logger.warning( f"transient_search not available — skipping: {e}") except Exception as e: logger.error(f"Transient search failed: {e}") traceback.print_exc() else: logger.info("No catalog specified — skipping transient search.") logger.info(f"[TIMER] science_transient_search: {time.time() - _t:.1f}s") # --- D. Flux Scale Check --- # NOTE: flux check now runs unconditionally in step 7b-pre (above), # so it no longer needs the skip_science guard. The timer tag is # kept for backwards compatibility with log parsers. logger.info(f"[TIMER] science_flux_check: 0.0s") # ------------------------------------------------------------------ # 7c. Snapshot cleanup + optional compression # ------------------------------------------------------------------ _t = time.time() _cleanup_psf_files(work_dir) if compress_snapshots: _compress_snapshot_fits(work_dir) logger.info(f"[TIMER] snapshot_cleanup_compress: {time.time() - _t:.1f}s") # ------------------------------------------------------------------ # 8. 
Archive to Lustre # ------------------------------------------------------------------ _t = time.time() archive_base = os.path.join( LUSTRE_ARCHIVE_DIR, lst_label, obs_date, run_label, subband, ) archive_results( work_dir, archive_base, subband=subband, cleanup_concat=not skip_cleanup, cleanup_workdir=cleanup_nvme, ) logger.info(f"[TIMER] archive_to_lustre: {time.time() - _t:.1f}s") except Exception as exc: _phase2_failed = True logger.error( f"Phase 2 FAILED for {subband} {lst_label}: {exc}" ) traceback.print_exc() # Best-effort archive of whatever products exist so far try: archive_base = os.path.join( LUSTRE_ARCHIVE_DIR, lst_label, obs_date, run_label, subband, ) archive_results( work_dir, archive_base, subband=subband, cleanup_concat=not skip_cleanup, cleanup_workdir=False, ) logger.info(f"Partial archive saved to {archive_base}") except Exception as archive_exc: logger.error(f"Partial archive also failed: {archive_exc}") finally: # ------------------------------------------------------------------ # 9. Trigger next hour + cleanup — ALWAYS runs # ------------------------------------------------------------------ _trigger_next_and_cleanup( remaining_hours, work_dir, cleanup_nvme, subband, lst_label, dynamic_run_label=dynamic_run_label, ) logger.info(f"[TIMER] phase2_total: {time.time() - _p2_t0:.1f}s") if _phase2_failed: logger.error( f"[{self.request.id}] Phase 2 FAILED: {subband} {lst_label} " f"(chain continues)" ) else: logger.info( f"[{self.request.id}] Phase 2 COMPLETE: " f"{subband}{archive_base}" ) return archive_base or ''
# ============================================================================ # Convenience: submit an entire subband as a chord # ============================================================================
def submit_subband_pipeline(
    ms_files: List[str],
    subband: str,
    bp_table: str,
    xy_table: str,
    lst_label: str,
    obs_date: str,
    run_label: str,
    peel_sky: bool = False,
    peel_rfi: bool = False,
    hot_baselines: bool = False,
    skip_cleanup: bool = False,
    cleanup_nvme: bool = False,
    nvme_work_dir: Optional[str] = None,
    queue_override: Optional[str] = None,
    targets: Optional[List[str]] = None,
    catalog: Optional[str] = None,
    clean_snapshots: bool = False,
    clean_reduced_pixels: bool = False,
    reduced_pixels: bool = False,
    skip_science: bool = False,
    compress_snapshots: bool = False,
    remaining_hours: Optional[List[dict]] = None,
    dynamic_run_label: Optional[str] = None,
) -> 'celery.result.AsyncResult':
    """Submit the full two-phase subband pipeline as a Celery chord.

    This is the main entry point for the submission script.

    Args:
        ms_files: List of source MS paths on Lustre.
        subband: Frequency label, e.g. '73MHz'.
        bp_table: Bandpass calibration table.
        xy_table: XY-phase calibration table.
        lst_label: LST label, e.g. '14h'.
        obs_date: Observation date 'YYYY-MM-DD'.
        run_label: Human-readable run identifier.
        peel_sky: Peel astronomical sky sources.
        peel_rfi: Peel RFI sources.
        hot_baselines: Run hot-baseline diagnostics.
        skip_cleanup: Keep intermediate files on NVMe.
        cleanup_nvme: Remove entire NVMe work_dir after archiving to Lustre.
        nvme_work_dir: Override NVMe work directory.
        queue_override: Force routing to this queue instead of the default
            node. E.g. 'calim08' to run 18MHz on calim08.
        targets: List of target-list file paths for photometry.
        catalog: Path to BDSF catalog for transient search masking.
        clean_snapshots: If True, produce CLEANed Stokes-I snapshots.
        clean_reduced_pixels: Scale clean snapshot pixels by frequency.
        reduced_pixels: If True, scale pixel count by subband frequency.
        skip_science: If True, skip science phases after PB correction.
        compress_snapshots: If True, fpack-compress snapshot FITS.
        remaining_hours: Kwargs dicts for hours still to be processed;
            forwarded to Phase 2 so it can trigger the next hour's chord
            on completion (see submit_subband_pipeline_chained).
        dynamic_run_label: If set, enables dynamic dispatch mode.

    Returns:
        Celery AsyncResult for the chord (Phase 2 result).
    """
    queue = queue_override or get_queue_for_subband(subband)
    if nvme_work_dir is None:
        nvme_work_dir = os.path.join(
            NVME_BASE_DIR, lst_label, obs_date, run_label, subband,
        )

    # Phase 1: one task per MS, all routed to the same node queue
    phase1_tasks = [
        prepare_one_ms_task.s(
            src_ms=ms,
            nvme_work_dir=nvme_work_dir,
            bp_table=bp_table,
            xy_table=xy_table,
            peel_sky=peel_sky,
            peel_rfi=peel_rfi,
        ).set(queue=queue)
        for ms in ms_files
    ]

    # Phase 2: runs after all Phase 1 tasks complete
    phase2_callback = process_subband_task.s(
        work_dir=nvme_work_dir,
        subband=subband,
        lst_label=lst_label,
        obs_date=obs_date,
        run_label=run_label,
        hot_baselines=hot_baselines,
        skip_cleanup=skip_cleanup,
        cleanup_nvme=cleanup_nvme,
        targets=targets,
        catalog=catalog,
        clean_snapshots=clean_snapshots,
        clean_reduced_pixels=clean_reduced_pixels,
        reduced_pixels=reduced_pixels,
        skip_science=skip_science,
        compress_snapshots=compress_snapshots,
        remaining_hours=remaining_hours,
        dynamic_run_label=dynamic_run_label,
        bp_table=bp_table,
        xy_table=xy_table,
    ).set(queue=queue)

    # Error handler: if all Phase 1 retries fail the chord never fires
    # Phase 2, so this callback ensures the dispatch chain continues.
    # Attach link_error to each individual Phase 1 task (Celery 4.x
    # compatible — cannot pass link_error to chord() directly).
    error_sig = on_chord_error.si(
        dynamic_run_label=dynamic_run_label,
        work_dir=nvme_work_dir,
        cleanup_nvme=cleanup_nvme,
        subband=subband,
        lst_label=lst_label,
    ).set(queue=queue)
    for t in phase1_tasks:
        # BUG FIX: the previous code did ``t.link_error = [error_sig]``,
        # which only sets a plain instance attribute shadowing the
        # Signature.link_error *method* — the errback never reaches the
        # signature's ``options`` dict and is therefore never serialized
        # with the task, so failures did not trigger on_chord_error.
        # Calling the method registers it in options['link_error'].
        t.link_error(error_sig)

    # chord(Phase1)(Phase2) — Phase2 receives list of Phase1 return values
    pipeline = chord(phase1_tasks)(phase2_callback)
    logger.info(
        f"Submitted {subband} → queue={queue}: "
        f"{len(ms_files)} MS files, work_dir={nvme_work_dir}"
    )
    return pipeline
# ============================================================================
# Internal helpers
# ============================================================================
def _run_hot_baseline_diagnostics(concat_ms: str, work_dir: str) -> None:
    """Run hot-baseline heatmap and UV diagnostics for a concatenated MS.

    Builds an argparse-style namespace from ``HOT_BASELINE_PARAMS`` and
    hands it to ``orca.transform.hot_baselines.run_diagnostics``. If the
    hot_baselines module cannot be imported, logs a warning and returns.
    All output is produced while chdir'd into ``<work_dir>/QA``.
    """
    from orca.resources.subband_config import HOT_BASELINE_PARAMS

    try:
        from orca.transform import hot_baselines
    except ImportError:
        logger.warning("orca.transform.hot_baselines not importable; skipping diagnostics")
        return

    # Alias for brevity below; a class body may read enclosing-function
    # locals directly, so these attributes close over the arguments.
    params = HOT_BASELINE_PARAMS

    # Namespace mimicking parsed CLI args, as expected by run_diagnostics.
    class HotArgs:
        ms = concat_ms
        col = "CORRECTED_DATA"
        uv_cut = 0.0
        uv_cut_lambda = params['uv_cut_lambda']
        uv_sigma = params['uv_sigma']
        uv_window_size = params.get('uv_window_size', 100)
        sigma = params['heatmap_sigma']
        threshold = params['bad_antenna_threshold']
        apply_antenna_flags = params['apply_flags']
        apply_baseline_flags = params['apply_flags']
        run_uv = params['run_uv_analysis']
        run_heatmap = params['run_heatmap_analysis']

    qa_dir = os.path.join(work_dir, "QA")
    os.makedirs(qa_dir, exist_ok=True)

    # chdir into QA/ so diagnostic outputs land there (run_diagnostics
    # presumably writes relative to the CWD — confirm in hot_baselines),
    # and always restore the previous directory afterwards.
    previous_dir = os.getcwd()
    os.chdir(qa_dir)
    try:
        hot_baselines.run_diagnostics(HotArgs, logger)
    finally:
        os.chdir(previous_dir)


# ============================================================================
# Sequential chaining: process multiple hours one at a time per subband
# ============================================================================
def submit_subband_pipeline_chained(
    hour_specs: List[dict],
    subband: str,
    bp_table: str,
    xy_table: str,
    run_label: str,
    peel_sky: bool = False,
    peel_rfi: bool = False,
    hot_baselines: bool = False,
    skip_cleanup: bool = False,
    cleanup_nvme: bool = False,
    queue_override: Optional[str] = None,
    targets: Optional[List[str]] = None,
    catalog: Optional[str] = None,
    clean_snapshots: bool = False,
    clean_reduced_pixels: bool = False,
    reduced_pixels: bool = False,
    skip_science: bool = False,
    compress_snapshots: bool = False,
) -> 'celery.result.AsyncResult':
    """Submit multiple LST-hours for one subband as a sequential chain.

    Only the **first hour** is submitted immediately; Phase 2 of each hour
    triggers the next hour's chord when it completes. Compared to
    submitting every hour at once, this guarantees:

    - Phase 2 (imaging) runs on an idle node with full CPU/memory
    - NVMe space is freed before the next hour's data arrives
    - No resource contention between hours on the same node

    Different subbands still run in **parallel** on different nodes.

    Args:
        hour_specs: One dict per hour with keys ``ms_files`` (source MS
            paths), ``lst_label`` (e.g. '14h'), and ``obs_date``
            (e.g. '2025-06-15').
        subband: Frequency label, e.g. '73MHz'.
        bp_table: Bandpass calibration table.
        xy_table: XY-phase calibration table.
        run_label: Human-readable run identifier.
        peel_sky: Peel astronomical sky sources.
        peel_rfi: Peel RFI sources.
        hot_baselines: Run hot-baseline diagnostics.
        skip_cleanup: Keep intermediate files on NVMe.
        cleanup_nvme: Remove entire NVMe work_dir after archiving.
        queue_override: Force routing to this queue.
        targets: Target-list file paths for photometry.
        catalog: BDSF catalog for transient search masking.
        clean_snapshots: Produce CLEANed Stokes-I snapshots.
        clean_reduced_pixels: Scale clean snapshot pixels by frequency.
        reduced_pixels: Scale pixel count by subband frequency.
        skip_science: Skip science phases after PB correction.
        compress_snapshots: fpack-compress snapshot FITS.

    Raises:
        ValueError: If ``hour_specs`` is empty.

    Returns:
        Celery AsyncResult for the first hour's chord; later hours are
        triggered by Phase 2 callbacks, not submitted here.
    """
    if not hour_specs:
        raise ValueError("hour_specs must not be empty")

    # One kwargs dict per hour for submit_subband_pipeline(); shared
    # parameters are identical, only ms_files/lst_label/obs_date vary.
    per_hour_kwargs = [
        dict(
            ms_files=spec['ms_files'],
            subband=subband,
            bp_table=bp_table,
            xy_table=xy_table,
            lst_label=spec['lst_label'],
            obs_date=spec['obs_date'],
            run_label=run_label,
            peel_sky=peel_sky,
            peel_rfi=peel_rfi,
            hot_baselines=hot_baselines,
            skip_cleanup=skip_cleanup,
            cleanup_nvme=cleanup_nvme,
            queue_override=queue_override,
            targets=targets,
            catalog=catalog,
            clean_snapshots=clean_snapshots,
            clean_reduced_pixels=clean_reduced_pixels,
            reduced_pixels=reduced_pixels,
            skip_science=skip_science,
            compress_snapshots=compress_snapshots,
        )
        for spec in hour_specs
    ]

    # Kick off only the head of the chain; the tail rides along inside
    # remaining_hours so Phase 2 can dispatch the next hour on completion.
    head, tail = per_hour_kwargs[0], (per_hour_kwargs[1:] or None)

    hour_labels = [spec['lst_label'] for spec in hour_specs]
    logger.info(
        f"Chained submission for {subband}: "
        f"{' → '.join(hour_labels)} ({len(hour_specs)} hours)"
    )
    return submit_subband_pipeline(remaining_hours=tail, **head)