orca.tasks.pipeline_tasks

Core pipeline processing tasks for OVRO-LWA data reduction.

Contains Celery tasks for the main data processing workflows, including:

  • Measurement set copying and management

  • Calibration application and flagging

  • Bandpass calibration pipeline

  • Frequency averaging

  • Image generation and cleanup

These tasks form the building blocks of the automated pipeline.

Functions

copy_ms_task(→ str)

Copy the MS from its original location to slow-averaged directory.

copy_ms_nighttime_task(→ str)

Copy the MS file to the same directory with a new name.

flag_ants_task(→ str)

Flag antennas in the measurement set using the provided antenna indices.

flag_with_aoflagger_task(→ str)

Apply AOFlagger to the measurement set with the specified strategy and options.

save_flag_metadata_task(→ str)

Save flag metadata for the measurement set and return the updated path.

save_flag_metadata_nighttime_task(→ str)

Save flag metadata for a nighttime measurement set using the nighttime output directory.

applycal_data_col_task(→ str)

Celery task to apply calibration to an MS.

wsclean_task(→ None)

Run wsclean imaging on the measurement set and return the original MS path.

peel_with_ttcal_task(→ str)

Celery task to use TTCal to peel sources.

average_frequency_task(→ str)

Perform frequency averaging on the measurement set; returns a tuple (original_ms, averaged_ms).

average_frequency_nighttime_task(→ str)

Perform frequency averaging on a nighttime measurement set; returns (original_ms, averaged_ms).

change_phase_center_task(→ str)

Celery task to change the phase center of a calibrated and averaged MS.

extract_original_ms_task(→ str)

Extract the original MS path from the tuple (original_ms, averaged_ms).

run_entire_pipeline_on_one_cpu(→ str)

A single-task pipeline runner that combines all steps into one execution on the same node (one CPU).

copy_ms_to_nvme_task(→ str)

Copy the MS from Lustre to NVMe (fast) storage, placing it directly in /fast/pipeline/.

save_flag_metadata_nvme_task(→ str)

Save flag metadata on NVMe directly in /fast/pipeline/.

average_frequency_nvme_task(→ tuple)

Average frequency on NVMe, storing output in /fast/pipeline/.

remove_ms_task(→ str)

Remove the original measurement set directory and return the averaged MS path.

run_entire_pipeline_on_one_cpu_nvme(→ str)

NVMe-based pipeline run with simplified NVMe storage logic.

copy_ms_to_calibration_task(→ str)

Copy the MS from its original slow directory to the calibration directory.

get_utc_hour_from_path(→ int)

Extract the UTC hour from the measurement set path (assumes the hour is the second-to-last path component).

run_pipeline_slow_on_one_cpu_nvme(→ str)

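A pipeline that copies calibrator MS files to the calibration directory and processes MS files within a UTC-hour window on NVMe.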

split_2pol_task(→ str)

Celery task to split an MS down to the specified polarizations (default "XX,YY").

bandpass_nvme_task(→ str)

run_nvme_reduce_all_unconditional(→ str)

Process one MS on NVMe without LST/transit gating; the original MS is never modified or deleted.

zesting_one_ms_task(→ str)

Stage the MS to NVMe, run CASA applycal(bandpass_table) and TTCal zest(sources_json), then move the result to <dest_root>/<sb>/<date>/<hour>.

Module Contents

orca.tasks.pipeline_tasks.copy_ms_task(original_ms: str, base_output_dir: str = '/lustre/pipeline/slow-averaged/') → str

Copy the MS from its original location to the slow-averaged directory. Returns the path to the copied MS.

orca.tasks.pipeline_tasks.copy_ms_nighttime_task(original_ms: str) → str

Copy the MS file to the same directory with a new name. The copied file will have ‘_copy’ appended to the base name.

Example

original_ms = '/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz.ms'

copied_ms = '/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz_copy.ms'

Returns:

The path to the copied MS.

Return type:

str
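The '_copy' naming rule can be sketched in plain Python. This is a hedged illustration, not the task's implementation: `nighttime_copy_path` and `copy_ms_nighttime` are hypothetical helpers, and copying with `shutil.copytree` assumes the MS is an ordinary directory tree on a POSIX filesystem.

```python
import os
import shutil
import tempfile


def nighttime_copy_path(original_ms: str) -> str:
    """Hypothetical helper: path of the '_copy' MS next to original_ms."""
    # An MS is a directory, so tolerate a trailing slash.
    path = original_ms.rstrip("/")
    directory, name = os.path.split(path)
    base, ext = os.path.splitext(name)  # ('20231121_031000_73MHz', '.ms')
    return os.path.join(directory, f"{base}_copy{ext}")


def copy_ms_nighttime(original_ms: str) -> str:
    """Hypothetical helper: recursively copy the MS to its '_copy' path."""
    copied_ms = nighttime_copy_path(original_ms)
    shutil.copytree(original_ms.rstrip("/"), copied_ms)
    return copied_ms


example = nighttime_copy_path(
    "/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz.ms"
)
print(example)

# Exercise the copy on a throwaway directory standing in for an MS.
with tempfile.TemporaryDirectory() as tmp:
    fake_ms = os.path.join(tmp, "20231121_031000_73MHz.ms")
    os.makedirs(fake_ms)
    copied = copy_ms_nighttime(fake_ms)
    copy_exists = os.path.isdir(copied)
    copied_name = os.path.basename(copied)
```

Splitting the path computation from the copy keeps the naming rule testable without touching Lustre.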

orca.tasks.pipeline_tasks.flag_ants_task(ms: str, ants: List[int]) → str

Flag antennas in the measurement set using the provided antenna indices.

orca.tasks.pipeline_tasks.flag_with_aoflagger_task(ms: str, strategy: str = '/opt/share/aoflagger/strategies/nenufar-lite.lua', in_memory: bool = False, n_threads: int = 1) → str

Apply AOFlagger to the measurement set with the specified strategy and options.

orca.tasks.pipeline_tasks.save_flag_metadata_task(ms: str) → str

Save flag metadata for the measurement set and return the updated path.

orca.tasks.pipeline_tasks.save_flag_metadata_nighttime_task(ms: str) → str

Save flag metadata for a nighttime measurement set using the nighttime output directory.

orca.tasks.pipeline_tasks.applycal_data_col_task(ms: str, gaintable: str) → str

Celery task to apply calibration to an MS.

orca.tasks.pipeline_tasks.wsclean_task(ms: str, out_dir: str, filename_prefix: str, extra_args: List[str], num_threads: int, mem_gb: int) → None

Run wsclean imaging on the measurement set and return the original MS path.
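One plausible way the wsclean_task parameters could map onto a WSClean command line is sketched below. The mapping is an assumption (the actual task may order or combine options differently): `-j`, `-abs-mem`, `-name`, and `-size` are standard WSClean options, while `build_wsclean_cmd`, `run_wsclean`, and the example paths are hypothetical.

```python
import os
import subprocess
from typing import List


def build_wsclean_cmd(ms: str, out_dir: str, filename_prefix: str,
                      extra_args: List[str], num_threads: int,
                      mem_gb: int) -> List[str]:
    """Hypothetical helper: assemble a WSClean argv from wsclean_task-style arguments."""
    return [
        "wsclean",
        "-j", str(num_threads),                           # worker threads
        "-abs-mem", str(mem_gb),                          # memory limit in GB
        "-name", os.path.join(out_dir, filename_prefix),  # output image prefix
        *extra_args,                                      # caller-supplied imaging options
        ms,                                               # the MS goes last
    ]


def run_wsclean(cmd: List[str]) -> None:
    """Hypothetical helper: run the command, raising CalledProcessError on failure."""
    subprocess.run(cmd, check=True)


# Hypothetical paths; nothing is executed here, the argv is only built.
cmd = build_wsclean_cmd("/fast/pipeline/obs.ms", "/lustre/images", "obs",
                        ["-size", "4096", "4096"], num_threads=8, mem_gb=32)
print(" ".join(cmd))
```

Building the argv as a list (rather than a shell string) avoids quoting problems with paths and keeps extra_args passed through verbatim.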

orca.tasks.pipeline_tasks.peel_with_ttcal_task(ms: str, sources: str) → str

Celery task to use TTCal to peel sources.

orca.tasks.pipeline_tasks.average_frequency_task(ms: str, chanbin: int = 4) → str

Perform frequency averaging on the measurement set; returns a tuple (original_ms, averaged_ms).

orca.tasks.pipeline_tasks.average_frequency_nighttime_task(ms: str, chanbin: int = 4) → str

Perform frequency averaging on a nighttime measurement set; returns (original_ms, averaged_ms).
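The effect of chanbin can be illustrated on a single spectrum: every chanbin adjacent channels collapse into one, so chanbin=4 reduces the channel count by a factor of four. This sketch uses a plain unweighted mean and ignores flags and data weights, which a real MS averaging step has to respect; `average_channels` is a hypothetical helper, not the task's code.

```python
from typing import List, Sequence


def average_channels(spectrum: Sequence[float], chanbin: int = 4) -> List[float]:
    """Hypothetical: unweighted mean over consecutive groups of `chanbin` channels."""
    if chanbin < 1:
        raise ValueError("chanbin must be >= 1")
    if len(spectrum) % chanbin != 0:
        raise ValueError("channel count must be a multiple of chanbin")
    return [
        sum(spectrum[i:i + chanbin]) / chanbin
        for i in range(0, len(spectrum), chanbin)
    ]


averaged = average_channels([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0], chanbin=4)
print(averaged)  # [2.5, 6.5]
```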

orca.tasks.pipeline_tasks.change_phase_center_task(ms: str, new_phase_center: str) → str

Celery task to change the phase center of a calibrated and averaged MS.

orca.tasks.pipeline_tasks.extract_original_ms_task(ms_tuple: tuple) → str

Extract the original MS path from the tuple (original_ms, averaged_ms) returned by average_frequency_task.

Parameters:

ms_tuple (tuple) – A tuple where the first element is the original MS and the second is the path to the averaged MS.

Returns:

Path to the original MS.

Return type:

str

orca.tasks.pipeline_tasks.run_entire_pipeline_on_one_cpu(vis: str, window_minutes: int = 4, start_hour: int = 11, end_hour: int = 14, chanbin: int = 4) → str

A single-task pipeline runner that combines all steps into one execution on the same node (one CPU). It uses the existing tasks but calls them directly in a single function.

orca.tasks.pipeline_tasks.copy_ms_to_nvme_task(original_ms: str, nvme_base_dir: str = '/fast/pipeline/') → str

Copy the MS from Lustre to NVMe (fast) storage, placing it directly in /fast/pipeline/.

orca.tasks.pipeline_tasks.save_flag_metadata_nvme_task(ms: str) → str

Save flag metadata on NVMe directly in /fast/pipeline/.

orca.tasks.pipeline_tasks.average_frequency_nvme_task(ms: str, chanbin: int = 4) → tuple

Average frequency on NVMe, storing output in /fast/pipeline/.

orca.tasks.pipeline_tasks.remove_ms_task(ms_tuple: tuple) → str

Remove the original measurement set directory and return the averaged MS path.
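The (original_ms, averaged_ms) tuple returned by the averaging tasks is consumed either by extract_original_ms_task (keep the original path) or by remove_ms_task (delete the original, continue with the averaged MS). A minimal sketch of that contract, assuming the MS is a plain directory that `shutil.rmtree` can delete; `extract_original` and `remove_original` are hypothetical stand-ins, not the task bodies.

```python
import os
import shutil
import tempfile
from typing import Tuple


def extract_original(ms_tuple: Tuple[str, str]) -> str:
    """Hypothetical: keep the original MS path from (original_ms, averaged_ms)."""
    original_ms, _averaged_ms = ms_tuple
    return original_ms


def remove_original(ms_tuple: Tuple[str, str]) -> str:
    """Hypothetical: delete the original MS and hand back the averaged MS path."""
    original_ms, averaged_ms = ms_tuple
    shutil.rmtree(original_ms)  # an MS is a directory tree
    return averaged_ms


with tempfile.TemporaryDirectory() as tmp:
    orig = os.path.join(tmp, "obs.ms")
    avg = os.path.join(tmp, "obs_averaged.ms")
    os.makedirs(orig)
    os.makedirs(avg)
    pair = (orig, avg)  # shape of the averaging tasks' documented result
    extracted_ok = extract_original(pair) == orig
    returned_ok = remove_original(pair) == avg
    original_gone = not os.path.exists(orig)
    averaged_kept = os.path.isdir(avg)
```

Returning a single path from each consumer lets the next task in a chain receive a plain string again.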

orca.tasks.pipeline_tasks.run_entire_pipeline_on_one_cpu_nvme(vis: str, window_minutes: int = 4, start_hour: int = 11, end_hour: int = 14, chanbin: int = 4) → str

NVMe-based pipeline run with simplified NVMe storage logic:

  • Copy the MS from Lustre to /fast/pipeline/ with no subdirectories.

  • Flag, save flag metadata, and average on NVMe (all in /fast/pipeline/).

  • Move the final averaged MS and flag metadata back to /lustre/pipeline/night-time/averaged/, using the original vis path to determine the final directory structure.

  • If scenario ii) applies (the MS is not in the time window or LST range), remove the original MS from Lustre.

orca.tasks.pipeline_tasks.copy_ms_to_calibration_task(original_ms: str, calibration_base_dir: str = '/lustre/pipeline/calibration/') → str

Copy the MS from its original slow directory to the calibration directory preserving frequency/date/hour structure.
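Preserving the frequency/date/hour structure amounts to re-rooting the last four path components under calibration_base_dir. A minimal sketch, assuming the slow directory uses the same <frequency>/<date>/<hour>/<name>.ms layout as the copy_ms_nighttime_task example; `calibration_destination` and the /lustre/pipeline/slow/ example path are hypothetical.

```python
from pathlib import PurePosixPath


def calibration_destination(original_ms: str,
                            calibration_base_dir: str = "/lustre/pipeline/calibration/") -> str:
    """Hypothetical: map .../<freq>/<date>/<hour>/<name>.ms into the calibration tree."""
    parts = PurePosixPath(original_ms).parts  # a trailing '/' is dropped
    if len(parts) < 4:
        raise ValueError(f"unexpected MS path layout: {original_ms}")
    freq, date, hour, name = parts[-4:]
    return str(PurePosixPath(calibration_base_dir, freq, date, hour, name))


dest = calibration_destination(
    "/lustre/pipeline/slow/73MHz/2023-11-21/03/20231121_031000_73MHz.ms")
print(dest)
```

The actual copy would then be a recursive directory copy from original_ms to dest, leaving the original in place.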

orca.tasks.pipeline_tasks.get_utc_hour_from_path(ms_path: str) → int

Extract the UTC hour from the measurement set path (assumes the hour is the second-to-last path component).
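Taking the second-to-last path component can be sketched with `pathlib`. Both example paths appear elsewhere on this page; the second has a trailing slash, which `PurePosixPath` normalizes away. Whether get_utc_hour_from_path itself tolerates trailing slashes is not documented, and `utc_hour_from_path` is a hypothetical stand-in.

```python
from pathlib import PurePosixPath


def utc_hour_from_path(ms_path: str) -> int:
    """Hypothetical: parse the hour directory that sits just above the MS."""
    parts = PurePosixPath(ms_path).parts  # normalizes away a trailing '/'
    if len(parts) < 2:
        raise ValueError(f"no parent directory in {ms_path!r}")
    return int(parts[-2])


hour_a = utc_hour_from_path(
    "/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz.ms")
hour_b = utc_hour_from_path("/lustre/pipeline/cosmology/41MHz/2025-01-01/12/xyz.ms/")
print(hour_a, hour_b)  # 3 12
```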

orca.tasks.pipeline_tasks.run_pipeline_slow_on_one_cpu_nvme(self, vis: str, start: int = 1, end: int = 14, chanbin: int = 4) → str

A pipeline that:

  • Checks whether the MS is a calibrator; if so, copies it to the calibration directory without removing the original.

  • If the MS UTC hour is in [start..end], processes it on NVMe: copy to NVMe, flag, save flag metadata, and average. After averaging, moves the results to /lustre/pipeline/slow-averaged/.

  • Does not remove the original MS from /lustre/pipeline/slow/.

  • Removes the NVMe copy after processing.
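The gating described above can be sketched as a function that only plans steps. How a calibrator is detected is not documented, so it is passed in as a flag; the sketch also assumes the calibrator check and the hour check are independent and that [start..end] is inclusive. `plan_slow_ms`, the step labels, and the example paths are hypothetical.

```python
from pathlib import PurePosixPath
from typing import List


def plan_slow_ms(vis: str, is_calibrator: bool, start: int = 1, end: int = 14) -> List[str]:
    """Hypothetical: list the steps the slow NVMe pipeline would take for one MS."""
    steps: List[str] = []
    if is_calibrator:
        steps.append("copy_to_calibration")  # original stays in /lustre/pipeline/slow/
    hour = int(PurePosixPath(vis).parts[-2])  # hour directory above the MS
    if start <= hour <= end:  # inclusive [start..end]
        steps += [
            "copy_to_nvme",
            "flag",
            "save_flag_metadata",
            "average",
            "move_to_slow_averaged",
            "remove_nvme_copy",
        ]
    return steps


in_window = plan_slow_ms("/lustre/pipeline/slow/55MHz/2024-05-01/05/obs.ms", is_calibrator=False)
late_cal = plan_slow_ms("/lustre/pipeline/slow/55MHz/2024-05-01/20/cal.ms", is_calibrator=True)
print(in_window)
print(late_cal)  # ['copy_to_calibration']
```

Separating the decision from the side effects makes the hour window easy to check without touching NVMe or Lustre.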

orca.tasks.pipeline_tasks.split_2pol_task(ms_input: str, ms_output: str = None, correlation: str = 'XX,YY', datacolumn: str = 'all', remove_original: bool = True) → str

Celery task to split an MS down to the specified polarizations (default “XX,YY”) using CASA’s split task. By default, removes the original MS after splitting.

Parameters:
  • ms_input – Path to the input measurement set (e.g., “/lustre/pipeline/cosmology/41MHz/2025-01-01/12/xyz.ms/”)

  • ms_output – Path to the output measurement set. If None, it is generated automatically by stripping trailing slashes, removing the ‘.ms’ extension, and appending ‘_2pol.ms’

  • correlation – Correlations to keep (e.g., “XX,YY”)

  • datacolumn – Data column to copy (default “all”)

  • remove_original – If True, remove the original MS after splitting

Returns:

The path to the newly created 2-pol measurement set
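The default ms_output rule (strip trailing slashes, drop '.ms', append '_2pol.ms') is sketched below using the ms_input example from the parameter list. `default_2pol_output` is a hypothetical helper; the commented CASA call uses the real `casatasks.split` parameters but is only indicative of how the task might invoke it.

```python
def default_2pol_output(ms_input: str) -> str:
    """Hypothetical: derive ms_output when it is None."""
    stem = ms_input.rstrip("/")          # 1. strip trailing slashes
    if stem.endswith(".ms"):
        stem = stem[: -len(".ms")]       # 2. remove the '.ms' extension
    return stem + "_2pol.ms"             # 3. append '_2pol.ms'


out = default_2pol_output("/lustre/pipeline/cosmology/41MHz/2025-01-01/12/xyz.ms/")
print(out)

# The split itself would then be a CASA call along the lines of:
#   from casatasks import split
#   split(vis=ms_input, outputvis=out, correlation="XX,YY", datacolumn="all")
```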

orca.tasks.pipeline_tasks.bandpass_nvme_task(ms_list, delay_table, obs_date, nvme_root='/fast/pipeline') → str

orca.tasks.pipeline_tasks.run_nvme_reduce_all_unconditional(self, vis: str, *, chanbin: int = 4, nvme_base_dir: str = '/fast/pipeline', cleanup_nvme: bool = True) → str

Process one MS on NVMe without LST/transit gating; the original MS is never modified or deleted. Outputs:

  • /lustre/pipeline/night-time/averaged/{subband}/{date}/{hour}/{BASE}_averaged.ms

  • /lustre/pipeline/night-time/averaged/{subband}/{date}/{hour}/{BASE}_flagmeta.bin
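Deriving both output paths from the input MS can be sketched as follows. It assumes the input path ends in .../{subband}/{date}/{hour}/{BASE}.ms, as in the night-time example for copy_ms_nighttime_task; `averaged_outputs` and `AVERAGED_ROOT` are hypothetical names.

```python
from pathlib import PurePosixPath
from typing import Tuple

AVERAGED_ROOT = "/lustre/pipeline/night-time/averaged"  # hypothetical constant


def averaged_outputs(vis: str, root: str = AVERAGED_ROOT) -> Tuple[str, str]:
    """Hypothetical: (averaged MS, flag metadata) paths for one input MS."""
    p = PurePosixPath(vis)
    subband, date, hour = p.parts[-4:-1]  # assumes .../{subband}/{date}/{hour}/{BASE}.ms
    base = p.stem                         # BASE = file name without '.ms'
    out_dir = PurePosixPath(root, subband, date, hour)
    return (str(out_dir / f"{base}_averaged.ms"),
            str(out_dir / f"{base}_flagmeta.bin"))


avg_ms, flag_meta = averaged_outputs(
    "/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz.ms")
print(avg_ms)
print(flag_meta)
```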

orca.tasks.pipeline_tasks.zesting_one_ms_task(self, ms_path: str, subband: str, obs_date: str, hour: str, *, bandpass_table: str, sources_json: str, nvme_root: str = '/fast/pipeline/peel', dest_root: str = '/lustre/pipeline/peel_test', nvme_unique: bool = False) → str

Stage the MS to NVMe, run CASA applycal(bandpass_table) and TTCal zest(sources_json), then move the result to <dest_root>/<sb>/<date>/<hour>. The NVMe copy is cleaned up on failure; the original MS is left untouched.
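The stage/process/publish pattern with cleanup on failure can be sketched generically. Here `process` stands in for the applycal and zest steps, the nvme_unique option is not modelled, and `stage_process_publish`, `fake_calibrate`, and `fake_failure` are hypothetical; the demo runs entirely in a temporary directory.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Callable


def stage_process_publish(ms_path: str, nvme_root: str, dest_dir: str,
                          process: Callable[[Path], None]) -> str:
    """Hypothetical: copy to fast storage, process there, then move to dest_dir."""
    src = Path(ms_path.rstrip("/"))
    work = Path(nvme_root) / src.name
    final = Path(dest_dir) / src.name
    if final.exists():
        raise FileExistsError(final)  # avoid moving into an existing MS
    shutil.copytree(src, work)        # the original is only read, never modified
    try:
        process(work)                 # stand-in for applycal + zest
    except Exception:
        shutil.rmtree(work, ignore_errors=True)  # clean NVMe on failure
        raise
    final.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(work), str(final))
    return str(final)


def fake_calibrate(ms_dir: Path) -> None:
    (ms_dir / "CORRECTED").write_text("applied")


def fake_failure(ms_dir: Path) -> None:
    raise RuntimeError("zest failed")


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    original = root / "lustre" / "obs.ms"
    original.mkdir(parents=True)
    (original / "table.dat").write_text("visibilities")

    result = stage_process_publish(str(original), str(root / "nvme"),
                                   str(root / "peel" / "55MHz"), fake_calibrate)
    published_ok = (Path(result) / "CORRECTED").exists()
    original_untouched = not (original / "CORRECTED").exists()

    try:
        stage_process_publish(str(original), str(root / "nvme"),
                              str(root / "peel_retry"), fake_failure)
        nvme_cleaned = False
    except RuntimeError:
        nvme_cleaned = not (root / "nvme" / "obs.ms").exists()
```

Re-raising after cleanup keeps the failure visible to the caller (for example, to Celery's retry or error handling) while leaving no partial copy on NVMe.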