orca.transform.subband_processing
Pure functions for subband processing steps.
These functions are called by the Celery tasks in subband_tasks.py but contain NO Celery decorators — they are plain Python so they can be tested locally.
Adapted to use orca wrappers where available and NVMe paths from subband_config.
Attributes
Functions
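| redirect_casa_log(work_dir) | Point the casalog to a file inside the work directory. |
| find_archive_files_for_subband(start_dt, end_dt, subband[, input_dir]) | Locate averaged MS files for a subband within a time range. |
| copy_ms_to_nvme(src_ms, nvme_work_dir) | Copy a single MS directory to the NVMe work directory. |
| calculate_spwmap(ms_path, caltable_path) | Compute the spectral-window mapping between an MS and a calibration table. |
| apply_calibration(ms_path, bp_table, xy_table) | Apply bandpass + XY-phase calibration tables to an MS. |
| get_bad_antenna_numbers(ms_path) | Query mnc_python for bad antennas at the observation time. |
| flag_bad_antennas(ms_path) | Flag bad antennas in an MS using mnc_python lookup. |
| concatenate_ms(ms_files, work_dir, subband[, max_retries]) | Concatenate MS files with automatic pruning of corrupt files. |
| fix_field_id(ms_path) | Set all FIELD_ID values to 0 in the MS. |
| analyze_snapshot_quality(image_list) | Analyse pilot snapshot images and flag bad integrations. |
| flag_bad_integrations(ms_path, bad_indices, n_total) | Flag bad scans using SCAN_NUMBER. |
| add_timestamps_to_images(target_dir, prefix, ms_path, n_intervals) | Rename WSClean output images to include UTC timestamps. |
| plot_snapshot_diagnostics(stats, bad_indices, work_dir, subband) | Save a snapshot-RMS-vs-time diagnostic plot. |
| run_casa_task(work_dir, task_code) | Execute a CASA task in a subprocess for crash isolation. |
| apply_pb_correction_to_images(target_dir, base_prefix) | Apply primary beam correction to all image FITS files in a directory. |
| run_subprocess(cmd, description) | Run a shell command and log start/finish. |
| find_deep_image(run_dir, freq_mhz[, pol]) | Find the best deep tapered image for a subband. |
| extract_sources_to_df(filename[, thresh_pix]) | Extract sources from a FITS image using PyBDSF. |
| archive_results(work_dir, archive_base[, subband, cleanup_concat, cleanup_workdir]) | Copy pipeline products from NVMe work_dir to Lustre archive. |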
Module Contents
- orca.transform.subband_processing.redirect_casa_log(work_dir: str) → None
Point the casalog to a file inside the work directory.
- orca.transform.subband_processing.find_archive_files_for_subband(start_dt: datetime.datetime, end_dt: datetime.datetime, subband: str, input_dir: str | None = None) → List[str]
Locate averaged MS files for a subband within a time range.
Searches either a custom input_dir or the default Lustre night-time directory tree /lustre/pipeline/night-time/averaged/<subband>/<date>/<hour>/.
- Parameters:
start_dt – Start of the observation window (UTC).
end_dt – End of the observation window (UTC).
subband – Frequency label, e.g. '73MHz'.
input_dir – Optional override directory to search.
- Returns:
Sorted list of absolute paths to matching .ms directories.
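A minimal sketch of the default directory walk, assuming `<date>` is formatted `YYYY-MM-DD` and `<hour>` as a two-digit `HH` (neither format is stated above). It enumerates every hour directory the window touches and collects the `.ms` directories inside. It works at hour granularity only and does not apply any finer per-file time filter the real function may use; `candidate_hour_dirs` and `find_ms_dirs` are hypothetical names.

```python
from datetime import datetime, timedelta
from pathlib import Path
from typing import List

# Default root taken from the docstring; the <date>/<hour> formats below are assumptions.
DEFAULT_ROOT = "/lustre/pipeline/night-time/averaged"


def candidate_hour_dirs(start_dt: datetime, end_dt: datetime, subband: str,
                        root: str = DEFAULT_ROOT) -> List[str]:
    """Return each <root>/<subband>/<YYYY-MM-DD>/<HH> directory overlapping the window."""
    # Truncate to the top of the hour so a partially covered first hour is included.
    t = start_dt.replace(minute=0, second=0, microsecond=0)
    dirs = []
    while t <= end_dt:
        dirs.append(str(Path(root, subband, t.strftime("%Y-%m-%d"), t.strftime("%H"))))
        t += timedelta(hours=1)
    return dirs


def find_ms_dirs(start_dt: datetime, end_dt: datetime, subband: str,
                 root: str = DEFAULT_ROOT) -> List[str]:
    """Collect *.ms directories from every candidate hour directory, sorted by path."""
    found = []
    for hour_dir in map(Path, candidate_hour_dirs(start_dt, end_dt, subband, root)):
        if hour_dir.is_dir():
            # An MS is a directory, so plain files with a .ms suffix are skipped.
            found.extend(str(p.resolve()) for p in hour_dir.glob("*.ms") if p.is_dir())
    return sorted(found)
```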
- orca.transform.subband_processing.copy_ms_to_nvme(src_ms: str, nvme_work_dir: str) → str
Copy a single MS directory to the NVMe work directory.
- Parameters:
src_ms – Source path on Lustre.
nvme_work_dir – Target directory on local NVMe.
- Returns:
Path to the copied MS on NVMe.
- orca.transform.subband_processing.calculate_spwmap(ms_path: str, caltable_path: str) → List[int] | None
Compute the spectral-window mapping between an MS and a calibration table.
For each SPW in the MS, finds the best-overlapping SPW in the cal table. Falls back to nearest-frequency match when there is no overlap.
- Parameters:
ms_path – Path to the measurement set.
caltable_path – Path to the calibration table.
- Returns:
List of cal-table SPW indices (one per MS SPW), or None on failure.
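The overlap-then-nearest rule can be sketched as a pure function over SPW frequency edges. This assumes each SPW has already been reduced to a `(low_hz, high_hz)` pair (for example from the `CHAN_FREQ` and `CHAN_WIDTH` columns of the `SPECTRAL_WINDOW` subtable) and measures "nearest" between SPW centres; both are assumptions, and `spwmap_from_ranges` is a hypothetical helper.

```python
from typing import List, Sequence, Tuple

Range = Tuple[float, float]  # (low_hz, high_hz) edges of one spectral window


def overlap(a: Range, b: Range) -> float:
    """Width of the frequency interval shared by two SPWs (0 if disjoint)."""
    return max(0.0, min(a[1], b[1]) - max(a[0], b[0]))


def spwmap_from_ranges(ms_spws: Sequence[Range], cal_spws: Sequence[Range]) -> List[int]:
    """For each MS SPW pick the cal SPW with the largest overlap, else the nearest centre."""
    mapping = []
    for ms in ms_spws:
        overlaps = [overlap(ms, cal) for cal in cal_spws]
        best = max(range(len(cal_spws)), key=lambda i: overlaps[i])
        if overlaps[best] == 0.0:
            # No overlap with any cal SPW: fall back to the closest centre frequency.
            centre = 0.5 * (ms[0] + ms[1])
            best = min(range(len(cal_spws)),
                       key=lambda i: abs(0.5 * (cal_spws[i][0] + cal_spws[i][1]) - centre))
        mapping.append(best)
    return mapping
```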
- orca.transform.subband_processing.apply_calibration(ms_path: str, bp_table: str, xy_table: str) → bool
Apply bandpass + XY-phase calibration tables to an MS.
Also flags channels above 85 MHz if present.
- Parameters:
ms_path – Path to the measurement set (modified in-place).
bp_table – Path to bandpass calibration table.
xy_table – Path to XY-phase calibration table.
- Returns:
True on success, False on failure.
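The docstring does not say how the above-85 MHz cut is expressed. One way, assuming the per-SPW channel frequencies (e.g. the `CHAN_FREQ` column of the `SPECTRAL_WINDOW` subtable) are already loaded and ascending, is to build a CASA `spw:firstchan~lastchan` selection string. `high_freq_flag_selection` is a hypothetical helper.

```python
from typing import Sequence


def high_freq_flag_selection(chan_freqs_per_spw: Sequence[Sequence[float]],
                             cutoff_hz: float = 85e6) -> str:
    """Build a CASA spw:channel selection covering every channel above cutoff_hz."""
    parts = []
    for spw, freqs in enumerate(chan_freqs_per_spw):
        above = [i for i, f in enumerate(freqs) if f > cutoff_hz]
        if above:
            # Assumes ascending channel frequencies, so the flagged channels are contiguous.
            parts.append(f"{spw}:{above[0]}~{above[-1]}")
    return ",".join(parts)
```

A non-empty result could be passed as `flagdata(vis=ms_path, mode='manual', spw=sel)` from `casatasks`; whether the pipeline uses `flagdata` for this step is not stated.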
- orca.transform.subband_processing.get_bad_antenna_numbers(ms_path: str) → List[int]
Query mnc_python for bad antennas at the observation time.
Requires the development conda environment with mnc_python installed.
- Parameters:
ms_path – Path to MS; the observation time is read from the OBSERVATION table.
- Returns:
List of bad correlator numbers (may be empty).
- orca.transform.subband_processing.flag_bad_antennas(ms_path: str) → str
Flag bad antennas in an MS using mnc_python lookup.
- Parameters:
ms_path – Path to the measurement set.
- Returns:
The ms_path (for chaining).
- orca.transform.subband_processing.concatenate_ms(ms_files: List[str], work_dir: str, subband: str, max_retries: int = 10) → str | None
Concatenate MS files with automatic pruning of corrupt files.
If concat fails, the CASA log is parsed to identify the culprit MS, which is removed from the list and concat is retried.
- Parameters:
ms_files – List of MS paths to concatenate.
work_dir – Working directory for the output.
subband – Subband label (used in output filename).
max_retries – Maximum number of retry attempts.
- Returns:
Path to the concatenated MS, or None on total failure.
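The retry loop can be sketched with the concat call and the log parsing injected as plain functions. The regex used to pick the culprit out of the log is a placeholder, since the actual CASA log wording is not documented here. `concat_with_pruning`, `find_culprit` and the `try_concat` callback (returning `None` on success, log text on failure) are hypothetical.

```python
import re
from typing import Callable, List, Optional

# Placeholder pattern: any whitespace-free token ending in ".ms".
CULPRIT_RE = re.compile(r"(\S+\.ms)")


def find_culprit(log_text: str, candidates: List[str]) -> Optional[str]:
    """Return the last candidate MS path mentioned in log_text, if any."""
    mentioned = [m for m in CULPRIT_RE.findall(log_text) if m in candidates]
    return mentioned[-1] if mentioned else None


def concat_with_pruning(ms_files: List[str],
                        try_concat: Callable[[List[str]], Optional[str]],
                        max_retries: int = 10) -> Optional[List[str]]:
    """Retry concatenation, dropping one culprit MS after each failed attempt.

    Returns the list that concatenated successfully, or None on total failure.
    """
    files = list(ms_files)
    for _ in range(max_retries + 1):  # first attempt plus max_retries retries
        if not files:
            return None
        log_text = try_concat(files)
        if log_text is None:
            return files
        culprit = find_culprit(log_text, files)
        if culprit is None:
            return None  # failure cannot be pinned on a single MS: give up
        files.remove(culprit)
    return None
```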
- orca.transform.subband_processing.fix_field_id(ms_path: str) → bool
Set all FIELD_ID values to 0 in the MS.
- Parameters:
ms_path – Path to the measurement set.
- Returns:
True on success.
- orca.transform.subband_processing.analyze_snapshot_quality(image_list: List[str]) → Tuple[List[int], List[Dict]]
Analyse pilot snapshot images and flag bad integrations.
Computes RMS in a central 1024×1024 box for each Stokes-V snapshot. Outliers (>3σ above median or <0.5× median) are flagged.
- Parameters:
image_list – Sorted list of FITS snapshot paths.
- Returns:
Tuple of (bad_indices, stats_list).
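A sketch of the outlier rule with NumPy, operating on pixel arrays already loaded from the FITS files (e.g. via `astropy.io.fits`). Here "RMS" is the root mean square of the central box, and σ is taken as the standard deviation of the per-snapshot RMS values; the docstring does not define σ, so that choice is an assumption. `central_rms` and `flag_rms_outliers` are hypothetical names.

```python
from typing import List, Sequence

import numpy as np


def central_rms(image: np.ndarray, box: int = 1024) -> float:
    """RMS of the central box x box pixels; degenerate leading axes are squeezed away."""
    img = np.squeeze(image)  # WSClean FITS images are typically shaped (1, 1, ny, nx)
    ny, nx = img.shape
    half = box // 2
    cy, cx = ny // 2, nx // 2
    cut = img[max(cy - half, 0):cy + half, max(cx - half, 0):cx + half]
    return float(np.sqrt(np.nanmean(cut ** 2)))


def flag_rms_outliers(rms: Sequence[float]) -> List[int]:
    """Indices whose RMS is > median + 3*sigma or < 0.5*median."""
    values = np.asarray(rms, dtype=float)
    med = np.median(values)
    sigma = np.std(values)  # assumed definition of sigma
    bad = (values > med + 3 * sigma) | (values < 0.5 * med)
    return [int(i) for i in np.flatnonzero(bad)]
```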
- orca.transform.subband_processing.flag_bad_integrations(ms_path: str, bad_indices: List[int], n_total: int) → None
Flag bad scans using SCAN_NUMBER.
Each snapshot index maps 1:1 to a scan in the MS. We read the unique scan numbers, map bad_indices to scan numbers, and flag by scan. This is cleaner and more reliable than time-range flagging.
- Parameters:
ms_path – Concatenated MS.
bad_indices – Snapshot indices to flag.
n_total – Total number of integrations.
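The index-to-scan mapping reduces to a few lines once the `SCAN_NUMBER` column has been read. This sketch assumes scan numbers increase with time, so that sorting the unique values puts them in snapshot order, and that `n_total` serves as a consistency check; `bad_scan_selection` is a hypothetical helper.

```python
from typing import Iterable, List, Sequence


def bad_scan_selection(scan_numbers: Iterable[int], bad_indices: Sequence[int],
                       n_total: int) -> str:
    """Map snapshot indices to SCAN_NUMBER values; return a comma-separated scan list."""
    # One entry per scan, in (assumed) time order.
    scans: List[int] = sorted(set(scan_numbers))
    if len(scans) != n_total:
        raise ValueError(f"expected {n_total} scans, found {len(scans)}")
    return ",".join(str(scans[i]) for i in sorted(set(bad_indices)))
```

The returned string has the form accepted by the `scan` parameter of CASA's `flagdata`, e.g. `flagdata(vis=ms_path, mode='manual', scan=sel)`.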
- orca.transform.subband_processing.add_timestamps_to_images(target_dir: str, prefix: str, ms_path: str, n_intervals: int) → bool
Rename WSClean output images to include UTC timestamps.
- Parameters:
target_dir – Directory containing the FITS images.
prefix – WSClean filename prefix.
ms_path – Concatenated MS (for time column).
n_intervals – Number of time intervals produced.
- Returns:
True on success.
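The timestamp step needs two pieces: a UTC time for each interval and a renaming rule. This sketch assumes the MS `TIME` column (seconds since MJD 0, i.e. 1858-11-17 UTC) gives the observation span, that the `n_intervals` intervals are equal in length, and that WSClean tagged each interval image with its `-tNNNN` suffix. The inserted `YYYYMMDDTHHMMSS` stamp format is hypothetical, as are the function names.

```python
import re
from datetime import datetime, timedelta
from typing import List

MJD_EPOCH = datetime(1858, 11, 17)  # MS TIME values count seconds from MJD 0 (UTC)


def interval_midpoints(t_start_s: float, t_end_s: float, n_intervals: int) -> List[datetime]:
    """Split [t_start_s, t_end_s] (MJD seconds) into equal intervals; return UTC midpoints."""
    width = (t_end_s - t_start_s) / n_intervals
    return [MJD_EPOCH + timedelta(seconds=t_start_s + (i + 0.5) * width)
            for i in range(n_intervals)]


def timestamped_name(filename: str, midpoints: List[datetime]) -> str:
    """Insert a UTC stamp after the -tNNNN interval tag (hypothetical naming scheme)."""
    m = re.search(r"-t(\d{4})", filename)
    if m is None:
        return filename  # not a per-interval image, e.g. an MFS product
    stamp = midpoints[int(m.group(1))].strftime("%Y%m%dT%H%M%S")
    return filename[:m.end()] + "-" + stamp + filename[m.end():]
```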
- orca.transform.subband_processing.plot_snapshot_diagnostics(stats: List[Dict], bad_indices: List[int], work_dir: str, subband: str) → None
Save a snapshot-RMS-vs-time diagnostic plot.
- Parameters:
stats – List of per-integration stats dicts.
bad_indices – Indices flagged as bad.
work_dir – Working directory (QA subdir is used).
subband – Subband label for title.
- orca.transform.subband_processing.run_casa_task(work_dir: str, task_code: str) → bool
Execute a CASA task in a subprocess for crash isolation.
Writes a temporary Python script and executes it, so that any CASA crash or table-locking issue does not bring down the Celery worker.
- Parameters:
work_dir – Working directory (logs stored here).
task_code – Python code string (indented for embedding in the wrapper).
- Returns:
True on success, False on failure.
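A minimal sketch of the isolation pattern: indented task code is spliced into a wrapper script, which runs under a fresh interpreter with its output appended to a log in `work_dir`. The wrapper deliberately omits the `casatasks` import so the sketch runs without CASA; the log filename and `run_task_isolated` are hypothetical.

```python
import os
import subprocess
import sys
import tempfile

# Wrapper template; a real CASA wrapper would also import casatasks here.
WRAPPER = """import sys
try:
{body}
except Exception as exc:
    print(f"task failed: {{exc}}", file=sys.stderr)
    sys.exit(1)
"""


def run_task_isolated(work_dir: str, task_code: str) -> bool:
    """Run task_code (already indented 4 spaces) in a new interpreter; True on exit 0."""
    os.makedirs(work_dir, exist_ok=True)
    fd, script = tempfile.mkstemp(suffix=".py", dir=work_dir)
    with os.fdopen(fd, "w") as fh:
        fh.write(WRAPPER.format(body=task_code))
    try:
        with open(os.path.join(work_dir, "casa_task.log"), "a") as log:
            # A segfault or table-lock hang kills only the child, never the caller.
            result = subprocess.run([sys.executable, script],
                                    stdout=log, stderr=subprocess.STDOUT)
        return result.returncode == 0
    finally:
        os.remove(script)
```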
- orca.transform.subband_processing.apply_pb_correction_to_images(target_dir: str, base_prefix: str) → int
Apply primary beam correction to all image FITS files in a directory.
Tries to import pb_correct.apply_pb_correction from the preliminary pipeline. If it is not available, logs a warning and skips the correction.
- Parameters:
target_dir – Directory containing FITS images.
base_prefix – WSClean filename prefix to match.
- Returns:
Number of images successfully corrected.
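The optional-import fallback can be sketched as below. The `<base_prefix>*image*.fits` glob and the assumption that `apply_pb_correction` takes a single FITS path are both guesses at the real interface; the `corrector` argument exists only so the hypothetical `correct_images` helper can be exercised without the preliminary pipeline.

```python
import glob
import importlib
import logging
import os
from typing import Callable, Optional

log = logging.getLogger(__name__)


def load_pb_corrector() -> Optional[Callable[[str], object]]:
    """Return pb_correct.apply_pb_correction if importable, else None (with a warning)."""
    try:
        return importlib.import_module("pb_correct").apply_pb_correction
    except (ImportError, AttributeError):
        log.warning("pb_correct.apply_pb_correction not available; skipping PB correction")
        return None


def correct_images(target_dir: str, base_prefix: str,
                   corrector: Optional[Callable[[str], object]] = None) -> int:
    """Apply the corrector to each matching image; return the number that succeeded."""
    corrector = corrector or load_pb_corrector()
    if corrector is None:
        return 0
    n_ok = 0
    for path in sorted(glob.glob(os.path.join(target_dir, f"{base_prefix}*image*.fits"))):
        try:
            corrector(path)  # single-path call signature is an assumption
            n_ok += 1
        except Exception:
            log.exception("PB correction failed for %s", path)
    return n_ok
```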
- orca.transform.subband_processing.run_subprocess(cmd: List[str], description: str) → None
Run a shell command and log start/finish.
- Parameters:
cmd – Command and arguments.
description – Human-readable label.
- Raises:
subprocess.CalledProcessError – If the command exits with a non-zero status.
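The documented contract (log, run, raise on failure) maps directly onto `subprocess.run(..., check=True)`. A sketch, with `run_logged` as a hypothetical stand-in name:

```python
import logging
import subprocess
import time
from typing import List

log = logging.getLogger(__name__)


def run_logged(cmd: List[str], description: str) -> None:
    """Run cmd, logging start and finish; raise CalledProcessError on non-zero exit."""
    log.info("Starting %s: %s", description, " ".join(cmd))
    t0 = time.monotonic()
    # check=True turns a non-zero exit status into subprocess.CalledProcessError.
    subprocess.run(cmd, check=True)
    log.info("Finished %s in %.1f s", description, time.monotonic() - t0)
```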
- orca.transform.subband_processing.find_deep_image(run_dir: str, freq_mhz: float, pol: str = 'I') → str | None
Find the best deep tapered image for a subband.
Searches <run_dir>/<freq>MHz/<pol>/deep/ for PB-corrected tapered images and falls back to <run_dir>/<pol>/deep/.
- Parameters:
run_dir – Working directory (NVMe or archive).
freq_mhz – Subband centre frequency in MHz.
pol – Stokes parameter (‘I’ or ‘V’).
- Returns:
Path to the best deep image, or None.
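A sketch of the two-location search. The `<freq>MHz` label format (`73.0` becomes `73MHz`), the `*taper*pbcor*.fits` filename pattern and the use of the most recently modified file as "best" are all assumptions; the docstring does not say how the best image is chosen. `locate_deep_image` is a hypothetical name.

```python
import glob
import os
from typing import Optional


def locate_deep_image(run_dir: str, freq_mhz: float, pol: str = "I") -> Optional[str]:
    """Look in <run_dir>/<freq>MHz/<pol>/deep/ first, then <run_dir>/<pol>/deep/."""
    freq_label = f"{freq_mhz:g}MHz"  # assumed label format, e.g. 73.0 -> "73MHz"
    for deep_dir in (os.path.join(run_dir, freq_label, pol, "deep"),
                     os.path.join(run_dir, pol, "deep")):
        # Hypothetical filename pattern for PB-corrected tapered images.
        matches = sorted(glob.glob(os.path.join(deep_dir, "*taper*pbcor*.fits")))
        if matches:
            # "Best" is approximated here by the most recently modified file.
            return max(matches, key=os.path.getmtime)
    return None
```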
- orca.transform.subband_processing.extract_sources_to_df(filename: str, thresh_pix: float = 10.0) → pandas.DataFrame
Extract sources from a FITS image using PyBDSF.
Runs BDSF in a subprocess because Celery ForkPoolWorker processes are daemonic and Python forbids daemon processes from spawning children. BDSF internally uses multiprocessing (even with ncores=1 for some code paths), so isolating it in a fresh subprocess is the only reliable fix.
- Parameters:
filename – Path to FITS image.
thresh_pix – Source-detection threshold in units of the image RMS (σ), as for PyBDSF's thresh_pix option.
- Returns:
DataFrame with columns ra, dec, flux_peak_I_app, maj, min. Empty DataFrame on failure.
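The isolation trick amounts to launching PyBDSF in a brand-new interpreter, which is not daemonic and can therefore start its own worker processes. A sketch: `run_isolated` is a hypothetical helper, and the child script's arguments and CSV output path are assumptions (`bdsf.process_image` and `write_catalog` are PyBDSF calls). Converting the catalogue into the documented DataFrame columns is not shown.

```python
import subprocess
import sys

# Child script: PyBDSF runs in a fresh, non-daemonic interpreter.
# argv layout (image, threshold, output CSV) is an assumption for this sketch.
BDSF_CHILD = """
import sys
import bdsf
img = bdsf.process_image(sys.argv[1], thresh_pix=float(sys.argv[2]))
img.write_catalog(outfile=sys.argv[3], format="csv", catalog_type="srl", clobber=True)
"""


def run_isolated(script: str, *args: str, timeout: float = 3600) -> subprocess.CompletedProcess:
    """Run script in a new interpreter so it may use multiprocessing freely."""
    # A Celery ForkPoolWorker is daemonic and may not fork children; a process
    # started via subprocess is an ordinary top-level process without that limit.
    return subprocess.run([sys.executable, "-c", script, *args],
                          capture_output=True, text=True, timeout=timeout, check=False)
```

For example, `run_isolated(BDSF_CHILD, fits_path, str(thresh_pix), csv_path)` would leave a source list at `csv_path` for the parent to load, with a non-zero `returncode` signalling failure.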
- orca.transform.subband_processing.archive_results(work_dir: str, archive_base: str, subband: str = '', cleanup_concat: bool = True, cleanup_workdir: bool = False) → str
Copy pipeline products from NVMe work_dir to Lustre archive.
Copies the subdirectories I/, V/, snapshots/, QA/, samples/, detections/, Movies/ and Dewarp_Diagnostics/, plus loose files. Also writes to the centralised samples/ and detections/ trees under LUSTRE_ARCHIVE_DIR so that products from many runs are aggregated in one place.
- Parameters:
work_dir – NVMe working directory.
archive_base – Lustre destination directory.
subband – Subband label (e.g. ‘73MHz’), used for the centralised archive paths.
cleanup_concat – Whether to remove concat MS on NVMe.
cleanup_workdir – Whether to remove the entire work_dir after archiving. Supersedes cleanup_concat when True.
- Returns:
The archive_base path.
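A sketch of the local copy and cleanup logic. It omits the centralised samples/ and detections/ writes under `LUSTRE_ARCHIVE_DIR`, whose layout is not documented here, and assumes the concat MS is a directory whose name contains `concat` and ends in `.ms`. `archive_products` is a hypothetical name.

```python
import glob
import os
import shutil

# Product subdirectories listed in the docstring.
ARCHIVE_SUBDIRS = ["I", "V", "snapshots", "QA", "samples", "detections",
                   "Movies", "Dewarp_Diagnostics"]


def archive_products(work_dir: str, archive_base: str,
                     cleanup_concat: bool = True, cleanup_workdir: bool = False) -> str:
    """Copy known product subdirectories and loose files from work_dir to archive_base."""
    os.makedirs(archive_base, exist_ok=True)
    for name in ARCHIVE_SUBDIRS:
        src = os.path.join(work_dir, name)
        if os.path.isdir(src):
            shutil.copytree(src, os.path.join(archive_base, name), dirs_exist_ok=True)
    with os.scandir(work_dir) as entries:
        for entry in entries:
            if entry.is_file():  # loose top-level files; MS directories are skipped
                shutil.copy2(entry.path, archive_base)
    if cleanup_workdir:
        # Removing the whole tree also removes the concat MS, so cleanup_concat is moot.
        shutil.rmtree(work_dir)
    elif cleanup_concat:
        # Assumed naming for the concatenated MS directory.
        for ms in glob.glob(os.path.join(work_dir, "*concat*.ms")):
            shutil.rmtree(ms)
    return archive_base
```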