Source code for orca.calibration.delay_pipeline

"""Delay calibration pipeline.

This module provides the delay calibration pipeline for OVRO-LWA data.
Combines measurement sets across all frequencies to solve for per-antenna
delays using Cygnus A as the calibrator.

The pipeline:
    1. Selects optimal MS per frequency based on LST proximity to Cyg A transit.
    2. Concatenates and combines spectral windows with mstransform.
    3. Flags RFI and bad antennas.
    4. Generates Cyg A model and solves for delays.
    5. Produces QA plots showing delay solutions per antenna.

Example:
    >>> from orca.calibration.delay_pipeline import run_delay_pipeline
    >>> run_delay_pipeline('2024-01-15')
"""
import argparse, os, shutil, glob, logging
from datetime import timedelta
import re
from datetime import datetime
import numpy as np
import astropy.units as u
from astropy.time import Time
from casatasks import concat, clearcal, ft, gaincal, mstransform
from casatools import msmetadata
from orca.utils.calibratormodel import model_generation
from orca.transform.flagging import flag_with_aoflagger, flag_ants
from orca.utils.flagutils import get_bad_correlator_numbers
from orca.utils.msfix import concat_issue_fieldid
from orca.utils.calibrationutils import get_lst_from_filename
from orca.transform.qa_plotting import plot_delay_vs_antenna
from orca.utils.paths import get_aoflagger_strategy






[docs] def closest_ms_by_lst(ms_list, target_lst_rad): """Find the measurement set closest to target LST. Args: ms_list: List of measurement set paths. target_lst_rad: Target LST in radians. Returns: Path to the MS with LST closest to target. """ best_ms, best_diff = None, float("inf") for vis in ms_list: try: lst = get_lst_from_filename(vis) lst_rad = lst.to(u.rad).value diff = abs((lst_rad - target_lst_rad + np.pi) % (2 * np.pi) - np.pi) # angular diff if diff < best_diff: best_ms, best_diff = vis, diff except Exception as e: print(f"Warning: failed to parse LST from {vis}: {e}") continue return best_ms
[docs] def copytree(src, dst): """Copy directory tree, removing destination if it exists. Args: src: Source directory path. dst: Destination directory path. """ if os.path.exists(dst): shutil.rmtree(dst) shutil.copytree(src, dst)
[docs] def run_delay_pipeline(obs_date, ref_lst=20.00554072/24*2*3.14159265359, tol_min=1): """ Run delay calibration for all frequencies >= 41 MHz for a given date. This function: 1. Selects the best MS for each frequency based on proximity to a reference LST. 2. Copies selected MS files to NVMe. 3. Concatenates and combines SPWs using mstransform. 4. Flags RFI using AOFlagger and known bad antennas. 5. Generates a model (Cyg A only) and applies it. 6. Runs gaincal to solve for delays. 7. Outputs: - CASA delay calibration table at /lustre/pipeline/calibration/delay/<DATE>/ - A QA PDF showing per-antenna delays. 8. Cleans up NVMe temporary files. Args: obs_date (str): Date in 'YYYY-MM-DD' format. ref_lst (float): Reference LST (in radians). Default is Cyg A transit. tol_min (int): Reserved for future LST filtering tolerance (minutes). """ logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") base = f"/lustre/pipeline/calibration" out_delay_dir = f"{base}/delay/{obs_date}" os.makedirs(out_delay_dir, exist_ok=True) # 1. gather candidate MSes freqs = [d for d in os.listdir(base) if d.endswith("MHz") and int(d.rstrip("MHz")) >= 41] logging.info(f"Processing frequencies: {', '.join(freqs)}") picked = [] for f in freqs: day_dir = os.path.join(base, f, obs_date) if not os.path.isdir(day_dir): logging.warning(f"no {day_dir}; skipping") continue hour_dirs = sorted(glob.glob(os.path.join(day_dir, "*"))) ms_per_hour = [glob.glob(os.path.join(h, "*.ms")) for h in hour_dirs] ms_flat = [m for sub in ms_per_hour for m in sub] if not ms_flat: continue best = closest_ms_by_lst(ms_flat, ref_lst) picked.append(best) logging.info(f"{f}: picked {os.path.basename(best)}") # Convert tolerance (in minutes) to radians tol_rad = (tol_min / 60) * (2 * np.pi / 24) filtered = [] for vis in picked: try: lst = get_lst_from_filename(vis).to(u.rad).value diff = abs((lst - ref_lst + np.pi) % (2 * np.pi) - np.pi) if diff <= tol_rad: filtered.append(vis) else: logging.warning(f"{os.path.basename(vis)} excluded — LST diff {diff * 24 / (2 * np.pi):.3f} min exceeds tolerance of {tol_min} min") except Exception as e: logging.warning(f"Failed to get LST for {vis}: {e}") picked = filtered if not picked: raise RuntimeError("No MSes matched the requested date/LST") # 2. copy to NVMe scratch = f"/fast/pipeline/{obs_date}/delay_tmp" os.makedirs(scratch, exist_ok=True) local_ms = [] for vis in picked: dst = os.path.join(scratch, os.path.basename(vis)) copytree(vis, dst) local_ms.append(dst) # Extract datetime from first MS filename basename = os.path.basename(local_ms[0]) match = re.search(r"(\d{8}_\d{6})", basename) utc_str = match.group(1) if match else None utc_time = datetime.strptime(utc_str, "%Y%m%d_%H%M%S").strftime("%Y-%m-%d %H:%M:%S") # 3. concat + mstransform concat_ms = os.path.join(scratch, "delay_concat.ms") concat(vis=local_ms, concatvis=concat_ms, timesort=True) logging.info(f"Concatenated {len(local_ms)} MSes into {concat_ms}") # combine SPWs so delay solve sees a wide band concat_tf = os.path.join(scratch, "delay_concat_tf.ms") mstransform(vis=concat_ms, outputvis=concat_tf, datacolumn='data', combinespws=True) shutil.rmtree(concat_ms) logging.info(f"Transformed to TF: {concat_tf}") # 4. flagging bad_corr_nums = get_bad_correlator_numbers(utc_time) strategy_path = get_aoflagger_strategy("LWA_opt_GH1.lua") logging.info(f"Bad corrs: {bad_corr_nums}") logging.info(f"Using AOFlagger strategy: {strategy_path}") flag_with_aoflagger(concat_tf, strategy=strategy_path) flag_ants(concat_tf, bad_corr_nums) logging.info(f"Flagged {concat_tf} with AOFlagger and bad antennas") # 5. model (Cyg A only) + gaincal ms_model = model_generation(concat_tf) ms_model.primary_beam_model = "/lustre/ai/beam_testing/OVRO-LWA_soil_pt.h5" cl_path, _ = ms_model.gen_model_cl(included_sources=["CygA"]) logging.info(f"Generated model component list: {cl_path}") clearcal(vis=concat_tf, addmodel=True) ft(vis=concat_tf, complist=cl_path, usescratch=True) logging.info(f"Applied model to {concat_tf}") delay_tab = os.path.join(out_delay_dir, f"{obs_date.replace('-','')}_delay.delay") gaincal(vis=concat_tf, caltable=delay_tab, uvrange='>10lambda,<125lambda', solint='inf', refant='202', minblperant=4, minsnr=3.0, gaintype='K', calmode='ap', normtype='mean', solnorm=False, parang=False) logging.info(f"Delay table written to {delay_tab}") # 6. generate QA plot and clean NVMe scratch qa_pdf = delay_tab.replace(".delay", "_vs_antenna.pdf") plot_delay_vs_antenna(delay_tab, output_pdf=qa_pdf) logging.info(f"Saved QA plot to {qa_pdf}") # Remove scratch dir shutil.rmtree(scratch) parent_dir = os.path.dirname(scratch) if not os.listdir(parent_dir): shutil.rmtree(parent_dir) logging.info(f"Removed empty parent directory: {parent_dir}") return delay_tab