"""Primary beam models for OVRO-LWA (N. Mahesh implementation).
This module provides classes for computing primary beam corrections using
various beam models including analytic approximations and simulated beam
patterns. Used for source flux density corrections and calibrator modeling.
Classes:
analytic_beam: Simple sin^1.6 elevation-based beam model.
jones_beam: Full Jones matrix beam from HDF5 simulation files.
"""
import numpy as np
import glob, logging, math, os
from casatools import msmetadata
from scipy.interpolate import griddata as gd
import h5py
from scipy.interpolate import RegularGridInterpolator
[docs]
def knn_search(arr, grid):
"""Find nearest neighbor of array of points in a multi-dimensional grid.
Args:
arr: Query points array.
grid: Grid of reference points.
Returns:
Index of nearest neighbor.
Source:
glowingpython.blogspot.com/2012/04/k-nearest-neighbor-search.html
"""
gridsize = grid.shape[1]
dists = np.sqrt(((grid - arr[:,:gridsize])**2.).sum(axis=0))
return np.argsort(dists)[0]
[docs]
def primary_beam_correction_val(pol, jones_matrix):
"""Calculate primary beam correction value for a polarization.
Args:
pol: Polarization type ('XX', 'YY', or 'I').
jones_matrix: 2x2 Jones matrix.
Returns:
Beam correction factor for the specified polarization.
"""
if pol=='XX':
return jones_matrix[0,0]**2
if pol=='YY':
return jones_matrix[1,1]**2
if pol=='I':
return 0.5*(jones_matrix[1,1]**2+jones_matrix[0,0]**2)
[docs]
class analytic_beam():
"""Analytic primary beam model using sin^1.6 elevation dependence.
A simplified beam model that approximates the OVRO-LWA primary beam
as a function of elevation only, ignoring azimuthal structure.
Args:
msfile: Path to measurement set (unused).
beam_file_path: Path to beam files (unused).
freq: Observing frequency (unused).
"""
def __init__(self,msfile=None,beam_file_path='/opt/beam/',freq=None):
return
[docs]
def srcjones(self,az,el): ### az, el in degrees
num_sources=np.size(el)
self.jones_matrices=np.zeros((num_sources,2,2))
for i in range(num_sources):
self.jones_matrices[i,:,:]=np.sqrt(np.sin(el[i]*np.pi/180)**1.6*np.identity(2))
return
[docs]
def read_beam_file(self):
'''
dummy function to make the code similar for all classes
'''
return
[docs]
def get_muller_matrix_XY(self,jones_matrix):
'''
The muller matrix is in XY coordinate frame
So
(XX_obs,XY_obs,YX_obs,YY_obs).T=((M00,M01,M02,M03),\
(M10,M11, M12, M13),\
(M20, M21, M22, M23),\
(M30, M31, M32, M33)) (XX,XY,YX,YY).T
Note that this Muller Matrix is not normalised.
'''
J1=jones_matrix
J2=np.conj(J1)
XY_muller_matrix=np.zeros((4,4),dtype=complex)
XY_muller_matrix[0:2,0:2]=J1[0,0]*J2
XY_muller_matrix[0:2,2:4]=J1[0,1]*J2
XY_muller_matrix[2:4,0:2]=J1[1,0]*J2
XY_muller_matrix[2:4,2:4]=J1[1,1]*J2
@staticmethod
[docs]
def get_source_pol_factors(jones_matrix):
return jones_matrix**2
def get_muller_matrix_XY(self,jones_matrix):
'''
The muller matrix is in XY coordinate frame
So
(XX_obs,XY_obs,YX_obs,YY_obs).T=((M00,M01,M02,M03),\
(M10,M11, M12, M13),\
(M20, M21, M22, M23),\
(M30, M31, M32, M33)) (XX,XY,YX,YY).T
Note that this Muller Matrix is not normalised.
'''
J1=jones_matrix
J2=np.conj(J1)
XY_muller_matrix=np.zeros((4,4),dtype=complex)
XY_muller_matrix[0:2,0:2]=J1[0,0]*J2
XY_muller_matrix[0:2,2:4]=J1[0,1]*J2
XY_muller_matrix[2:4,0:2]=J1[1,0]*J2
XY_muller_matrix[2:4,2:4]=J1[1,1]*J2
return XY_muller_matrix
[docs]
def get_muller_matrix_stokes(self,jones_matrix):
'''
The muller matrix is in Stokes coordinate frame
So
(I_obs,Q_obs,U_obs,V_obs).T=((M00,M01,M02,M03),\
(M10,M11, M12, M13),\
(M20, M21, M22, M23),\
(M30, M31, M32, M33)) (I,Q,U,V).T
Note that this Muller Matrix is not normalised.
'''
XY_muller_matrix=self.get_muller_matrix_XY(jones_matrix)
T=0.5*np.array([[1,0,0,1],[1,0,0,-1],[0,1,1,0],\
[0,-1j,1j,0]],dtype=complex) ### see eq 8 of Hamaker et al. 1996
S=np.array([[1,1,0,0],[0,0,1,1j],[0,0,1,-1j],\
[1,-1,0,0]],dtype=complex) ### see eq 9 of Hamaker et al. 1996
stokes_muller_matrix=np.matmul(np.matmul(T,XY_muller_matrix),S)
return stokes_muller_matrix
[docs]
class woody_beam():
'''
This class uses a beam_file_path. This path is the location where all the relevant
beamfiles are located. If freq is provided, then msfile is not required. However, if
freq is not provided, msfile is must. In this case, the frequency used is the central
frequency corresponding to the msfile. This code can only accept one frequency, but can
work simulatenously over multiple sources. This code uses the Woody beam model.
'''
def __init__(self,msfile=None,beam_file_path='/opt/beam/',freq=None):
try:
self.beam_file_path=beam_file_path
except:
self._beam_file_path=None
if self.freq is None:
self.msfile=msfile
self.ctrl_freq()
[docs]
def ctrl_freq(self):
'''
Reads the central frequency of the MS file for the spw 0.
'''
if self.freq is not None:
return
msmd=msmetadata()
msmd.open(self.msfile)
self.freq = msmd.meanfreq(0)*1e-6
msmd.done()
[docs]
def get_beam_file(self):
'''
Gets all the beamfiles from the relevant location and then chooses the file
while is most relevant for the frequency which the user has requested.
'''
all_beam_files=glob.glob(self.beam_file_path+'/beamIQUV_*.npz')
all_freqs=np.array([float(file1.split('/')[-1].split('_')[-1].split('.npz')[0]) for file1 in all_beam_files])
self.ctrl_freq()
diff=abs(all_freqs-self.freq)
ind=np.argsort(diff)[0]
return all_beam_files[ind]
[docs]
def read_beam_file(self):
'''
Reads the beamfile. If reading the beamfile is unsuccessfull, then it switches
to the analytical beam.
'''
try:
self.azelgrid = np.load(self.beam_file_path+'/azelgrid.npy')
self.gridsize = self.azelgrid.shape[-1]
# load 4096x4096 grid of IQUV values, for given msfile CRFREQ
self.beamIQUVfile = self.get_beam_file()#self.beam_file_path+'/beamIQUV_'+str(self.freq)+'.npz'
if os.path.exists(self.beamIQUVfile):
self.beamIQUV = np.load(self.beamIQUVfile)
self.Ibeam = self.beamIQUV['I']
self.Qbeam = self.beamIQUV['Q']
self.Ubeam = self.beamIQUV['U']
self.Vbeam = self.beamIQUV['V']
logging.debug('Beam files read successfully')
else:
raise RuntimeError
except OSError:
logging.warning("Beam file does not exist in give path."+\
"Switching to analytical beam.")
self.beamIQUV = np.nan
@property
[docs]
def beam_file_path(self):
return self._beam_file_path
@beam_file_path.setter
def beam_file_path(self,value):
files=glob.glob(value+"*.npy")
if len(files)!=0:
self._beam_file_path=value
else:
logging.warning("Beam file does not exist in give path."+\
"Switching to analytical beam.")
self._beam_file_path=None
@property
[docs]
def msfile(self):
return self._msfile
@msfile.setter
def msfile(self,value):
if os.path.isdir(value):
self._msfile=value
else:
raise RuntimeError
[docs]
def srcjones(self,az,el):
"""
The function name is srcjones to keep naming consistent with Jones_beam class.
But this function returns I,Q,U,V beams.
Compute beam scaling factor
Can work simulatenously over multiple sources. So the az and el can be two arrays.
The number of sources is assumed to be equal to the number of elements in alt/az
Args:
az: azimuth in degrees
el: elevation in degrees
Returns: [I,Q,U,V] flux factors, where for an unpolarized source [I,Q,U,V] = [1,0,0,0]
"""
num_sources=len(az)
self.jones_matrices=np.zeros((num_sources,2,2),dtype='complex')
for i in range(num_sources):
try:
if self.beam_file_path is not None:
# index where grid equals source az el values
index = knn_search(np.array([ [az[i]], [el[i]] ]), self.azelgrid.reshape(2,self.gridsize*self.gridsize))
Ifctr = self.Ibeam.reshape(self.gridsize*self.gridsize)[index]
Qfctr = self.Qbeam.reshape(self.gridsize*self.gridsize)[index]
Ufctr = self.Ubeam.reshape(self.gridsize*self.gridsize)[index]
Vfctr = self.Vbeam.reshape(self.gridsize*self.gridsize)[index]
self.jones_matrices[i,:,:]=np.array([[Ifctr+Qfctr,Ufctr+1j*Vfctr],[Ufctr-1j*Vfctr,Ifctr-Qfctr]])
else:
raise RuntimeError
except:
Ifctr=np.sin(el*np.pi/180)**1.6
self.jones_matrices[i,:,:]= np.array([[Ifctr,0],[0,Ifctr]])
return
@staticmethod
[docs]
def get_source_pol_factors(jones_matrix): ### in [[XX,XY],[YX,YY]] format
'''
I am assuming that the source is unpolarised. At these low frequencies this is a good assumption.
Since the jones matrix in this class is essentially the source pol factors, just returning.
'''
return jones_matrix
[docs]
def get_muller_matrix_XY(self,jones_matrix):
'''
The muller matrix is in XY coordinate frame
So
(XX_obs,XY_obs,YX_obs,YY_obs).T=((M00,M01,M02,M03),\
(M10,M11, M12, M13),\
(M20, M21, M22, M23),\
(M30, M31, M32, M33)) (XX,XY,YX,YY).T
Note that this Muller Matrix is not normalised.
'''
J1=jones_matrix
J2=np.conj(J1)
XY_muller_matrix=np.zeros((4,4),dtype=complex)
XY_muller_matrix[0:2,0:2]=J1[0,0]*J2
XY_muller_matrix[0:2,2:4]=J1[0,1]*J2
XY_muller_matrix[2:4,0:2]=J1[1,0]*J2
XY_muller_matrix[2:4,2:4]=J1[1,1]*J2
return XY_muller_matrix
[docs]
def get_muller_matrix_stokes(self,jones_matrix):
'''
The muller matrix is in Stokes coordinate frame
So
(I_obs,Q_obs,U_obs,V_obs).T=((M00,M01,M02,M03),\
(M10,M11, M12, M13),\
(M20, M21, M22, M23),\
(M30, M31, M32, M33)) (I,Q,U,V).T
Note that this Muller Matrix is not normalised.
'''
XY_muller_matrix=self.get_muller_matrix_XY(jones_matrix)
T=0.5*np.array([[1,0,0,1],[1,0,0,-1],[0,1,1,0],\
[0,-1j,1j,0]],dtype=complex) ### see eq 8 of Hamaker et al. 1996
S=np.array([[1,1,0,0],[0,0,1,1j],[0,0,1,-1j],\
[1,-1,0,0]],dtype=complex) ### see eq 9 of Hamaker et al. 1996
stokes_muller_matrix=np.matmul(np.matmul(T,XY_muller_matrix),S)
return stokes_muller_matrix
[docs]
class jones_beam:
"""
For loading and returning LWA dipole beam values (derived from simulations made by Nivedita)
Can only take one frequency at a time now.
The beamfile path should contain the absolute path of the beamfile.
If freq is provided, then msfile is not required. However, if
freq is not provided, msfile is must. In this case, the frequency used is the central
frequency corresponding to the msfile.
"""
def __init__(self,beam_file_path='/lustre/msurajit/beam_model_nivedita/OVRO-LWA_soil_pt.h5',\
msfile=None,freq=None):
self.beamfile=beam_file_path
if self.freq is None:
self.msfile=msfile
self.ctrl_freq()
if not isinstance(self.freq,np.ndarray):
self.freq=np.array([self.freq])
[docs]
self.num_freq=self.freq.size
[docs]
def ctrl_freq(self):
'''
Reads the central frequency of the MS file for the spw 0.
'''
if self.freq is not None:
return
msmd=msmetadata()
msmd.open(self.msfile)
self.freq = np.array([msmd.meanfreq(0) * 1e-6])
msmd.done()
self.num_freq=self.freq.size
@property
[docs]
def beamfile(self):
return self._beamfile
@beamfile.setter
def beamfile(self,value):
if value and os.path.isfile(value):
self._beamfile=value
else:
logging.warning("Beam file does not exist in give path."+\
"Switching to analytical beam.")
self._beamfile=None
@property
[docs]
def msfile(self):
return self._msfile
@msfile.setter
def msfile(self,value):
if os.path.isdir(value):
self._msfile=value
else:
raise RuntimeError
[docs]
def read_beam_file(self): ### freq in MHz
'''
az,za units: radian.
Note that while the beamfiles accept in radian units, this function takes
the alt,azimuth in degrees. The altitude is converted to zenith angle within
the code.
The jones matrices will be normalised with respect to the zenith
'''
try:
with h5py.File(self.beamfile,'r') as hf:
self.model_freqs=np.array(hf['freq_Hz'])*1e-6 ### converting to MHz
self.theta_pts=np.array(hf['theta_pts'])
self.phi_pts=np.array(hf['phi_pts'])
self.Xpol_ephi=np.array(hf['X_pol_Efields/ephi']) # N_freq,N_theta,N_phi
self.Xpol_etheta=np.array(hf['X_pol_Efields/etheta']) #N_freq,N_theta,N_phi
self.Ypol_ephi=np.array(hf['Y_pol_Efields/ephi']) # N_freq,N_theta,N_phi
self.Ypol_etheta=np.array(hf['Y_pol_Efields/etheta']) # N_freq,N_theta,N_phi
### I will normalize using the values at zenith (az=0, zenith_angle=0)
xpol_phi_max=np.abs(self.Xpol_ephi[:,0,0])
xpol_theta_max=np.abs(self.Xpol_etheta[:,0,0])
ypol_phi_max=np.abs(self.Ypol_ephi[:,0,0])
ypol_theta_max=np.abs(self.Ypol_etheta[:,0,0])
self.max_e=np.zeros(self.model_freqs.size)
for i in range(self.model_freqs.size):
### These are the normalising terms. The factor 0.5 comes from the definition
### of I used. I= 0.5*(XX+YY).
self.max_e[i]=np.sqrt(0.5*(xpol_phi_max[i]**2+xpol_theta_max[i]**2+\
ypol_phi_max[i]**2+ypol_theta_max[i]**2))
except:
logging.warning("Beam file does not exist in give path."+\
"Switching to analytical beam.")
self._beamfile=None
[docs]
def match_dimensions(self,data):
shape=data.shape
if not len(shape)==2:
if shape[0]==self.num_sources:
data=np.expand_dims(data,axis=1)
elif shape[0]==self.num_freq:
data=np.expand_dims(data,axis=0)
return data
[docs]
def get_max_val(self):
'''
does accurate beam normalisation
'''
model_freq_size=self.model_freqs.size
self.max_vals=np.array(model_freq_size)
for freq_ind in range(model_freq_size):
gains=np.zeros((self.num_theta,num_phi))
for i in range(num_phi):
for j in range(num_theta):
J1=np.array([[Xpol_etheta[freq_ind,j,i],Xpol_ephi[freq_ind,j,i]],\
[Ypol_etheta[freq_ind,j,i],Ypol_ephi[freq_ind,j,i]]])
J3=self.get_source_pol_factors(J1)
gain[j,i]=0.5*(J3[0,0]+J3[1,1])
self.max_vals[freq_ind]=np.max(np.abs(gain))
del gain
[docs]
def srcjones(self,az,el):
"""Compute beam scaling factor
Can work simultaneously over multiple sources. So the az and el can be two arrays.
The number of sources is assumed to be equal to the number of elements in alt/az
We have implemented an approximate normalization of the beam, which is used by default.
While the function to do a more accurate normalization is already implemented, it is
not used by default.
Args:
(az,el) coordinates in degrees
Returns: Jones matrix at coordinates (az,el)
"""
za=90-el
za_rad=za*np.pi/180
az_rad=az*np.pi/180
self.ctrl_freq()
self.num_sources=len(az)
self.jones_matrices=np.zeros((self.num_sources,2,2),dtype='complex')
if self._beamfile:
#print (np.size(P),np.size(grid_el),np.shape(self.gain_theta[0]))
interpolating_function = RegularGridInterpolator((self.model_freqs,self.theta_pts,self.phi_pts), self.Xpol_etheta)
sources_e_theta_x= interpolating_function((self.freq,za_rad,az_rad))
interpolating_function = RegularGridInterpolator((self.model_freqs,self.theta_pts,self.phi_pts), self.Xpol_ephi)
sources_e_phi_x= interpolating_function((self.freq,za_rad,az_rad))
interpolating_function = RegularGridInterpolator((self.model_freqs,self.theta_pts,self.phi_pts), self.Ypol_etheta)
sources_e_theta_y= interpolating_function((self.freq,za_rad,az_rad))
interpolating_function = RegularGridInterpolator((self.model_freqs,self.theta_pts,self.phi_pts), self.Ypol_ephi)
sources_e_phi_y= interpolating_function((self.freq,za_rad,az_rad))
max_val_freq=np.interp(self.freq,self.model_freqs,self.max_e)
for i in range(self.num_sources):
self.jones_matrices[i,:,:]=[[sources_e_theta_x[i],sources_e_phi_x[i]],\
[sources_e_theta_y[i],sources_e_phi_y[i]]]/max_val_freq ### zenith normalisation
else:
Ifctr=np.sin(el*np.pi/180)**1.6
for i in range(self.num_sources):
self.jones_matrices[i,:,:]= np.array([[Ifctr[i],0],[0,Ifctr[i]]])
return
[docs]
def get_source_pol_factors(self,jones_matrix): ### in [[XX,XY],[YX,YY]] format
'''
I am assuming that the source is unpolarised. At these low frequencies this is a good assumption.
'''
muller=self.get_muller_matrix_stokes(jones_matrix)
Imuller= muller[:,0] ### this is the contribution of I to other stokes
Ifctr=Imuller[0]
Qfctr=Imuller[1]
Ufctr=Imuller[2]
Vfctr=Imuller[3]
return np.array([[Ifctr+Qfctr,Ufctr+1j*Vfctr],[Ufctr-1j*Vfctr,Ifctr-Qfctr]])
[docs]
def get_muller_matrix_XY(self,jones_matrix):
'''
The muller matrix is in XY coordinate frame
So
(XX_obs,XY_obs,YX_obs,YY_obs).T=((M00,M01,M02,M03),\
(M10,M11, M12, M13),\
(M20, M21, M22, M23),\
(M30, M31, M32, M33)) (XX,XY,YX,YY).T
Note that this Muller Matrix is not normalised.
'''
J1=jones_matrix
J2=np.conj(J1)
XY_muller_matrix=np.zeros((4,4),dtype=complex)
XY_muller_matrix[0:2,0:2]=J1[0,0]*J2
XY_muller_matrix[0:2,2:4]=J1[0,1]*J2
XY_muller_matrix[2:4,0:2]=J1[1,0]*J2
XY_muller_matrix[2:4,2:4]=J1[1,1]*J2
return XY_muller_matrix
[docs]
def get_muller_matrix_stokes(self,jones_matrix):
'''
The muller matrix is in Stokes coordinate frame
So
(I_obs,Q_obs,U_obs,V_obs).T=((M00,M01,M02,M03),\
(M10,M11, M12, M13),\
(M20, M21, M22, M23),\
(M30, M31, M32, M33)) (I,Q,U,V).T
Note that this Muller Matrix is not normalised.
'''
XY_muller_matrix=self.get_muller_matrix_XY(jones_matrix)
T=0.5*np.array([[1,0,0,1],[1,0,0,-1],[0,1,1,0],\
[0,-1j,1j,0]],dtype=complex) ### see eq 8 of Hamaker et al. 1996
S=np.array([[1,1,0,0],[0,0,1,1j],[0,0,1,-1j],\
[1,-1,0,0]],dtype=complex) ### see eq 9 of Hamaker et al. 1996
stokes_muller_matrix=np.matmul(np.matmul(T,XY_muller_matrix),S)
return stokes_muller_matrix