# orca/tasks/peel_experiment.py
"""Experimental peeling pipeline tasks.
Provides Celery tasks for testing and developing peeling workflows
including:
- TTCal peeling with configurable source lists
- RFI source peeling
- Multiple peeling iterations with different maxiter settings
- Diagnostic image generation
This is an experimental module for peeling algorithm development.
"""
from __future__ import absolute_import, unicode_literals
import os
import shutil
import subprocess
from pathlib import Path
from datetime import datetime
import glob
import time
import numpy as np
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.patches import Circle
from astropy.io import fits
from astropy.wcs import WCS
from astropy import units as u
from astropy.time import Time
from astropy.coordinates import SkyCoord, EarthLocation, AltAz
from casatasks import applycal
from orca.wrapper.wsclean import wsclean
from orca.celery import app # Celery app
# --------- PATH CONFIG ---------
[docs]
WORKDIR = Path("/lustre/nkosogor/peel_test")
[docs]
BASE_73 = Path("/lustre/pipeline/night-time/averaged/73MHz")
[docs]
SOURCES_JSON = WORKDIR / "sources.json"
[docs]
RFI_JSON = WORKDIR / "rfi_43.2_ver20251101.json"
[docs]
JULIA060_ENV_NAME = "julia060"
[docs]
TTCAL_DEV_ENV_PREFIX = "/opt/devel/pipeline/envs/ttcal_dev"
[docs]
CAL_TABLE = {
"2024-12-18": WORKDIR / "calibration_2024-12-18_01h.B.flagged",
"2025-05-06": WORKDIR / "calibration_2025-05-06_18h.B.flagged",
}
[docs]
DEFAULT_MAXITER = 5 # used if caller doesn't specify
# ---- geometry / plotting constants ----
[docs]
OVRO = EarthLocation(
lon=-118.282 * u.deg,
lat=37.243 * u.deg,
height=1200 * u.m
)
[docs]
CYG_A = SkyCoord("19h59m28s", "40d44m02s", frame="icrs")
[docs]
CAS_A = SkyCoord("23h23m24s", "58d48m54s", frame="icrs")
VMIN, VMAX = -2, 12
# ================== LOW-LEVEL HELPERS ==================
[docs]
def run_ttcal_with_conda(env_type: str, ms_name: str, json_path: Path, maxiter: int):
"""
env_type: 'sources' or 'rfi'
"""
json_path = json_path.resolve()
ms_name = str(ms_name)
base_cmd = [
"conda", "run",
]
if env_type == "sources":
base_cmd += ["-n", JULIA060_ENV_NAME]
elif env_type == "rfi":
base_cmd += ["-p", TTCAL_DEV_ENV_PREFIX]
else:
raise ValueError(f"Unknown env_type {env_type}")
cmd = base_cmd + [
"ttcal.jl", "zest", ms_name, str(json_path),
"--beam", "constant",
"--minuvw", "10",
"--maxiter", str(maxiter),
"--tolerance", "1e-4",
]
subprocess.run(cmd, check=True)
[docs]
def run_wsclean_stage(ms_name: str, suffix: str):
ms_dir = ms_name
workdir = Path.cwd()
extra_wsclean = [
"-pol", "IV", "-size", "4096", "4096",
"-scale", "0.03125",
"-niter", "0",
"-weight", "briggs", "0", "-horizon-mask", "10deg",
"-taper-inner-tukey", "30",
]
prefix_base = os.path.splitext(os.path.basename(ms_dir))[0]
filename_prefix = f"{prefix_base}_0_iterations_{suffix}"
wsclean(
ms_list=[ms_dir],
out_dir=str(workdir),
filename_prefix=filename_prefix,
extra_arg_list=extra_wsclean,
num_threads=4,
mem_gb=50,
)
[docs]
def run_applycal_pre_peel(ms_name: str, caltable_path: Path):
applycal(
vis=ms_name,
gaintable=[str(caltable_path)],
spw="",
spwmap=[[13]],
interp="linear,nearestflag",
calwt=False,
parang=False,
)
# --------- time / sky helpers ----------
[docs]
def parse_utc_from_tag(tag: str) -> Time:
"""
tag example: '2025-05-06_05_20250506_050008_73MHz_averaged_maxiter05'
or without the _maxiterXX suffix.
"""
core = tag
if "_maxiter" in core:
core = core.split("_maxiter")[0]
parts = core.split("_")
if len(parts) < 4:
raise ValueError(f"Unexpected tag: {tag}")
date_code = parts[2] # '20250506'
time_code = parts[3] # '050008'
dt = datetime.strptime(date_code + time_code, "%Y%m%d%H%M%S")
return Time(dt, scale="utc", location=OVRO)
[docs]
def compute_lst_and_alts(t: Time):
lst = t.sidereal_time("apparent")
altaz_frame = AltAz(obstime=t, location=OVRO)
cyg_alt = CYG_A.transform_to(altaz_frame).alt.deg
cas_alt = CAS_A.transform_to(altaz_frame).alt.deg
return lst, cyg_alt, cas_alt
[docs]
def lst_to_hms_str(lst_angle):
hours = lst_angle.hour
h = int(hours)
m = int(round((hours - h) * 60))
if m == 60:
h += 1
m = 0
h %= 24
return f"{h:02d}:{m:02d}"
# -------- image / RMS helpers ---------
def _mask_image(data, wcs, r_mask=R_MASK):
ny, nx = data.shape
yy, xx = np.indices((ny, nx))
world = wcs.pixel_to_world(xx, yy)
invalid_wcs = (~np.isfinite(world.ra.deg) |
~np.isfinite(world.dec.deg))
cy, cx = ny // 2, nx // 2
outside_circle = (xx - cx)**2 + (yy - cy)**2 > r_mask**2
mask = invalid_wcs | outside_circle
return mask, (cx, cy)
[docs]
def load_masked_image(fits_path, r_mask=R_MASK):
with fits.open(fits_path) as hdul:
data = hdul[0].data[0, 0, :, :].astype(float)
header = hdul[0].header
wcs = WCS(header).celestial
mask, center = _mask_image(data, wcs, r_mask=r_mask)
data_masked = data.copy()
data_masked[mask] = np.nan
return data_masked, wcs, center
[docs]
def robust_rms(vals, nsig=5.0, max_iter=5):
vals = np.asarray(vals)
vals = vals[np.isfinite(vals)]
if vals.size == 0:
return np.nan
median = np.median(vals)
mad = np.median(np.abs(vals - median))
if mad == 0:
return np.std(vals)
sigma = 1.4826 * mad
for _ in range(max_iter):
diff = vals - median
keep = np.abs(diff) <= nsig * sigma
new_vals = vals[keep]
if new_vals.size == 0:
break
new_median = np.median(new_vals)
new_mad = np.median(np.abs(new_vals - new_median))
if new_mad == 0:
vals = new_vals
median = new_median
break
new_sigma = 1.4826 * new_mad
if np.isclose(new_sigma, sigma, rtol=1e-3):
vals = new_vals
median = new_median
sigma = new_sigma
break
vals = new_vals
median = new_median
sigma = new_sigma
return np.std(vals)
[docs]
def compute_rms_from_fits(fits_path):
data_masked, _, _ = load_masked_image(fits_path)
good = np.isfinite(data_masked)
return robust_rms(data_masked[good])
[docs]
def make_three_panel_png(run_dir: Path, tag: str,
rms_pre: float,
rms_after1: float,
rms_after2: float):
pre_fits = glob.glob(str(run_dir / "*_0_iterations_pre_peel-I-image.fits"))
after1_fits = glob.glob(str(run_dir / "*_0_iterations_after_peel-I-image.fits"))
after2_fits = glob.glob(str(run_dir / "*_0_iterations_after_2nd_peel-I-image.fits"))
if not (pre_fits and after1_fits and after2_fits):
return
pre_fits = pre_fits[0]
after1_fits = after1_fits[0]
after2_fits = after2_fits[0]
data_pre, wcs_pre, center = load_masked_image(pre_fits)
data_after1, wcs_after1, _ = load_masked_image(after1_fits)
data_after2, wcs_after2, _ = load_masked_image(after2_fits)
cx, cy = center
cmap = plt.get_cmap("inferno").copy()
cmap.set_bad("white")
fig = plt.figure(figsize=(15, 5))
wcss = [wcs_pre, wcs_after1, wcs_after2]
datas = [data_pre, data_after1, data_after2]
titles = [
f"Pre-peel (RMS={rms_pre:.2f})",
f"After 1st peel (RMS={rms_after1:.2f})",
f"After 2nd peel (RMS={rms_after2:.2f})",
]
im = None
for i, (img, wcs, panel_title) in enumerate(zip(datas, wcss, titles), start=1):
ax = fig.add_subplot(1, 3, i, projection=wcs)
ax.set_xlabel("")
ax.set_ylabel("")
ax.coords[0].set_ticks_visible(False)
ax.coords[1].set_ticks_visible(False)
ax.coords[0].set_ticklabel_visible(False)
ax.coords[1].set_ticklabel_visible(False)
overlay = ax.get_coords_overlay("fk5")
overlay.grid(color="white", ls="dotted", lw=0.8, alpha=0.8)
overlay[0].set_ticks(spacing=20*u.deg)
overlay[1].set_ticks(spacing=20*u.deg)
im = ax.imshow(img, origin="lower", cmap=cmap,
vmin=VMIN, vmax=VMAX)
border = Circle((cx, cy), R_MASK,
transform=ax.get_transform("pixel"),
fill=False, edgecolor="black",
linewidth=1.0, alpha=1.0)
ax.add_patch(border)
ax.set_title(panel_title, fontsize=12)
plt.subplots_adjust(wspace=0.02, right=0.9)
cax = fig.add_axes([0.92, 0.15, 0.02, 0.7])
cbar = plt.colorbar(im, cax=cax, orientation="vertical")
cbar.set_label(r"Intensity (Jy/beam)", fontsize=12)
t = parse_utc_from_tag(tag)
lst, cyg_alt, cas_alt = compute_lst_and_alts(t)
lst_str = lst_to_hms_str(lst)
utc_str = t.utc.datetime.strftime("%Y-%m-%d %H:%M:%S")
fig.suptitle(
f"{tag} {utc_str} UTC (LST {lst_str}) "
f"Cyg A {cyg_alt:.1f}°, Cas A {cas_alt:.1f}°",
fontsize=13
)
out_png = run_dir / f"{tag}_3panel_peeling.png"
fig.savefig(out_png, dpi=300, bbox_inches="tight", facecolor="white")
plt.close(fig)
[docs]
def make_diff_image(run_dir: Path, tag: str):
pre_fits = glob.glob(str(run_dir / "*_0_iterations_pre_peel-I-image.fits"))
after2_fits = glob.glob(str(run_dir / "*_0_iterations_after_2nd_peel-I-image.fits"))
if not (pre_fits and after2_fits):
return None, None
pre_fits = pre_fits[0]
after2_fits = after2_fits[0]
with fits.open(pre_fits) as h_pre, fits.open(after2_fits) as h_after:
data_pre = h_pre[0].data.astype(np.float32)
data_after2 = h_after[0].data.astype(np.float32)
diff_data = data_pre - data_after2
h_diff = fits.HDUList([fits.PrimaryHDU(data=diff_data, header=h_pre[0].header)])
diff_fits = run_dir / f"{tag}_0_iterations_diff_pre_minus_after2-I-image.fits"
h_diff.writeto(diff_fits, overwrite=True)
plt.figure(figsize=(5, 4))
plt.imshow(diff_data[0, 0, :, :], origin="lower", cmap="coolwarm")
plt.colorbar(label="Pre - After2 (Jy/beam)")
plt.title(f"Diff image: {tag}")
diff_png = run_dir / f"{tag}_diff_pre_minus_after2.png"
plt.savefig(diff_png, dpi=200, bbox_inches="tight", facecolor="white")
plt.close()
return diff_fits, diff_png
# ================== THE CELERY TASK ==================
@app.task(name="orca.tasks.peel_experiment.peel_experiment_task")
[docs]
def peel_experiment_task(ms_rel_path: str, maxiter: int = DEFAULT_MAXITER):
"""
Big experiment task:
- copy MS to WORKDIR/tag_maxiterXX
- applycal
- image pre-peel -> RMS_pre
- ttcal (sources, maxiter=...) + image -> RMS_after1
- ttcal (RFI, maxiter=...) + image -> RMS_after2
- make 3-panel PNG
- make pre-minus-after2 difference image
ms_rel_path is relative to BASE_73, e.g.
'2025-05-06/05/20250506_050008_73MHz_averaged.ms'
"""
eff_maxiter = int(maxiter) if maxiter is not None else DEFAULT_MAXITER
base_ms_path = BASE_73 / ms_rel_path
if not base_ms_path.is_dir():
raise FileNotFoundError(f"Base MS not found: {base_ms_path}")
parts = Path(ms_rel_path).parts # [date, hour, msname]
date_str = parts[0]
hour_str = parts[1]
msname = parts[2]
caltab = CAL_TABLE.get(date_str)
if caltab is None or not caltab.exists():
raise FileNotFoundError(f"No calibration table for {date_str}: {caltab}")
tag_core = f"{date_str}_{hour_str}_{msname[:-3]}"
tag = f"{tag_core}_maxiter{eff_maxiter:02d}"
run_dir = WORKDIR / tag
run_dir.mkdir(parents=True, exist_ok=True)
ms_basename = msname
dest_ms = run_dir / ms_basename
if not dest_ms.exists():
shutil.copytree(base_ms_path, dest_ms)
t0 = time.time()
os.chdir(run_dir)
# 1) applycal + pre-peel image
run_applycal_pre_peel(ms_basename, caltab)
run_wsclean_stage(ms_basename, "pre_peel")
pre_fits = glob.glob("*_0_iterations_pre_peel-I-image.fits")[0]
rms_pre = compute_rms_from_fits(pre_fits)
# 2) first peel (sources) + image
run_ttcal_with_conda("sources", ms_basename, SOURCES_JSON, eff_maxiter)
run_wsclean_stage(ms_basename, "after_peel")
after1_fits = glob.glob("*_0_iterations_after_peel-I-image.fits")[0]
rms_after1 = compute_rms_from_fits(after1_fits)
# 3) second peel (RFI) + image
run_ttcal_with_conda("rfi", ms_basename, RFI_JSON, eff_maxiter)
run_wsclean_stage(ms_basename, "after_2nd_peel")
after2_fits = glob.glob("*_0_iterations_after_2nd_peel-I-image.fits")[0]
rms_after2 = compute_rms_from_fits(after2_fits)
# 4) 3-panel PNG
make_three_panel_png(run_dir, tag, rms_pre, rms_after1, rms_after2)
# 5) difference image
diff_fits, diff_png = make_diff_image(run_dir, tag)
elapsed = time.time() - t0
os.chdir(WORKDIR)
return {
"tag": tag,
"ms_rel_path": ms_rel_path,
"run_dir": str(run_dir),
"maxiter": eff_maxiter,
"rms_pre": float(rms_pre),
"rms_after1": float(rms_after1),
"rms_after2": float(rms_after2),
"diff_fits": str(diff_fits) if diff_fits else None,
"diff_png": str(diff_png) if diff_png else None,
"elapsed_sec": elapsed,
}