"""Calibration utilities for source model generation.
Provides functions for creating calibrator source models, selecting
calibration time ranges based on source transits, and generating
CASA component lists for calibration and peeling.
"""
import numpy as np
from os import path
import math, shutil
from datetime import datetime
from astropy.time import Time
from astropy.coordinates import SkyCoord, ICRS
from orca.utils.coordutils import CYG_A, OVRO_LWA_LOCATION, is_visible, get_altaz_at_ovro
from casacore import tables
from casacore import measures
from casatools import componentlist
from orca.utils.beam import WoodyBeam as beam
import re
import os
import astropy.units as u
[docs]
SRC_LIST = [{'label': 'CasA', 'flux': 16530, 'alpha': -0.72, 'ref_freq': 80.0,
'position': 'J2000 23h23m24s +58d48m54s'},
{'label': 'CygA', 'flux': 16300, 'alpha': -0.58, 'ref_freq': 80.0,
'position': 'J2000 19h59m28.35663s +40d44m02.0970s'},
{'label': 'VirA', 'flux': 2014, 'alpha': -0.79, 'ref_freq': 80.0,
'position': 'J2000 12h30m49.42338s +12d23m28.0439s'},
{'label': 'TauA', 'flux': 1770, 'alpha': -0.27, 'ref_freq': 80.0,
'position': 'J2000 05h34m31.94s +22d00m52.2s'}]
[docs]
def calibration_time_range(start_time: datetime,
end_time: datetime, duration_min: float = 20):
"""Get dada file names based on Cygnus A transit for calibration.
Get list of .dada file names to use for calibration. Selects .dada files that
span {duration_min} centered on transit of Cygnus A.
Args:
utc_times_txt_path: Path to utc_times.txt file.
start_time: Start time of data for which to derive calibration tables,
datetime format.
end_time: End time of data for which to derive calibration tables,
datetime format.
duration_min: In minutes, amount of time used for calibration. Default is 20 min.
Returns:
cal_start_time and cal_end_time covering {duration_min} calibration range,
in datetime format.
"""
start_lst = Time(start_time).sidereal_time('apparent', longitude=OVRO_LWA_LOCATION.lon)
end_lst = Time(end_time).sidereal_time('apparent', longitude=OVRO_LWA_LOCATION.lon)
# Select {duration_min} range of LSTs where CygA is closest to zenith
CygA_HA = CYG_A.ra.hourangle
CygA_HA_start = (start_lst - CygA.ra).wrap_at(180*u.deg).hour
CygA_HA_end = (end_lst - CygA.ra).wrap_at(180*u.deg).hour
cal_start_time = start_time + timedelta(hours=(CygA_HA_start - duration_min/2./60.))
cal_end_time = start_time + timedelta(hours=(CygA_HA_end + duration_min/2./60.))
return cal_start_time, cal_end_time
[docs]
def gen_model_ms_stokes(ms: str, zest: bool = False):
""" Generate component lists for calibration / polarized peeling in CASA.
Args:
ms: Measurement set to generate model for.
zest: For supplying component lists for polarized peeling. Default is False.
Returns:
Returns path to component list(s). If zest=True, will return a list of paths to
single-source component lists.
"""
src_list = SRC_LIST
with tables.table(ms, ack=False) as t:
t0 = t.getcell('TIME', 0)
me = measures.measures()
time = me.epoch('UTC', '%fs' % t0)
timeT = Time(time['m0']['value'], format='mjd', scale='utc')
timeT.format = 'datetime'
utctime = timeT.value
lsttime = timeT.sidereal_time('apparent', OVRO_LWA_LOCATION.lon).value
freq = float(tables.table(ms+'/SPECTRAL_WINDOW', ack=False).getcell('REF_FREQUENCY', 0))/1.e6
#
outbeam = beam(ms)
cal_srcs = []
for s,src in enumerate(src_list):
src_position: str = src.get('position') # type: ignore
ra = src_position.split(' ')[1]
dec = src_position.split(' ')[2]
if is_visible(SkyCoord(ra, dec, frame=ICRS), utctime) and \
(not zest or (zest and (lsttime < 8 or lsttime > 13))):
altaz = get_altaz_at_ovro(SkyCoord(ra, dec, frame=ICRS), utctime)
scale = np.array(outbeam.srcIQUV(altaz.az.deg, altaz.alt.deg))
src_list[s]['Stokes'] = list(_flux80_47(src['flux'], src['alpha'], freq,
src['ref_freq']) * scale)
cal_srcs.append(src_list[s])
if not cal_srcs:
return
cl = componentlist()
if zest:
fluxIvals = [src.get('Stokes')[0] for src in cal_srcs] # type: ignore
sortedinds = np.argsort(fluxIvals)[::-1]
cllist = []
counter = 0
for ind in sortedinds:
src = cal_srcs[ind]
clname = '%s_%d.cl' % (path.splitext(path.abspath(ms))[0],counter)
try:
shutil.rmtree(clname)
except OSError:
pass
cl.done()
cl.addcomponent(flux=src['Stokes'], polarization='Stokes',
dir=src['position'], label=src['label'])
cl.setstokesspectrum(which=0, type='spectral index',
index=[src['alpha'], 0, 0, 0], reffreq='%sMHz' % freq)
cl.rename(clname)
cl.done()
cllist.append(clname)
counter+=1
return cllist
else:
cl.done()
clname = '%s.cl' % path.splitext(path.abspath(ms))[0]
try:
shutil.rmtree(clname)
except OSError:
pass
for s,src in enumerate(cal_srcs):
cl.addcomponent(flux=src['Stokes'], polarization='Stokes',
dir=src['position'], label=src['label'])
cl.setstokesspectrum(which=s, type='spectral index',
index=[src['alpha'], 0, 0, 0], reffreq='%sMHz' % freq)
cl.rename(clname)
cl.done()
return clname
[docs]
def gen_model_from_dict(ms: str, npzfile: str):
"""
"""
Ateamdict = np.load(npzfile)
sourcename_list = Ateamdict['sourcename']
source_Vflux_list = Ateamdict['pkflux']
source_ra_list = Ateamdict['ra_pos']
source_dec_list = Ateamdict['dec_pos']
freq_val = Ateamdict['frqval']
src_list = [{'label': 'VirA', 'flux': '2400', 'alpha': -0.86, 'ref_freq': 80.0,
'position': 'J2000 12h30m49.42338s +12d23m28.0439s'},
{'label': 'TauA', 'flux': '1770', 'alpha': -0.27, 'ref_freq': 80.0,
'position': 'J2000 05h34m31.94s +22d00m52.2s'},
{'label': 'CasA', 'flux': 16530, 'alpha': -0.72, 'ref_freq': 80.0,
'position': 'J2000 23h23m24s +58d48m54s'}]
freq = freq_val/1.e6
sub_srcs = []
for s,src in enumerate(src_list):
if not np.isnan(source_Vflux_list[s]) and not np.isnan(source_ra_list[s]) and not np.isnan(source_dec_list[s]):
src_list[s]['Stokes'] = [0, 0, 0, -source_Vflux_list[s]]
new_pos = SkyCoord(source_ra_list[s], source_dec_list[s], frame='icrs', unit='deg')
src_list[s]['position'] = f'J2000 {new_pos.to_string("hmsdms")}'
sub_srcs.append(src_list[s])
if not sub_srcs:
return
cl = componentlist()
cl.done()
clname = '%s.cl' % path.splitext(path.abspath(ms))[0]
try:
shutil.rmtree(clname)
except OSError:
pass
for s,src in enumerate(sub_srcs):
cl.addcomponent(flux=src['Stokes'], polarization='Stokes',
dir=src['position'], label=src['label'])
cl.setstokesspectrum(which=s, type='spectral index',
index=[src['alpha'], 0, 0, 0], reffreq='%sMHz' % freq)
cl.rename(clname)
cl.done()
return clname
def _flux80_47(flux_hi, sp, output_freq, ref_freq):
# given a flux at 80 MHz and a sp_index,
# return the flux at MS center-frequency.
return flux_hi * 10 ** (sp * math.log(output_freq/ref_freq, 10))
'''def parse_filename(filename):
pattern = r'(\d{8})_(\d{6})_.*\.ms'
#match = re.match(pattern, filename)
base_fname = os.path.basename(filename)
match = re.match(pattern, base_fname)
if not match:
raise ValueError("Filename does not match the expected format 'YYYYMMDD_HHMMSS_*.ms'")
date_str, time_str = match.groups()
iso_time = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}T{time_str[:2]}:{time_str[2:4]}:{time_str[4:6]}"
return iso_time
'''
[docs]
def parse_filename(filename):
"""
Parse a filename of the form YYYYMMDD_HHMMSS_anything.ms and extract the UTC time.
"""
pattern = r'(\d{8})_(\d{6})_([^_]+)\.ms' # Updated pattern to avoid greediness
base_fname = os.path.basename(filename) # Extracts just the filename
match = re.match(pattern, base_fname)
if not match:
raise ValueError(f"Filename does not match the expected format 'YYYYMMDD_HHMMSS_*.ms'. Got: {base_fname}")
date_str, time_str, extra = match.groups()
iso_time = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}T{time_str[:2]}:{time_str[2:4]}:{time_str[4:6]}"
return iso_time
[docs]
def get_lst_from_filename(filename):
utc_time_iso = parse_filename(filename)
time = Time(utc_time_iso, format='isot', scale='utc')
lst = time.sidereal_time('apparent', longitude=OVRO_LWA_LOCATION.lon)
return lst
[docs]
def source_ra_in_hours(position_str):
parts = position_str.split()
if len(parts) != 3:
raise ValueError("Position string not in expected 'J2000 RA DEC' format.")
ra_str, dec_str = parts[1], parts[2]
coord = SkyCoord(ra_str, dec_str, frame=ICRS)
return coord.ra.hour
[docs]
def is_within_transit_window(filename, window_minutes=4):
lst = get_lst_from_filename(filename).hour
half_window_hours = (window_minutes / 2.) / 60.0
in_window_sources = []
for src in SRC_LIST:
ra_h = source_ra_in_hours(src['position'])
diff = abs((lst - ra_h + 12) % 24 - 12)
if diff <= half_window_hours:
in_window_sources.append(src)
return in_window_sources
[docs]
def get_relative_path(ms_path):
# Extract the sub-path starting after '/slow/', '/slow-averaged/', or '/night-time/'
# For example, from '/lustre/pipeline/night-time/73MHz/2023-11-21/03/20231121_000005_73MHz.ms'
# we get '73MHz/2023-11-21/03/20231121_000005_73MHz.ms'
if '/slow-averaged/' in ms_path:
parts = ms_path.split('/slow-averaged/', 1)
relative_path = parts[1].strip('/')
return relative_path
elif '/slow/' in ms_path:
parts = ms_path.split('/slow/', 1)
relative_path = parts[1].strip('/')
return relative_path
elif '/night-time/' in ms_path:
parts = ms_path.split('/night-time/', 1)
relative_path = parts[1].strip('/')
return relative_path
elif '/fast/pipeline/' in ms_path:
parts = ms_path.split('/fast/pipeline/', 1)
relative_path = parts[1].strip('/')
return relative_path
else:
raise ValueError("Input MS path does not contain '/slow/', '/slow-averaged/', or '/night-time/'")
[docs]
def build_output_paths(ms_path, base_output_dir='/lustre/pipeline/slow-averaged/'):
relative_path = get_relative_path(ms_path)
rel_dir = os.path.dirname(relative_path) # e.g. '73MHz/2024-11-29/00'
ms_base = os.path.splitext(os.path.basename(relative_path))[0] # e.g. '20241129_000005_73MHz'
output_dir = os.path.join(base_output_dir, rel_dir)
os.makedirs(output_dir, exist_ok=True)
return output_dir, ms_base