orca.tasks.pipeline_tasks
Core pipeline processing tasks for OVRO-LWA data reduction.
Contains Celery tasks for the main data processing workflows, including:
- Measurement set copying and management
- Calibration application and flagging
- Bandpass calibration pipeline
- Frequency averaging
- Image generation and cleanup
These tasks form the building blocks of the automated pipeline.
Functions
- copy_ms_task: Copy the MS from its original location to slow-averaged directory.
- copy_ms_nighttime_task: Copy the MS file to the same directory with a new name.
- flag_ants_task: Flag antennas in the measurement set using the provided antenna indices.
- flag_with_aoflagger_task: Apply AOFlagger on the measurement set with specified strategy and options.
- save_flag_metadata_task: Save flag metadata for the measurement set and return the updated path.
- save_flag_metadata_nighttime_task: Save flag metadata for a nighttime measurement set using the nighttime output directory.
- applycal_data_col_task: Celery task to apply calibration to an MS.
- wsclean_task: Run wsclean imaging on the measurement set and return the original MS path.
- peel_with_ttcal_task: Celery task to use TTCal to peel sources.
- average_frequency_task: Perform frequency averaging on the measurement set; returns a tuple (original_ms, averaged_ms).
- average_frequency_nighttime_task: Perform frequency averaging on a nighttime measurement set; returns (original_ms, averaged_ms).
- change_phase_center_task: Celery task to change the phase center of a calibrated and averaged MS.
- extract_original_ms_task: Extract the original MS path from the tuple (original_ms, averaged_ms).
- run_entire_pipeline_on_one_cpu: A single-task pipeline runner that combines all steps into one execution on the same node (one CPU).
- copy_ms_to_nvme_task: Copy the MS from Lustre to NVMe (fast) storage, placing it directly in /fast/pipeline/.
- save_flag_metadata_nvme_task: Save flag metadata on NVMe directly in /fast/pipeline/.
- average_frequency_nvme_task: Average frequency on NVMe, storing output in /fast/pipeline/.
- remove_ms_task: Remove the original measurement set directory and return the averaged MS path.
- run_entire_pipeline_on_one_cpu_nvme: NVMe-based pipeline run with simplified NVMe storage logic.
- copy_ms_to_calibration_task: Copy the MS from its original slow directory to the calibration directory.
- get_utc_hour_from_path: Extract the UTC hour from the measurement set path (assumes the hour is the second-to-last path component).
- run_pipeline_slow_on_one_cpu_nvme: A pipeline that copies calibrator MSs to the calibration directory and processes MSs within the UTC hour window on NVMe.
- split_2pol_task: Celery task to split an MS down to the specified polarizations (default "XX,YY").
- bandpass_nvme_task
- run_nvme_reduce_all_unconditional: Process one MS on NVMe (no LST/transit gating), never touch/delete original.
- zesting_one_ms_task: Stage MS to NVMe, CASA applycal(bandpass_table), TTCal zest(sources_json), then move to <dest_root>/<sb>/<date>/<hour>.
Module Contents
- orca.tasks.pipeline_tasks.copy_ms_task(original_ms: str, base_output_dir: str = '/lustre/pipeline/slow-averaged/') -> str
Copy the MS from its original location to slow-averaged directory. Returns the path to the copied MS.
- orca.tasks.pipeline_tasks.copy_ms_nighttime_task(original_ms: str) -> str
Copy the MS file to the same directory with a new name. The copied file will have ‘_copy’ appended to the base name.
Example
original_ms = '/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz.ms'
copied_ms = '/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz_copy.ms'
- Returns:
The path to the copied MS.
- Return type:
str
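The naming rule in the example above can be sketched in a few lines. This is only an illustration of the '_copy' suffix convention, not the task's actual implementation; `copied_ms_path` is a hypothetical helper name and no files are copied.

```python
import os


def copied_ms_path(original_ms: str) -> str:
    """Hypothetical helper: derive the '_copy' path for a measurement set."""
    # Drop any trailing slash so splitext sees the '.ms' extension.
    root, ext = os.path.splitext(original_ms.rstrip("/"))
    return f"{root}_copy{ext}"


original_ms = "/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz.ms"
copied_ms = copied_ms_path(original_ms)
print(copied_ms)
```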
- orca.tasks.pipeline_tasks.flag_ants_task(ms: str, ants: List[int]) -> str
Flag antennas in the measurement set using the provided antenna indices.
- orca.tasks.pipeline_tasks.flag_with_aoflagger_task(ms: str, strategy: str = '/opt/share/aoflagger/strategies/nenufar-lite.lua', in_memory: bool = False, n_threads: int = 1) -> str
Apply AOFlagger on the measurement set with specified strategy and options.
- orca.tasks.pipeline_tasks.save_flag_metadata_task(ms: str) -> str
Save flag metadata for the measurement set and return the updated path.
- orca.tasks.pipeline_tasks.save_flag_metadata_nighttime_task(ms: str) -> str
Save flag metadata for a nighttime measurement set using the nighttime output directory.
- orca.tasks.pipeline_tasks.applycal_data_col_task(ms: str, gaintable: str) -> str
Celery task to apply calibration to an MS.
- orca.tasks.pipeline_tasks.wsclean_task(ms: str, out_dir: str, filename_prefix: str, extra_args: List[str], num_threads: int, mem_gb: int) -> None
Run wsclean imaging on the measurement set and return the original MS path.
- orca.tasks.pipeline_tasks.peel_with_ttcal_task(ms: str, sources: str) -> str
Celery task to use TTCal to peel sources.
- orca.tasks.pipeline_tasks.average_frequency_task(ms: str, chanbin: int = 4) -> str
Perform frequency averaging on the measurement set; returns a tuple (original_ms, averaged_ms).
- orca.tasks.pipeline_tasks.average_frequency_nighttime_task(ms: str, chanbin: int = 4) -> str
Perform frequency averaging on a nighttime measurement set; returns (original_ms, averaged_ms).
- orca.tasks.pipeline_tasks.change_phase_center_task(ms: str, new_phase_center: str) -> str
Celery task to change the phase center of a calibrated and averaged MS.
- orca.tasks.pipeline_tasks.extract_original_ms_task(ms_tuple: tuple) -> str
Extract the original MS path from the tuple (original_ms, averaged_ms) returned by average_frequency_task.
- orca.tasks.pipeline_tasks.run_entire_pipeline_on_one_cpu(vis: str, window_minutes: int = 4, start_hour: int = 11, end_hour: int = 14, chanbin: int = 4) -> str
A single-task pipeline runner that combines all steps into one execution on the same node (one CPU). It uses the existing tasks but calls them directly in a single function.
- orca.tasks.pipeline_tasks.copy_ms_to_nvme_task(original_ms: str, nvme_base_dir: str = '/fast/pipeline/') -> str
Copy the MS from Lustre to NVMe (fast) storage, placing it directly in /fast/pipeline/.
- orca.tasks.pipeline_tasks.save_flag_metadata_nvme_task(ms: str) -> str
Save flag metadata on NVMe directly in /fast/pipeline/.
- orca.tasks.pipeline_tasks.average_frequency_nvme_task(ms: str, chanbin: int = 4) -> tuple
Average frequency on NVMe, storing output in /fast/pipeline/.
- orca.tasks.pipeline_tasks.remove_ms_task(ms_tuple: tuple) -> str
Remove the original measurement set directory and return the averaged MS path.
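As a rough illustration of how the (original_ms, averaged_ms) tuple flows into this step, the sketch below removes the first element's directory and returns the second. It is a stand-in written for this page, not the task's code: `remove_original_sketch` is a hypothetical name, and the directories are throwaway temporary ones rather than real measurement sets.

```python
import os
import shutil
import tempfile


def remove_original_sketch(ms_tuple: tuple) -> str:
    """Hypothetical stand-in: delete the original MS directory, keep the averaged one."""
    original_ms, averaged_ms = ms_tuple
    # A measurement set is a directory tree, so rmtree is used rather than os.remove.
    shutil.rmtree(original_ms)
    return averaged_ms


# Build two empty directories that play the role of the MS pair.
tmp = tempfile.mkdtemp()
original = os.path.join(tmp, "demo.ms")
averaged = os.path.join(tmp, "demo_averaged.ms")
os.makedirs(original)
os.makedirs(averaged)

result = remove_original_sketch((original, averaged))
original_gone = not os.path.exists(original)
averaged_kept = os.path.isdir(averaged)

# Clean up the temporary directory used for the demonstration.
shutil.rmtree(tmp)
```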
- orca.tasks.pipeline_tasks.run_entire_pipeline_on_one_cpu_nvme(vis: str, window_minutes: int = 4, start_hour: int = 11, end_hour: int = 14, chanbin: int = 4) -> str
NVMe-based pipeline run with simplified NVMe storage logic:
  - Copy from Lustre to /fast/pipeline/ with no subdirectories.
  - Flag, save metadata, average on NVMe (all in /fast/pipeline/).
  - Move the final averaged MS and flag metadata back to /lustre/pipeline/night-time/averaged/, using the original vis path to determine the final directory structure.
If scenario ii) (not in window or LST range), remove original MS from Lustre.
- orca.tasks.pipeline_tasks.copy_ms_to_calibration_task(original_ms: str, calibration_base_dir: str = '/lustre/pipeline/calibration/') -> str
Copy the MS from its original slow directory to the calibration directory preserving frequency/date/hour structure.
- orca.tasks.pipeline_tasks.get_utc_hour_from_path(ms_path: str) -> int
Extract the UTC hour from the measurement set path (assumes the hour is the second-to-last path component).
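The "second-to-last path component" convention can be shown with a minimal sketch. The function name `utc_hour_from_path_sketch` is hypothetical and the real task may parse the path differently; the sample path is the one used in the copy_ms_nighttime_task example.

```python
from pathlib import PurePosixPath


def utc_hour_from_path_sketch(ms_path: str) -> int:
    """Hypothetical sketch: read the hour from the directory that holds the MS."""
    # PurePosixPath ignores a trailing slash, so '.../03/x.ms/' still works.
    parts = PurePosixPath(ms_path).parts
    return int(parts[-2])


hour = utc_hour_from_path_sketch(
    "/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz.ms"
)
print(hour)
```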
- orca.tasks.pipeline_tasks.run_pipeline_slow_on_one_cpu_nvme(self, vis: str, start: int = 1, end: int = 14, chanbin: int = 4) -> str
A pipeline that:
  - Checks if MS is a calibrator; if yes, copy to calibration directory without removing original.
  - If MS UTC hour is in [start..end], process it on NVMe: copy to NVMe, flag, save metadata, average.
  - After averaging, move results to /lustre/pipeline/slow-averaged/.
  - Do not remove the original MS from /lustre/pipeline/slow/.
  - Remove the NVMe copy after processing.
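The gating described above might look roughly like the sketch below. Two things are assumptions here and not confirmed by this page: that the [start..end] window is inclusive at both ends, and that the calibrator check and the hour check are independent of each other. `route_ms_sketch` and its action labels are hypothetical, and how the pipeline decides that an MS is a calibrator is not shown.

```python
def route_ms_sketch(utc_hour: int, is_calibrator: bool,
                    start: int = 1, end: int = 14) -> list:
    """Hypothetical sketch: list the actions the pipeline would take for one MS."""
    actions = []
    if is_calibrator:
        # Calibrators are copied; the original is left in place.
        actions.append("copy_to_calibration")
    if start <= utc_hour <= end:
        # Assumed inclusive window with no wrap-around past midnight.
        actions.append("process_on_nvme")
    return actions


print(route_ms_sketch(3, is_calibrator=False))
print(route_ms_sketch(20, is_calibrator=True))
```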
- orca.tasks.pipeline_tasks.split_2pol_task(ms_input: str, ms_output: str = None, correlation: str = 'XX,YY', datacolumn: str = 'all', remove_original: bool = True) -> str
Celery task to split an MS down to the specified polarizations (default “XX,YY”) using CASA’s split task. By default, removes the original MS after splitting.
- Parameters:
ms_input – Path to the input measurement set (e.g., “/lustre/pipeline/cosmology/41MHz/2025-01-01/12/xyz.ms/”)
ms_output – Path to the output measurement set. If None, we auto-generate by stripping trailing slashes, removing ‘.ms’ extension, and appending ‘_2pol.ms’
correlation – Correlations to keep (e.g., “XX,YY”)
datacolumn – Data column to copy (default “all”)
remove_original – If True, remove the original MS after splitting
- Returns:
The path to the newly created 2-pol measurement set
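The auto-generated output name described for ms_output can be sketched as follows. This only mirrors the stated rule (strip trailing slashes, remove '.ms', append '_2pol.ms'); `default_2pol_output` is a hypothetical helper, and CASA is not invoked.

```python
def default_2pol_output(ms_input: str) -> str:
    """Hypothetical helper: derive the default 2-pol output path from the input path."""
    base = ms_input.rstrip("/")
    # Remove the '.ms' extension only if it is actually present.
    if base.endswith(".ms"):
        base = base[: -len(".ms")]
    return base + "_2pol.ms"


ms_output = default_2pol_output("/lustre/pipeline/cosmology/41MHz/2025-01-01/12/xyz.ms/")
print(ms_output)
```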
- orca.tasks.pipeline_tasks.bandpass_nvme_task(ms_list, delay_table, obs_date, nvme_root='/fast/pipeline') -> str
- orca.tasks.pipeline_tasks.run_nvme_reduce_all_unconditional(self, vis: str, *, chanbin: int = 4, nvme_base_dir: str = '/fast/pipeline', cleanup_nvme: bool = True) -> str
Process one MS on NVMe (no LST/transit gating); the original is never modified or deleted. Output:
/lustre/pipeline/night-time/averaged/{subband}/{date}/{hour}/{BASE}_averaged.ms
/lustre/pipeline/night-time/averaged/{subband}/{date}/{hour}/{BASE}_flagmeta.bin
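To make the output layout concrete, the sketch below fills in the two templates from an input path. It assumes the input follows a .../{subband}/{date}/{hour}/{BASE}.ms layout, as in the copy_ms_nighttime_task example; that assumption, the helper name `output_paths_sketch`, and the constant `AVERAGED_ROOT` are illustrative and not taken from the task's source.

```python
from pathlib import PurePosixPath

# Output root taken from the templates above.
AVERAGED_ROOT = PurePosixPath("/lustre/pipeline/night-time/averaged")


def output_paths_sketch(vis: str) -> tuple:
    """Hypothetical sketch: build the averaged-MS and flag-metadata output paths."""
    p = PurePosixPath(vis)
    # Assumed layout: .../{subband}/{date}/{hour}/{BASE}.ms
    subband, date, hour = p.parts[-4], p.parts[-3], p.parts[-2]
    base = p.stem  # file name without the '.ms' suffix
    out_dir = AVERAGED_ROOT / subband / date / hour
    return (str(out_dir / f"{base}_averaged.ms"),
            str(out_dir / f"{base}_flagmeta.bin"))


averaged_ms, flagmeta = output_paths_sketch(
    "/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz.ms"
)
print(averaged_ms)
print(flagmeta)
```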
- orca.tasks.pipeline_tasks.zesting_one_ms_task(self, ms_path: str, subband: str, obs_date: str, hour: str, *, bandpass_table: str, sources_json: str, nvme_root: str = '/fast/pipeline/peel', dest_root: str = '/lustre/pipeline/peel_test', nvme_unique: bool = False) -> str
Stage the MS to NVMe, run CASA applycal(bandpass_table) and TTCal zest(sources_json), then move the result to <dest_root>/<sb>/<date>/<hour>. Cleans up NVMe on failure; the original MS is left untouched.