orca.metadata.pathsmanagers
Path management utilities for OVRO-LWA data access.
This module provides PathsManager classes that handle file path resolution for measurement sets, calibration tables, and output directories. Supports both offline batch processing and real-time pipeline modes.
- Classes:
PathsManager: Abstract base class defining the path management interface. OfflinePathsManager: Path manager for offline transient processing.
Attributes
Classes
Base PathsManager class. |
|
PathsManager for offline transient processing. |
Module Contents
- class orca.metadata.pathsmanagers.PathsManager[source]
Bases:
abc.ABCBase PathsManager class. It contains functionality to manipulate datetime objects and find dada files. Maybe in the future it will evolve into an interface with abstract methods.
- utc_times_mapping
An ordered dictionary mapping datetime objects to dada files.
- abstractmethod time_filter(start_time: datetime.datetime, end_time: datetime.datetime) PathsManager[source]
- class orca.metadata.pathsmanagers.OfflinePathsManager(utc_times_txt_path: str, dadafile_dir: str | None = None, working_dir: str | None = None, gaintable_dir: str = None, flag_npy_paths: str | Dict[datetime.datetime, str] | None = None)[source]
Bases:
PathsManagerPathsManager for offline transient processing.
This could potentially work for processing the buffer too. A config file reader will probably parse a config file into this object.
Assumes that the bandpass calibration table is named like bcal_dir/00.bcal’
- get_bcal_path(bandpass_date: datetime.date, spw: str) str[source]
Return bandpass calibration path in /gaintable/path/2018-03-02/00.bcal.
- Parameters:
bandpass_date – Date of the bandpass solution requested.
spw – Spectral window
- Returns:
Bandpass calibration path.
- get_gaintable_path(gaintable_date: datetime.date, spw: str, extension: str) str[source]
Get the path to a certain CASA gaintable.
- Parameters:
gaintable_date – date of the table requested
spw – spw of the gaintable requested
extension – extension of the gaintable (bcal etc)
- Returns:
The path to the requested gaintable.
- get_ms_path(timestamp: datetime.datetime, spw: str) str[source]
Generate measurement set paths that look like /path/to/working_dir/msfiles/2018-03-02/hh=02/2018-03-02T02:02:02/00_2018-03-02T02:02:02.ms.
- Parameters:
timestamp – Timestamp of the ms.
spw – Spectral window of the ms.
- Returns:
Path to the measurement set.
- get_ms_parent_path(timestamp: datetime.datetime) str[source]
Generate measurement set parent paths that look like /path/to/working_dir/msfiles/2018-03-02/hh=02/2018-03-02T02:02:02/.
- Parameters:
timestamp – Timestamp of the ms.
- Returns:
Path to the measurement set.
- get_data_product_path(timestamp: datetime.datetime, product: str, file_suffix: str, file_prefix: str | None = None) str[source]
Generate path for generic data product. Looks like /path/to/working_dir/<product>/2018-03-02/hh=02/<file_prefix>_2018-03-02T02:02:02<file_suffix>. The first underscore is not there is file_prefix=None
- Parameters:
timestamp – Timestamp of observation.
product – Name of the data product to be used for top-level directory
file_suffix – Suffix to data file. For example for fits file it’d be ‘.fits’. You can also have something like ‘_v2.fits’
file_prefix – Prefix of the file. Can be spectral window. If none specified then no prefix.
Returns: Full path to the data product.
- get_flag_npy_path(timestamp: datetime.datetime) str[source]
Return the a priori npy for the flags column for a given time.
- Parameters:
timestamp
- Returns:
If only one flag_npy was supplied, the flag_npy; if a Dict[datetime, str] is supplied, the closest one in time to the supplied timestamp.
- time_filter(start_time: datetime.datetime, end_time: datetime.datetime) OfflinePathsManager[source]
Returns another PathsManager object with only utc_times between start_time (inclusive) and end_time (exclusive).
- Parameters:
start_time
end_time
- Returns:
New PathsManager object with time filtered.
- Return type:
new_paths_manager
- chunks_by_integration(chunk_size: int) List[List[datetime.datetime]][source]
Chunk the datetime array by number of integrations such that each chunk contains data spanning equal or less than the chunk size. Note that the last chunk may be smaller, if the total number of integrations is not divisible by the chunk size.
- Parameters:
chunk_size – number of integrations per chunk
Returns: A list whose elements are the ordered chunks, which are each a list of ordered timestamps.
- abstractmethod chunks_by_time(chunk_time: datetime.timedelta) List[List[datetime.datetime]][source]
Chunk the datetime array by time such that each chunk contains data spanning equal or less than chunk_time amount of time. All of the data will be chunked. Note that
The last chunk may be smaller, if the span of the data is not divisible by the chunk time
2) Chunking is based on time and not by the number of integrations. Therefore, some chunks might have more or fewer integrations than some other chunks, if the chunk time is not divisible by the integration time.
- Parameters:
chunk_time – a timedelta object for the chunk time.
Returns: A list whose elements are the ordered chunks, which are each a list of ordered timestamps.