orca.tasks.pipeline_tasks
=========================

.. py:module:: orca.tasks.pipeline_tasks

.. autoapi-nested-parse::

   Core pipeline processing tasks for OVRO-LWA data reduction.

   Contains Celery tasks for the main data processing workflows including:

   - Measurement set copying and management
   - Calibration application and flagging
   - Bandpass calibration pipeline
   - Frequency averaging
   - Image generation and cleanup

   These tasks form the building blocks of the automated pipeline.


Functions
---------

.. autoapisummary::

   orca.tasks.pipeline_tasks.copy_ms_task
   orca.tasks.pipeline_tasks.copy_ms_nighttime_task
   orca.tasks.pipeline_tasks.flag_ants_task
   orca.tasks.pipeline_tasks.flag_with_aoflagger_task
   orca.tasks.pipeline_tasks.save_flag_metadata_task
   orca.tasks.pipeline_tasks.save_flag_metadata_nighttime_task
   orca.tasks.pipeline_tasks.applycal_data_col_task
   orca.tasks.pipeline_tasks.wsclean_task
   orca.tasks.pipeline_tasks.peel_with_ttcal_task
   orca.tasks.pipeline_tasks.average_frequency_task
   orca.tasks.pipeline_tasks.average_frequency_nighttime_task
   orca.tasks.pipeline_tasks.change_phase_center_task
   orca.tasks.pipeline_tasks.extract_original_ms_task
   orca.tasks.pipeline_tasks.run_entire_pipeline_on_one_cpu
   orca.tasks.pipeline_tasks.copy_ms_to_nvme_task
   orca.tasks.pipeline_tasks.save_flag_metadata_nvme_task
   orca.tasks.pipeline_tasks.average_frequency_nvme_task
   orca.tasks.pipeline_tasks.remove_ms_task
   orca.tasks.pipeline_tasks.run_entire_pipeline_on_one_cpu_nvme
   orca.tasks.pipeline_tasks.copy_ms_to_calibration_task
   orca.tasks.pipeline_tasks.get_utc_hour_from_path
   orca.tasks.pipeline_tasks.run_pipeline_slow_on_one_cpu_nvme
   orca.tasks.pipeline_tasks.split_2pol_task
   orca.tasks.pipeline_tasks.bandpass_nvme_task
   orca.tasks.pipeline_tasks.run_nvme_reduce_all_unconditional
   orca.tasks.pipeline_tasks.zesting_one_ms_task


Module Contents
---------------

.. py:function:: copy_ms_task(original_ms: str, base_output_dir: str = '/lustre/pipeline/slow-averaged/') -> str

   Copy the MS from its original location to slow-averaged directory.
   Returns the path to the copied MS.


.. py:function:: copy_ms_nighttime_task(original_ms: str) -> str

   Copy the MS file to the same directory with a new name.
   The copied file will have '_copy' appended to the base name.

   .. admonition:: Example

      | original_ms = '/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz.ms'
      | copied_ms = '/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz_copy.ms'

   :returns: The path to the copied MS.
   :rtype: str


.. py:function:: flag_ants_task(ms: str, ants: List[int]) -> str

   Flag antennas in the measurement set using the provided antenna indices.


.. py:function:: flag_with_aoflagger_task(ms: str, strategy: str = '/opt/share/aoflagger/strategies/nenufar-lite.lua', in_memory: bool = False, n_threads: int = 1) -> str

   Apply AOFlagger on the measurement set with specified strategy and options.


.. py:function:: save_flag_metadata_task(ms: str) -> str

   Save flag metadata for the measurement set and return the updated path.


.. py:function:: save_flag_metadata_nighttime_task(ms: str) -> str

   Save flag metadata for a nighttime measurement set using the nighttime output directory.


.. py:function:: applycal_data_col_task(ms: str, gaintable: str) -> str

   Celery task to apply calibration to an MS.


.. py:function:: wsclean_task(ms: str, out_dir: str, filename_prefix: str, extra_args: List[str], num_threads: int, mem_gb: int) -> None

   Run wsclean imaging on the measurement set and return the original MS path.


.. py:function:: peel_with_ttcal_task(ms: str, sources: str) -> str

   Celery task to use TTCal to peel sources.


.. py:function:: average_frequency_task(ms: str, chanbin: int = 4) -> str

   Perform frequency averaging on the measurement set; returns a tuple (original_ms, averaged_ms).


.. py:function:: average_frequency_nighttime_task(ms: str, chanbin: int = 4) -> str

   Perform frequency averaging on a nighttime measurement set; returns (original_ms, averaged_ms).


.. py:function:: change_phase_center_task(ms: str, new_phase_center: str) -> str

   Celery task to change the phase center of a calibrated and averaged MS.


.. py:function:: extract_original_ms_task(ms_tuple: tuple) -> str

   Extract the original MS path from the tuple (original_ms, averaged_ms)
   returned by `average_frequency_task`.

   :param ms_tuple: A tuple where the first element is the original MS and
                    the second is the path to the averaged MS.
   :type ms_tuple: tuple

   :returns: Path to the original MS.
   :rtype: str


.. py:function:: run_entire_pipeline_on_one_cpu(vis: str, window_minutes: int = 4, start_hour: int = 11, end_hour: int = 14, chanbin: int = 4) -> str

   A single-task pipeline runner that combines all steps into one execution
   on the same node (one CPU). It uses the existing tasks but calls them
   directly in a single function.


.. py:function:: copy_ms_to_nvme_task(original_ms: str, nvme_base_dir: str = '/fast/pipeline/') -> str

   Copy the MS from Lustre to NVMe (fast) storage, placing it directly in /fast/pipeline/.


.. py:function:: save_flag_metadata_nvme_task(ms: str) -> str

   Save flag metadata on NVMe directly in /fast/pipeline/.


.. py:function:: average_frequency_nvme_task(ms: str, chanbin: int = 4) -> tuple

   Average frequency on NVMe, storing output in /fast/pipeline/.


.. py:function:: remove_ms_task(ms_tuple: tuple) -> str

   Remove the original measurement set directory and return the averaged MS path.


.. py:function:: run_entire_pipeline_on_one_cpu_nvme(vis: str, window_minutes: int = 4, start_hour: int = 11, end_hour: int = 14, chanbin: int = 4) -> str

   NVMe-based pipeline run with simplified NVMe storage logic:

   - Copy from Lustre to /fast/pipeline/ with no subdirectories.
   - Flag, save metadata, average on NVMe (all in /fast/pipeline/).
   - Move the final averaged MS and flag metadata back to
     /lustre/pipeline/night-time/averaged/ using the original vis path to
     determine final directory structure.
   - If scenario ii) (not in window or LST range), remove original MS from Lustre.


.. py:function:: copy_ms_to_calibration_task(original_ms: str, calibration_base_dir: str = '/lustre/pipeline/calibration/') -> str

   Copy the MS from its original slow directory to the calibration directory,
   preserving frequency/date/hour structure.


.. py:function:: get_utc_hour_from_path(ms_path: str) -> int

   Extract the UTC hour from the measurement set path (assumes the hour is
   the second-to-last path component).


.. py:function:: run_pipeline_slow_on_one_cpu_nvme(self, vis: str, start: int = 1, end: int = 14, chanbin: int = 4) -> str

   A pipeline that:

   - Checks if MS is a calibrator; if yes, copy to calibration directory
     without removing original.
   - If MS UTC hour is in [start..end], process it on NVMe: copy to NVMe,
     flag, save metadata, average. After averaging, move results to
     /lustre/pipeline/slow-averaged/.
   - Do not remove the original MS from /lustre/pipeline/slow/.
   - Remove the NVMe copy after processing.


.. py:function:: split_2pol_task(ms_input: str, ms_output: str = None, correlation: str = 'XX,YY', datacolumn: str = 'all', remove_original: bool = True) -> str

   Celery task to split an MS down to the specified polarizations
   (default "XX,YY") using CASA's split task. By default, removes the
   original MS after splitting.

   :param ms_input: Path to the input measurement set
                    (e.g., "/lustre/pipeline/cosmology/41MHz/2025-01-01/12/xyz.ms/")
   :param ms_output: Path to the output measurement set.
                     If None, we auto-generate by stripping trailing slashes,
                     removing '.ms' extension, and appending '_2pol.ms'
   :param correlation: Correlations to keep (e.g., "XX,YY")
   :param datacolumn: Data column to copy (default "all")
   :param remove_original: If True, remove the original MS after splitting
   :return: The path to the newly created 2-pol measurement set


.. py:function:: bandpass_nvme_task(ms_list, delay_table, obs_date, nvme_root='/fast/pipeline') -> str


.. py:function:: run_nvme_reduce_all_unconditional(self, vis: str, *, chanbin: int = 4, nvme_base_dir: str = '/fast/pipeline', cleanup_nvme: bool = True) -> str

   Process one MS on NVMe (no LST/transit gating), never touch/delete original.

   Output:
       | /lustre/pipeline/night-time/averaged/{subband}/{date}/{hour}/{BASE}_averaged.ms
       | /lustre/pipeline/night-time/averaged/{subband}/{date}/{hour}/{BASE}_flagmeta.bin


.. py:function:: zesting_one_ms_task(self, ms_path: str, subband: str, obs_date: str, hour: str, *, bandpass_table: str, sources_json: str, nvme_root: str = '/fast/pipeline/peel', dest_root: str = '/lustre/pipeline/peel_test', nvme_unique: bool = False) -> str

   Stage MS to NVMe, CASA applycal(bandpass_table), TTCal zest(sources_json),
   then move to ///. Cleans NVMe on failure. Original untouched.
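
Several entries above describe path conventions rather than processing steps: the ``_copy`` suffix used by ``copy_ms_nighttime_task``, the second-to-last path component read by ``get_utc_hour_from_path``, and the ``_2pol.ms`` name that ``split_2pol_task`` generates when ``ms_output`` is ``None``. The following is a minimal sketch of those conventions using only the standard library. The helper names (``sketch_copy_name``, ``sketch_utc_hour``, ``sketch_2pol_name``) are hypothetical and are not part of ``orca``; the actual tasks may handle edge cases differently.

```python
import os


def sketch_copy_name(original_ms: str) -> str:
    """Hypothetical helper: append '_copy' to the base name, keeping the extension."""
    base, ext = os.path.splitext(original_ms.rstrip("/"))
    return f"{base}_copy{ext}"


def sketch_utc_hour(ms_path: str) -> int:
    """Hypothetical helper: read the hour from the second-to-last path component."""
    parts = ms_path.rstrip("/").split("/")
    return int(parts[-2])


def sketch_2pol_name(ms_input: str) -> str:
    """Hypothetical helper: strip trailing slashes, drop '.ms', append '_2pol.ms'."""
    stripped = ms_input.rstrip("/")
    if stripped.endswith(".ms"):
        stripped = stripped[: -len(".ms")]
    return f"{stripped}_2pol.ms"


# Example paths taken from the docstrings above.
night_ms = "/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_031000_73MHz.ms"
cosmo_ms = "/lustre/pipeline/cosmology/41MHz/2025-01-01/12/xyz.ms/"

copied_ms = sketch_copy_name(night_ms)   # ends with '20231121_031000_73MHz_copy.ms'
utc_hour = sketch_utc_hour(night_ms)     # 3
two_pol_ms = sketch_2pol_name(cosmo_ms)  # ends with 'xyz_2pol.ms'
```

Keeping these rules as pure string functions makes them easy to check without CASA, Celery, or access to the Lustre and NVMe file systems.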