"""FITS file I/O and image manipulation utilities.
Provides functions for reading and writing FITS images, creating masks,
co-adding images, and extracting cutouts and statistics around source
positions.
"""
import warnings
from astropy.io import fits
from astropy import wcs
from astropy.coordinates import SkyCoord
import numpy as np
from typing import Tuple, List, Optional
[docs]
def read_image_fits(fn: str) -> Tuple[np.ndarray, fits.Header]:
"""Read a 2D image from a FITS file.
Extracts the first plane from a 4D FITS cube (typical WSClean output).
Args:
fn: Path to the FITS file.
Returns:
Tuple of (image_data, header).
"""
with fits.open(fn) as hdulist:
image = hdulist[0].data[0, 0]
header = hdulist[0].header
return image, header
[docs]
def write_image_fits(fn: str, header: fits.Header, data: np.ndarray, overwrite: bool = False):
"""Write a 2D image to a FITS file.
Reshapes 2D data into 4D format for CASA/WSClean compatibility.
Args:
fn: Output file path.
header: FITS header to use.
data: 2D image data array.
overwrite: If True, overwrite existing file.
"""
fits.PrimaryHDU(np.reshape(data, newshape=(1, 1, *data.shape)), header=header).writeto(
fn, overwrite=overwrite)
[docs]
def write_fits_mask_with_box_xy_coordindates(output_fits_path: str, imsize: int,
center_list: List[Tuple[int, int]], width_list: List[int]) -> str:
"""Writes a fits mask with a list of boxes to file.
Writes a fits mask with a list of boxes to file. Both im+list and center_list are in
WCS X Y coordindates (i.e. transpose of the numpy array
from astropy.fits. The fits will NOT have a header that is accurate since only the pixels will be used.
If you only need one box you can use one-element lists for the arguments.
Args:
output_fits_path: The fits file path.
imsize: size of the image.
center_list: A list of enters of the boxes. Must be in XY index (i.e. what ds9 shows).
width_list: A list of widths of the boxes in pixels, with each element being the width of the corresponding
center_list element.
Returns: The fits mask path.
"""
image = np.zeros(shape=(imsize, imsize), dtype='>f4')
for i, center in enumerate(center_list):
width = width_list[i]
assert width % 2 == 1, 'width must be an odd number'
image[(center[0] - width//2):(center[0] + width//2 + 1), (center[1] - width//2):(center[1] + width//2 + 1)] = \
np.ones(shape=(width, width))
write_image_fits(output_fits_path, get_sample_header(), image.T, overwrite=True)
return output_fits_path
[docs]
def co_add(fits_list: List[str], output_fits_path: str, header_index: Optional[int] = None) -> str:
im, header = read_image_fits(fits_list[header_index if header_index else len(fits_list) // 2])
averaged_im = co_add_arr(fits_list, im.shape)
write_image_fits(output_fits_path, header, averaged_im)
return output_fits_path
[docs]
def co_add_arr(fits_list, dims):
averaged_im = np.zeros(shape=dims)
n = len(fits_list)
for fn in fits_list:
im, _ = read_image_fits(fn)
averaged_im += (im / n)
return averaged_im
[docs]
def get_peak_around_src(im_T: np.ndarray, source_coord: SkyCoord, w: wcs.WCS, radius_px:int = 100) -> Tuple[int, int]:
x, y = wcs.utils.skycoord_to_pixel(source_coord, w)
x_start = int(x) - radius_px
y_start = int(y) - radius_px
im_box = im_T[x_start:x_start + 2 * radius_px, y_start:y_start + 2 * radius_px]
peakx, peaky = np.unravel_index(np.argmax(im_box),
im_box.shape)
peakx += x_start
peaky += y_start
return peakx, peaky
[docs]
def std_and_max_around_src(im_T: np.ndarray, radius:int, source_coord: SkyCoord, w: wcs.WCS) -> Tuple[float, float]:
x, y = wcs.utils.skycoord_to_pixel(source_coord, w)
if np.isnan(x) or np.isnan(y):
return np.nan, np.nan
x = int(x)
y = int(y)
im_box = im_T[x - radius :x + radius, y - radius : y + radius]
return np.std(im_box), np.max(im_box)
[docs]
def get_cutout(im: np.ndarray, coord: SkyCoord, w: wcs.WCS, radius_px: int) -> np.ndarray:
x, y = wcs.utils.skycoord_to_pixel(coord, w)
if np.isnan(x) or np.isnan(y):
raise ValueError(f'Coordinate {coord} is not in the image.')
x = int(x)
y = int(y)
return im.T[x - radius_px : x + radius_px, y - radius_px : y + radius_px].T