orca.transform.subband_processing
=================================

.. py:module:: orca.transform.subband_processing

.. autoapi-nested-parse::

   Pure functions for subband processing steps.

   These functions are called by the Celery tasks in subband_tasks.py but contain
   NO Celery decorators; they are plain Python so they can be tested locally.

   Adapted to use orca wrappers where available and NVMe paths from subband_config.


Attributes
----------

.. autoapisummary::

   orca.transform.subband_processing.BDSF_AVAILABLE
   orca.transform.subband_processing.logger


Functions
---------

.. autoapisummary::

   orca.transform.subband_processing.redirect_casa_log
   orca.transform.subband_processing.find_archive_files_for_subband
   orca.transform.subband_processing.copy_ms_to_nvme
   orca.transform.subband_processing.calculate_spwmap
   orca.transform.subband_processing.apply_calibration
   orca.transform.subband_processing.get_bad_antenna_numbers
   orca.transform.subband_processing.flag_bad_antennas
   orca.transform.subband_processing.concatenate_ms
   orca.transform.subband_processing.fix_field_id
   orca.transform.subband_processing.analyze_snapshot_quality
   orca.transform.subband_processing.flag_bad_integrations
   orca.transform.subband_processing.add_timestamps_to_images
   orca.transform.subband_processing.plot_snapshot_diagnostics
   orca.transform.subband_processing.run_casa_task
   orca.transform.subband_processing.apply_pb_correction_to_images
   orca.transform.subband_processing.run_subprocess
   orca.transform.subband_processing.find_deep_image
   orca.transform.subband_processing.extract_sources_to_df
   orca.transform.subband_processing.archive_results


Module Contents
---------------

.. py:data:: BDSF_AVAILABLE
   :value: True


.. py:data:: logger

.. py:function:: redirect_casa_log(work_dir: str) -> None

   Point the casalog to a file inside the work directory.


.. py:function:: find_archive_files_for_subband(start_dt: datetime.datetime, end_dt: datetime.datetime, subband: str, input_dir: Optional[str] = None) -> List[str]

   Locate averaged MS files for a subband within a time range.

   Searches either a custom *input_dir* or the default Lustre nighttime
   directory tree ``/lustre/pipeline/night-time/averaged////``.

   :param start_dt: Start of the observation window (UTC).
   :param end_dt: End of the observation window (UTC).
   :param subband: Frequency label, e.g. ``'73MHz'``.
   :param input_dir: Optional override directory to search.

   :returns: Sorted list of absolute paths to matching ``.ms`` directories.


.. py:function:: copy_ms_to_nvme(src_ms: str, nvme_work_dir: str) -> str

   Copy a single MS directory to the NVMe work directory.

   :param src_ms: Source path on Lustre.
   :param nvme_work_dir: Target directory on local NVMe.

   :returns: Path to the copied MS on NVMe.


.. py:function:: calculate_spwmap(ms_path: str, caltable_path: str) -> Optional[List[int]]

   Compute the spectral-window mapping between an MS and a calibration table.

   For each SPW in the MS, finds the best-overlapping SPW in the cal table.
   Falls back to nearest-frequency match when there is no overlap.

   :param ms_path: Path to the measurement set.
   :param caltable_path: Path to the calibration table.

   :returns: List of cal-table SPW indices (one per MS SPW), or None on failure.


.. py:function:: apply_calibration(ms_path: str, bp_table: str, xy_table: str) -> bool

   Apply bandpass + XY-phase calibration tables to an MS.

   Also flags channels above 85 MHz if present.

   :param ms_path: Path to the measurement set (modified in-place).
   :param bp_table: Path to bandpass calibration table.
   :param xy_table: Path to XY-phase calibration table.

   :returns: True on success, False on failure.


.. py:function:: get_bad_antenna_numbers(ms_path: str) -> List[int]

   Query mnc_python for bad antennas at the observation time.

   Requires the ``development`` conda environment with mnc_python installed.

   :param ms_path: Path to MS; the observation time is read from the OBSERVATION table.

   :returns: List of bad correlator numbers (may be empty).


.. py:function:: flag_bad_antennas(ms_path: str) -> str

   Flag bad antennas in an MS using mnc_python lookup.

   :param ms_path: Path to the measurement set.

   :returns: The ms_path (for chaining).


.. py:function:: concatenate_ms(ms_files: List[str], work_dir: str, subband: str, max_retries: int = 10) -> Optional[str]

   Concatenate MS files with automatic pruning of corrupt files.

   If concat fails, the CASA log is parsed to identify the culprit MS,
   which is removed from the list and concat is retried.

   :param ms_files: List of MS paths to concatenate.
   :param work_dir: Working directory for the output.
   :param subband: Subband label (used in output filename).
   :param max_retries: Maximum number of retry attempts.

   :returns: Path to the concatenated MS, or None on total failure.


.. py:function:: fix_field_id(ms_path: str) -> bool

   Set all FIELD_ID values to 0 in the MS.

   :param ms_path: Path to the measurement set.

   :returns: True on success.


.. py:function:: analyze_snapshot_quality(image_list: List[str]) -> Tuple[List[int], List[Dict]]

   Analyse pilot snapshot images and flag bad integrations.

   Computes RMS in a central 1024×1024 box for each Stokes-V snapshot.
   Outliers (>3σ above median or <0.5× median) are flagged.

   :param image_list: Sorted list of FITS snapshot paths.

   :returns: Tuple of (bad_indices, stats_list).


.. py:function:: flag_bad_integrations(ms_path: str, bad_indices: List[int], n_total: int) -> None

   Flag bad scans using SCAN_NUMBER.

   Each snapshot index maps 1:1 to a scan in the MS. We read the unique
   scan numbers, map bad_indices to scan numbers, and flag by scan.
   This is cleaner and more reliable than time-range flagging.

   :param ms_path: Concatenated MS.
   :param bad_indices: Snapshot indices to flag.
   :param n_total: Total number of integrations.


.. py:function:: add_timestamps_to_images(target_dir: str, prefix: str, ms_path: str, n_intervals: int) -> bool

   Rename WSClean output images to include UTC timestamps.

   :param target_dir: Directory containing the FITS images.
   :param prefix: WSClean filename prefix.
   :param ms_path: Concatenated MS (for time column).
   :param n_intervals: Number of time intervals produced.

   :returns: True on success.


.. py:function:: plot_snapshot_diagnostics(stats: List[Dict], bad_indices: List[int], work_dir: str, subband: str) -> None

   Save a snapshot-RMS-vs-time diagnostic plot.

   :param stats: List of per-integration stats dicts.
   :param bad_indices: Indices flagged as bad.
   :param work_dir: Working directory (QA subdir is used).
   :param subband: Subband label for title.


.. py:function:: run_casa_task(work_dir: str, task_code: str) -> bool

   Execute a CASA task in a subprocess for crash isolation.

   Writes a temporary Python script and executes it, so that any CASA
   crash or table-locking issue does not bring down the Celery worker.

   :param work_dir: Working directory (logs stored here).
   :param task_code: Python code string (indented for embedding in the wrapper).

   :returns: True on success, False on failure.


.. py:function:: apply_pb_correction_to_images(target_dir: str, base_prefix: str) -> int

   Apply primary beam correction to all image FITS files in a directory.

   Tries to import ``pb_correct.apply_pb_correction`` from the preliminary
   pipeline. If not available, logs a warning and skips.

   :param target_dir: Directory containing FITS images.
   :param base_prefix: WSClean filename prefix to match.

   :returns: Number of images successfully corrected.


.. py:function:: run_subprocess(cmd: List[str], description: str) -> None

   Run a shell command and log start/finish.

   :param cmd: Command and arguments.
   :param description: Human-readable label.

   :raises subprocess.CalledProcessError: On non-zero exit.


.. py:function:: find_deep_image(run_dir: str, freq_mhz: float, pol: str = 'I') -> Optional[str]

   Find the best deep tapered image for a subband.

   Searches ``/MHz//deep/`` and falls back to ``//deep/`` for
   PB-corrected tapered images.

   :param run_dir: Working directory (NVMe or archive).
   :param freq_mhz: Subband centre frequency in MHz.
   :param pol: Stokes parameter ('I' or 'V').

   :returns: Path to the best deep image, or None.


.. py:function:: extract_sources_to_df(filename: str, thresh_pix: float = 10.0) -> pandas.DataFrame

   Extract sources from a FITS image using PyBDSF.

   Runs BDSF in a **subprocess** because Celery ForkPoolWorker processes are
   daemonic and Python forbids daemon processes from spawning children.
   BDSF internally uses multiprocessing (even with ncores=1 for some code
   paths), so isolating it in a fresh subprocess is the only reliable fix.

   :param filename: Path to FITS image.
   :param thresh_pix: Detection threshold in pixels.

   :returns: DataFrame with columns ra, dec, flux_peak_I_app, maj, min.
             Empty DataFrame on failure.


.. py:function:: archive_results(work_dir: str, archive_base: str, subband: str = '', cleanup_concat: bool = True, cleanup_workdir: bool = False) -> str

   Copy pipeline products from NVMe work_dir to Lustre archive.

   Copies subdirectories I/, V/, snapshots/, QA/, samples/, detections/,
   Movies/, Dewarp_Diagnostics/ and loose files.

   Also writes to the centralised ``samples/`` and ``detections/`` trees
   under ``LUSTRE_ARCHIVE_DIR`` so that products from many runs are
   aggregated in one place.

   :param work_dir: NVMe working directory.
   :param archive_base: Lustre destination directory.
   :param subband: Subband label (e.g. '73MHz'); used for centralized archive paths.
   :param cleanup_concat: Whether to remove concat MS on NVMe.
   :param cleanup_workdir: Whether to remove the entire work_dir after archiving.
                           Supersedes cleanup_concat when True.

   :returns: The archive_base path.
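
The SPW-matching rule described for ``calculate_spwmap`` (best overlap first, nearest frequency as a fallback) can be illustrated with a minimal sketch. This is not the module's implementation: the name ``sketch_spwmap``, the helper ``overlap``, and the representation of each SPW as a ``(low, high)`` frequency tuple are assumptions made for illustration only. The real function takes MS and calibration-table paths and reads the frequencies from them.

```python
from typing import List, Sequence, Tuple

# Hypothetical representation: each SPW is a (low, high) frequency range.
FreqRange = Tuple[float, float]


def overlap(a: FreqRange, b: FreqRange) -> float:
    """Width of the intersection of two ranges (0.0 if they are disjoint)."""
    return max(0.0, min(a[1], b[1]) - max(a[0], b[0]))


def centre(r: FreqRange) -> float:
    """Centre frequency of a range."""
    return 0.5 * (r[0] + r[1])


def sketch_spwmap(
    ms_spws: Sequence[FreqRange], cal_spws: Sequence[FreqRange]
) -> List[int]:
    """Return one cal-table SPW index per MS SPW.

    Picks the cal SPW with the largest overlap; if none overlaps,
    picks the cal SPW whose centre frequency is nearest.
    """
    if not cal_spws:
        raise ValueError("calibration table has no spectral windows")

    indices = range(len(cal_spws))
    spwmap: List[int] = []
    for ms in ms_spws:
        overlaps = [overlap(ms, cal) for cal in cal_spws]
        best = max(indices, key=overlaps.__getitem__)
        if overlaps[best] == 0.0:
            # No overlap at all: fall back to the nearest centre frequency.
            ms_centre = centre(ms)
            best = min(indices, key=lambda i: abs(centre(cal_spws[i]) - ms_centre))
        spwmap.append(best)
    return spwmap


if __name__ == "__main__":
    ms = [(10.0, 20.0), (20.0, 30.0), (50.0, 60.0)]
    cal = [(8.0, 18.0), (18.0, 32.0), (35.0, 40.0)]
    print(sketch_spwmap(ms, cal))
```

In this example the first two MS windows each overlap a cal window, while the third overlaps none and is matched by centre frequency instead. Ties in overlap resolve to the lowest cal index here, which is a choice of this sketch rather than documented behaviour.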