"""Calibrator sky model generation for OVRO-LWA.
This module generates sky models for calibration sources with primary beam
corrections. Supports both full-sky models and specific source selection
(e.g., Cygnus A only for delay calibration).
Classes:
model_generation: Main class for generating CASA component lists.
The generated component lists can be applied to measurement sets using
CASA's ft task for model-based calibration.
"""
#from . import primary_beam
import numpy as np
import math,os,logging
from casatasks import clearcal, ft
from casatools import table, measures, componentlist, msmetadata
from orca.utils.beam_nmahesh import jones_beam as beam
from astropy.io import fits
from orca.utils.modelutils import (
get_sun_pos,
get_solar_altaz_multiple_times,
get_solar_azel,
get_msinfo,
get_image_data,
get_image_maxmin,
check_image_quality,
restore_flag,
get_flagged_solution_num,
get_strong_source_list,
get_time_from_name,
get_timestr_from_name,
get_freqstr_from_name,
get_selfcal_time_to_apply,
get_keyword,
put_keyword,
get_obs_time_interval,
convert_to_heliocentric_coords,
make_wsclean_compatible,
get_total_fields,
collect_fast_fits,
rename_images,
check_corrected_data_present,
correct_primary_beam,
get_solar_loc_pix,
get_rms,
blank_all_pixels,
get_uvlambda_from_uvdist,
swap_fastms_pols,
correct_fastms_amplitude_scale,
compress_fits_to_h5,
recover_fits_from_h5,
check_h5_fits_consistency,
check_for_file_presence,
manual_split_corrected_ms,
get_primary_beam_single_source,
get_caltable_freq
)
[docs]
def conv_deg(dec):
if 's' in dec:
dec = dec.split('s')[0]
if 'm' in dec:
dec, ss = dec.split('m')
if ss == '':
ss = '0'
dd, mm = dec.split('d')
if dd.startswith('-'):
neg = True
else:
neg = False
deg = float(dd) + float(mm) / 60 + float(ss) / 3600
return '%fdeg' % deg
[docs]
class model_generation():
def __init__(self,vis=None,filename='calibrator_source_list.txt',pol='I,Q,U,V',separate_pol=True,model=True):
'''
vis: Name of MS
min_beam_val: The minimum primary beam value, below which the source
will not be considered during calibration
separate_pol: If True, separate polarisations will be analysed separately
and the resultant model can be different. This parameter
is only used if WSClean clean components are used for
generating the source model
pol: Polarisations for which model is needed. Can be I,Q,U,V, I, XX,YY
model: This is used only if WSClean components are used. This tells the code
that we are generating a model (units Jy/pixel) and not an image units Jy/beam).
'''
self.vis=vis
[docs]
self.separate_pol=separate_pol
self.pol=pol
self.calfilepath = '/data07/msurajit/ovro-lwa-solar/defaults/'
[docs]
self.outpath=os.getcwd()+"/"
[docs]
self.polarisations=self.pol.split(',')
[docs]
self.num_pol=len(self.polarisations)
self.filename=filename
[docs]
self.point_source_model_needed=True
[docs]
self.primary_beam_model='/lustre/msurajit/beam_model_nivedita/OVRO-LWA_soil_pt.h5'
if (self.separate_pol==True and self.num_pol==1) or \
(self.separate_pol==False and self.num_pol!=1):
raise RuntimeError("The two keywords \'separate_pol\' and \'pol\' are inconsistent")
@property
[docs]
def vis(self):
return self._vis
@vis.setter
def vis(self,value):
if os.path.isdir(value):
self._vis=value
else:
logging.error("MSfile is not found.")
raise RuntimeError("MSfile is not found")
@property
[docs]
def calfilepath(self):
return self._calfilepath
@calfilepath.setter
def calfilepath(self,value):
if os.path.exists(value):
self._calfilepath=value
else:
self._calfilepath=None
@property
[docs]
def filename(self):
return self._filename
@filename.setter
def filename(self,value):
if self.separate_pol==True:
print (value)
self._filename=[value+"."+pol_str for pol_str in self.polarisations]
else:
self._filename=[value]
@property
[docs]
def pol(self):
return self._pol
@pol.setter
def pol(self,value):
temp=value.split(',')
for i in temp:
if i not in ['I','Q','U','V','XX','YY','XY','YX']:
raise RuntimeError("Only XX,YY,XY,YX,I,Q,U,V recognised now.")
self._pol=value
[docs]
def gen_model_file(self):
"""
:param filename: output txt file that contains clean components for all visible strong calibration sources
in wsclean format
:param visibility: input visibility
:return: N/A
"""
srcs,az,el=self.get_risen_source_list()
pb=beam(msfile=self.vis)
pb.read_beam_file()
pb.srcjones(np.array(az),np.array(el))
self.file_handle = [open(self.filename[i], 'w') for i in range(self.num_pol)]
s=0
for azev,elev in zip(az,el):
#matrix=pb.get_source_pol_factors(pb.jones_matrices[s,:,:])
matrix=pb.get_muller_matrix_stokes(pb.jones_matrices[s,:,:])
if matrix[0,0] > self.min_beam_val:# and matrix[1,1] > self.min_beam_val:
for i in range(self.num_pol):
self.write_source_file(i,srcs[s]['label'], matrix, s)
s+=1
return
[docs]
def write_source_file(self,current_pol_index,source_name, muller_matrix, source_num): #### works only if logarithimicSI is false
primary_beam_value=self.primary_beam_value(current_pol_index,muller_matrix) ### assumes an unpolarised source
try:
f1 = open(self.calfilepath + source_name+ ".txt", "r")
j = 0
while True:
line = f1.readline()
if not line:
break
if source_num == 0 and j == 0:
self.file_handle[current_pol_index].write(line)
elif j != 0:
try:
splitted = line.split(',')
I_flux = float(splitted[4])
beam_corrected_I_flux = I_flux * primary_beam_value
splitted[4] = str(beam_corrected_I_flux)
for k, phrase in enumerate(splitted[5:]):
if k == 0:
splitted[5 + k] = '[' + str(float(phrase[1:]) * primary_beam_value)
else:
if phrase[-1] == ']':
splitted[5 + k] = str(float(phrase[:-1]) * primary_beam_value) + ']'
break
else:
splitted[5 + k] = str(float(phrase) * primary_beam_value)
line1 = ','.join(splitted)
line1 = line1[:-1] + " "+ "\n"
if splitted[5 + k + 1] == 'false':
self.file_handle[current_pol_index].write(line1)
else:
raise RuntimeError("Function now works only if logarithmicSI is false")
except IndexError:
pass
j += 1
finally:
f1.close()
@staticmethod
[docs]
def primary_beam_value(current_pol_index,muller_matrix):
'''
returns elements from the first column of Muller Matrix
'''
I_factor=muller_matrix[0,0].real
Q_factor=muller_matrix[1,0].real
U_factor=muller_matrix[2,0].real
V_factor=muller_matrix[3,0].real
if current_pol_index==0:
return I_factor
elif current_pol_index==1:
return Q_factor
elif current_pol_index==2:
return U_factor
else:
return V_factor
[docs]
def ctrl_freq(self):
msmd=msmetadata()
msmd.open(self.vis)
self.avg_freq=msmd.meanfreq(0)*1e-6
msmd.done()
if not self.output_freq:
self.output_freq=self.avg_freq
[docs]
def predict_flux(self,flux_hi, sp,muller_matrix,ref_freq):
"""
Given a flux at reference frequency in MHz and a sp_index, return the flux other frequenct
:param flux_hi: flux at the reference frequency
:param sp: spectral index
:param ref_freq: reference frequency in MHz
:param output_freq: output frequency in MHz
:return: flux caliculated at the output frequency
"""
freq_I=flux_hi * 10 ** (sp * np.log10(self.output_freq / ref_freq))
I_flux=self.primary_beam_value(0,muller_matrix)*freq_I
Q_flux=self.primary_beam_value(1,muller_matrix)*freq_I
U_flux=self.primary_beam_value(2,muller_matrix)*freq_I
V_flux=self.primary_beam_value(3,muller_matrix)*freq_I
print (I_flux,Q_flux,U_flux,V_flux)
return [I_flux,Q_flux, U_flux, V_flux]
[docs]
def get_risen_source_list(self):
srcs = get_strong_source_list()
az=[]
el=[]
if self.includesun:
srcs.append({'label': 'Sun', 'flux': str(solar_flux), 'alpha': solar_alpha,
'position': 'SUN'})
tb=table()
tb.open(self.vis)
t0 = tb.getcell('TIME', 0)
tb.close()
me=measures()
ovro = me.observatory('OVRO_MMA')
time = me.epoch('UTC', '%fs' % t0)
me.doframe(ovro)
me.doframe(time)
for s in range(len(srcs) - 1, -1, -1):
coord = srcs[s]['position'].split()
d0 = None
if len(coord) == 1:
d0 = me.direction(coord[0])
d0_j2000 = me.measure(d0, 'J2000')
srcs[s]['position'] = 'J2000 %frad %frad' % (d0_j2000['m0']['value'], d0_j2000['m1']['value'])
elif len(coord) == 3:
coord[2] = conv_deg(coord[2])
d0 = me.direction(coord[0], coord[1], coord[2])
else:
raise Exception("Unknown direction")
d = me.measure(d0, 'AZEL')
elev = d['m1']['value']*180/np.pi
azev=d['m0']['value']*180/np.pi
if azev<0:
azev=360+azev
if elev<0:
del srcs[s]
elif np.sin(d['m1']['value'])**1.6<self.min_beam_val:
del srcs[s]
else:
srcs[s]['el']=elev
srcs[s]['az']=azev
return srcs
[docs]
def point_source_model(self, included_sources=None):
srcs=self.get_risen_source_list()
if included_sources:
srcs = [s for s in srcs if s["label"] in included_sources]
cl = componentlist()
cl.done()
modelcl = self.vis.replace('.ms', '.cl')
pb=beam(msfile=self.vis,beam_file_path=self.primary_beam_model)
pb.read_beam_file()
for s in srcs:
pb.srcjones(np.array([s['az']]),np.array([s['el']]))
matrix=pb.get_muller_matrix_stokes(pb.jones_matrices[0,:,:])
if matrix[0,0] > self.min_beam_val:
s['flux'] = self.predict_flux(float(s['flux']), s['alpha'], matrix, s['ref_freq'])
cl.addcomponent(flux=s['flux'], dir=s['position'], index=s['alpha'], polarization='Stokes',
spectrumtype='spectral index', freq='{0:f}MHz'.format(self.output_freq), label=s['label'])
logging.debug(
"cl.addcomponent(flux=%s, dir='%s', index=%s, spectrumtype='spectral index', label='%s', polarization='Stokes')" % (
s['flux'], s['position'], s['alpha'], s['label']))
if os.path.exists(modelcl) and self.overwrite:
os.system('rm -rf ' + modelcl)
cl.rename(modelcl)
cl.done()
return modelcl, True
[docs]
def reset_image(self,imagename):
##### making the residual a blank image
logging.debug("Setting all values of dummy image to 0.")
if self.pol=='I':
hdu = fits.open(imagename + "-residual.fits", mode="update")
hdu[0].data *= 0.0
hdu.flush()
hdu.close()
else:
for pola in self.polarisations:
hdu = fits.open(imagename +"-"+pola+ "-residual.fits", mode="update")
hdu[0].data *= 0.0
hdu.flush()
hdu.close()
[docs]
def generate_model_from_component_list(self,imagename):
if self.pol=='I':
os.system("wsclean -no-dirty -no-update-model-required -restore-list " + \
imagename + "-residual.fits " + self.filename[0] + " "+self.outpath+"calibrator-model.fits " + self.vis)
else:
for i,pola in enumerate(self.polarisations):
os.system("wsclean -no-dirty -no-update-model-required -restore-list " + \
imagename +"-"+pola+ "-residual.fits " + self.filename[i] + " "+self.outpath+"calibrator-"+pola+"-model.fits " + self.vis)
if os.path.isfile("calibrator-model.fits") == False and \
(os.path.isfile("calibrator-I-model.fits")==False or \
os.path.isfile("calibrator-Q-model.fits")==False or \
os.path.isfile("calibrator-U-model.fits")==False or \
os.path.isfile("calibrator-V-model.fits")==False):
logging.warning("Calibrator model not generated. Proceeding with point source model")
raise RuntimeError("WSClean version 3.3 or above. Proceeding with point source model")
if self.model==True:
for i,pola in enumerate(self.polarisations):
self.correct_for_restoring_beam("calibrator-"+pola+"-model.fits")
[docs]
def correct_for_restoring_beam(self,image):
from casatasks import imhead
a=imhead(image)
major=a['restoringbeam']['major']['value']
minor=a['restoringbeam']['minor']['value']
cell=abs(a['incr'][0])*180/3.14159*3600
major_pix=major/cell
minor_pix=minor/cell
area=np.pi*major_pix*minor_pix/(4*np.log(2))
hdu=fits.open(image,mode='update')
hdu[0].data/=area
hdu.flush()
hdu.close()
[docs]
def check_negative_in_model(self):
if self.pol=='I':
max1, min1 = get_image_maxmin(self.outpath+"calibrator-model.fits", local=False)
if min1 < 0 and (max1 / max(abs(min1), 0.000001)) < 10000: ### some small negative is tolerable
raise RuntimeError("Negative in model. Going for point source model")
else:
for pola in self.polarisations:
max1, min1 = get_image_maxmin(self.outpath+"calibrator-"+pola+"-model.fits", local=False)
if min1 < 0 and (max1 / max(abs(min1), 0.000001)) < 10000: ### some small negative is tolerable
raise RuntimeError("Negative in model. Going for point source model")
[docs]
def do_prediction(self):
if self.predict==False:
return
os.system("wsclean -predict -pol "+self.pol+" -name "+self.outpath+"calibrator " + self.vis)
[docs]
def gen_dummy_image(self,imagename):
os.system("wsclean -no-dirty -no-update-model-required -size 4096 4096 " + \
"-scale 2arcmin -niter 10 -pol "+self.pol+" -name " + imagename + " " + self.vis)
[docs]
def gen_model_cl(self, included_sources=None):
"""
Generate source models for bright sources as CASA clean components
:param msfile: input visibility
:param ref_freq: reference frequency of the preset flux values of bright sources
:param output_freq: output frequency to be written into the CASA component list
:param includesun: if True, add a precribed solar flux to the source list
:return:
"""
self.ctrl_freq()
if self.avg_freq>60:
self.point_source_model_needed=True
if self.point_source_model_needed==True:
modelcl, ft_needed = self.point_source_model(included_sources)
return modelcl, ft_needed
if self.includesun == True:
logging.debug("User wants to add solar model")
logging.debug("Proceeding to use point source model generation scheme.")
modelcl, ft_needed = self.point_source_model()
return modelcl, ft_needed
try:
logging.debug("Generating component list using Gasperin et al. (2021)")
self.gen_model_file()
imagename = self.outpath+"dummy"
logging.debug("Generating a dummy image")
print ("Generating dummy image")
self.gen_dummy_image(imagename)
self.reset_image(imagename)
self.generate_model_from_component_list(imagename)
logging.debug("Model file generated using the clean component list")
self.check_negative_in_model()
self.do_prediction()
return None, False
except:
logging.warning("Negative in model. Going for point source model.")
modelcl, ft_needed = self.point_source_model()
return modelcl, ft_needed