Source code for orca.utils.beam

"""Primary beam models for OVRO-LWA observations.

Provides abstract base class and implementations for computing beam
response as a function of position. Includes analytic approximations
and simulation-derived beam models (Woody beam).
"""
from abc import ABC, abstractmethod
from typing import Tuple

import numpy as np
import matplotlib.pyplot as plt
import sys,os,inspect,glob
from casacore import tables
from scipy.interpolate import griddata as gd

from orca.configmanager import telescope as tele

if tele.n_ant > 256:
[docs] BEAM_FILE_PATH = os.path.abspath('/opt/beam')
else: BEAM_FILE_PATH = os.path.abspath('/lustre/mmanders/LWA/modules/beam')
[docs] class BaseBeam(ABC): """Abstract base class for beam models. Defines the interface for computing beam response at arbitrary sky positions. """ @abstractmethod def __init__(self, msfile: str): """Initialize beam model from measurement set. Args: msfile: Path to measurement set for frequency information. """ pass @abstractmethod
[docs] def srcIQUV(self, az: float, el: float) -> Tuple[float, float, float, float]: """Get beam response in Stokes I, Q, U, V at given position. Args: az: Azimuth in degrees. el: Elevation in degrees. Returns: Tuple of (I, Q, U, V) beam response values. """ pass
[docs] class AnalyticBeam(BaseBeam): """Simple analytic beam model using sin(el)^1.6 approximation.""" def __init__(self, msfile: str): """Initialize analytic beam (no MS data needed).""" pass
[docs] def srcIQUV(self, az: float, el: float) -> Tuple[float, float, float, float]: """Compute analytic beam response. Args: az: Azimuth in degrees (unused). el: Elevation in degrees. Returns: Tuple of (I, 0, 0, 0) with I = sin(el)^1.6. """ return np.sin(el * 2 * np.pi/360) ** 1.6, 0, 0, 0
[docs] class WoodyBeam(BaseBeam): """Simulation-derived LWA dipole beam model. Uses pre-computed beam grids from DW beam simulations interpolated to the observation frequency. Supports full Stokes (I, Q, U, V). """ def __init__(self, msfile: str): """Initialize Woody beam from measurement set frequency. Args: msfile: Path to measurement set for extracting center frequency. """
[docs] self.CRFREQ = float(tables.table(msfile+'/SPECTRAL_WINDOW', ack=False).getcell('REF_FREQUENCY', 0))/1.e6 # starting frequency in MHz
[docs] self.path = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) # absolute path to module
# load 4096x4096 grid of azimuth,elevation values
[docs] self.azelgrid = np.load(BEAM_FILE_PATH+'/azelgrid.npy')
[docs] self.gridsize = self.azelgrid.shape[-1]
# load 4096x4096 grid of IQUV values, for given msfile CRFREQ
[docs] self.beamIQUVfile = BEAM_FILE_PATH+'/beamIQUV_'+str(self.CRFREQ)+'.npz'
if not os.path.exists(self.beamIQUVfile): print(f'Beam .npz file does not exist at {self.CRFREQ}. Using closest existing frequency beam file.') beamfiles = np.sort(glob.glob(f'{BEAM_FILE_PATH}/beamIQUV_*.npz')) freqs_beamfiles = np.char.strip(np.char.strip(beamfiles, chars='/opt/beam/beamIQUV_'), chars='.npz').astype(float) self.beamIQUVfile = beamfiles[np.argmin(np.abs(freqs_beamfiles - self.CRFREQ))]
[docs] self.beamIQUV = np.load(self.beamIQUVfile)
[docs] self.Ibeam = self.beamIQUV['I']
[docs] self.Qbeam = self.beamIQUV['Q']
[docs] self.Ubeam = self.beamIQUV['U']
[docs] self.Vbeam = self.beamIQUV['V']
[docs] def srcIQUV(self,az,el): """Compute beam scaling factor Args: az: azimuth in degrees el: elevation in degrees Returns: [I,Q,U,V] flux factors, where for an unpolarized source [I,Q,U,V] = [1,0,0,0] """ def knn_search(arr,grid): ''' Find 'nearest neighbor' of array of points in multi-dimensional grid Source: glowingpython.blogspot.com/2012/04/k-nearest-neighbor-search.html ''' gridsize = grid.shape[1] dists = np.sqrt(((grid - arr[:,:gridsize])**2.).sum(axis=0)) return np.argsort(dists)[0] # index where grid equals source az el values index = knn_search(np.array([ [az], [el] ]), self.azelgrid.reshape(2,self.gridsize*self.gridsize)) Ifctr = self.Ibeam.reshape(self.gridsize*self.gridsize)[index] Qfctr = self.Qbeam.reshape(self.gridsize*self.gridsize)[index] Ufctr = self.Ubeam.reshape(self.gridsize*self.gridsize)[index] Vfctr = self.Vbeam.reshape(self.gridsize*self.gridsize)[index] return Ifctr,Qfctr,Ufctr,Vfctr
[docs] def plotbeam(self): ''' Show the IQUV beams at MS center frequency, and azimuth,elevation grids. ''' import pylab as p # azel grids p.figure(figsize=(6,9)) p.subplot(211) p.imshow(np.rot90(self.azelgrid[0,:,:]), cmap=plt.get_cmap('inferno')) p.title('azimuth') p.colorbar() p.subplot(212) p.imshow(np.rot90(self.azelgrid[1,:,:]), cmap=plt.get_cmap('inferno')) p.title('elevation') p.colorbar() # IQUV beams p.figure(figsize=(15,9)) p.subplot(221) p.imshow(np.rot90(self.Ibeam),vmin=0.0,vmax=1.0,cmap=plt.get_cmap('inferno')) p.title('I') p.colorbar() p.subplot(222) p.imshow(np.rot90(self.Qbeam),vmin=-0.15,vmax=0.15,cmap=plt.get_cmap('inferno')) p.title('Q') p.colorbar() p.subplot(223) p.imshow(np.rot90(self.Ubeam),vmin=-0.15,vmax=0.15,cmap=plt.get_cmap('inferno')) p.title('U') p.colorbar() p.subplot(224) p.imshow(np.rot90(self.Vbeam),vmin=-0.010,vmax=0.010,cmap=plt.get_cmap('inferno')) p.title('V') p.colorbar() p.show()
[docs] class jones: """ For loading and returning LWA dipole beam values (derived from DW beam simulations) on the ASTM. Last edit: 11 September 2020 """ def __init__(self,msfile):
[docs] self.CRFREQ = float(tables.table(msfile+'/SPECTRAL_WINDOW', ack=False).getcell('NAME', 0))/1.e6 # center frequency in MHz
[docs] self.path = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) # absolute path to module
# load 4096x4096 grid of azimuth,elevation values #self.lmgrid = np.load(BEAM_FILE_PATH+'/lmgrid.npy') #self.gridsize = self.lmgrid.shape[-1] ## load 4096x4096 grid of complex Co, Cx values #self.beamjonesfile = BEAM_FILE_PATH+'/beamJones.npz'
[docs] self.beamjonesfile = BEAM_FILE_PATH+'/beamLudwig3rd.npz'
if os.path.exists(self.beamjonesfile): #self.beamjones = np.load(self.beamjonesfile) #self.Co = self.beamjones['Co'] #self.Cx = self.beamjones['Cx'] self.beamjones = np.load(self.beamjonesfile) self.Co = self.beamjones['cofull'] self.Cx = self.beamjones['cxfull'] self.Corot90 = self.beamjones['cofull_rot90'] self.Cxnrot90 = self.beamjones['cxfull_nrot90'] self.l = self.beamjones['lfull'] self.m = self.beamjones['mfull'] self.freqs = self.beamjones['freqfull'] else: print >> sys.stderr, 'Beam .npz file does not exist at %s. Check your frequency. \ May need to generate new beam file.' % str(self.CRFREQ) sys.exit()
[docs] def srcjones(self,l,m): """Compute beam scaling factor Args: (l,m) coordinates Returns: Jones matrix at coordinates (l,m) """ #def knn_search(arr,grid): # ''' # Find 'nearest neighbor' of array of points in multi-dimensional grid # Source: glowingpython.blogspot.com/2012/04/k-nearest-neighbor-search.html # ''' # gridsize = grid.shape[1] # dists = np.sqrt(((grid - arr[:,:gridsize])**2.).sum(axis=0)) # return np.argsort(dists)[0] ## index where grid equals source az el values #index = knn_search(np.array([ [l], [m] ]), self.lmgrid.reshape(2,self.gridsize*self.gridsize)) #Jonesmat = np.array([ [self.Co.reshape(self.gridsize*self.gridsize)[index], self.Cx.reshape(self.gridsize*self.gridsize)[index]], # [-np.rot90(self.Cx).reshape(self.gridsize*self.gridsize)[index], np.rot90(self.Co).reshape(self.gridsize*self.gridsize)[index]] ]) coval = gd( (self.l.ravel(), self.m.ravel(), self.freqs.ravel()), \ selfs.Co.ravel(), (l, m, self.CRFREQ), method='linear') cxval = gd( (self.l.ravel(), self.m.ravel(), self.freqs.ravel()), \ selfs.Cx.ravel(), (l, m, self.CRFREQ), method='linear') corot90val = gd( (self.l.ravel(), self.m.ravel(), self.freqs.ravel()), \ self.Corot90.ravel(), (l, m, self.CRFREQ), method='linear') cxnrot90val = gd( (self.l.ravel(), self.m.ravel(), self.freqs.ravel()), \ self.Cxnrot90.ravel(), (l, m, self.CRFREQ), method='linear') Jonesmat = np.array([ [coval, cxval ], [cxnrot90val, corot90val] ]) return Jonesmat