"""Peeled imaging pipeline Celery task.
Implements a complete imaging pipeline with peeling:
copy → applycal → flag_ants → AOFlagger → TTCal peel → WSClean
Results are generated on NVMe scratch and moved to Lustre on completion.
This module extends the base imaging_tasks without modifying them.
"""
from __future__ import annotations
import os, shutil, uuid, glob, logging
from typing import List, Optional, Dict
from pathlib import Path
from casatasks import applycal
from orca.celery import app # same Celery instance
from orca.transform.flagging import (
flag_ants,
flag_with_aoflagger,
)
from orca.utils.paths import get_aoflagger_strategy
from orca.wrapper.ttcal import peel_with_ttcal
from orca.wrapper.wsclean import wsclean
from orca.tasks.imaging_tasks import ( # reuse helpers
_nvme_workspace, _plot_dirty,
)
import subprocess
[docs]
def peel_with_ttcal_maxiter5(ms: str, sources: str):
"""Run TTCal peeling with reduced iteration count.
Local wrapper forcing --maxiter=5 for faster peeling.
Args:
ms: Path to measurement set.
sources: Path to sources.json file.
Raises:
RuntimeError: If TTCal returns non-zero exit code.
"""
env = dict(os.environ, LD_LIBRARY_PATH='/opt/astro/mwe/usr/lib64:/opt/astro/lib/',
AIPSPATH='/opt/astro/casa-data dummy dummy')
julia = '/opt/devel/pipeline/envs/julia060/bin/julia'
ttcal = '/opt/devel/pipeline/envs/julia060/bin/ttcal.jl'
cmd = [julia, ttcal, 'peel', ms, sources,
'--beam', 'sine', '--maxiter', '5',
'--tolerance', '1e-4', '--minuvw', '10']
proc = subprocess.run(cmd, env=env, capture_output=True, text=True)
if proc.returncode != 0:
LOG.error(f'TTCal failed with stderr: {proc.stderr}')
LOG.info(f'stdout: {proc.stdout}')
raise RuntimeError("TTCal failed.")
LOG.info(f"TTCal peel stdout: {proc.stdout}")
[docs]
LOG = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
@app.task(
bind=True,
name="orca.tasks.imaging_peel_pipeline.peel_imaging_pipeline_task",
queue="imaging",
autoretry_for=(Exception,),
retry_backoff=True,
max_retries=3,
)
[docs]
def peel_imaging_pipeline_task(
self,
ms_path: str,
delay_table: str,
bandpass_table: str,
final_dir: str,
bad_corrs: Optional[List[int]] = None,
aoflag_strategy: str | None = "LWA_opt_GH1.lua",
peel_sources_json: str | None = "/home/pipeline/sources.json",
extra_wsclean: Optional[List[str]] = None,
) -> Dict[str, str]:
"""
Full-featured imaging pipeline **added** on top of the old tasks.
Returns {'dirty_png': <file>, 'workspace': <directory>}
"""
# 1. ---------------------------------------------------------------- copy
workdir, nvme_ms = _nvme_workspace(ms_path)
shutil.copytree(ms_path, nvme_ms)
LOG.info("[%s] copied → %s", self.request.id, nvme_ms)
# 2. ----------------------------------------------------------- applycal
applycal(
vis = nvme_ms,
gaintable = [delay_table, bandpass_table],
calwt = [False],
flagbackup=True,
)
# 3. -------------------------------------------------------- flag_ants
if bad_corrs:
LOG.info("[%s] flag_ants %s", self.request.id, bad_corrs)
flag_ants(nvme_ms, bad_corrs)
# 4. ----------------------------------------------------- AOFlagger RFI
if aoflag_strategy:
lua_file = (
get_aoflagger_strategy(aoflag_strategy)
if "/" not in aoflag_strategy else aoflag_strategy
)
flag_with_aoflagger(nvme_ms, strategy=lua_file)
LOG.info("[%s] AOFlagger %s done", self.request.id, lua_file)
# 5. ---------------------------------------------------------- TTCal peel
#if peel_sources_json:
# peel_with_ttcal(nvme_ms, peel_sources_json)
# LOG.info("[%s] TTCal peel done", self.request.id)
if peel_sources_json:
peel_with_ttcal_maxiter5(nvme_ms, peel_sources_json)
LOG.info("[%s] TTCal peel done with maxiter=5", self.request.id)
# 6. ------------------------------------------------------------ WSClean
if extra_wsclean is None:
extra_wsclean = [
"-pol", "I", "-size", "4096", "4096",
"-scale", "0.03125", "-niter", "5",
"-weight", "briggs", "0", "-horizon-mask", "10deg",
"-taper-inner-tukey", "30",
]
#prefix = os.path.join(workdir, os.path.splitext(os.path.basename(nvme_ms))[0])
#dirty_fits = f"{prefix}-dirty.fits"
#png_out = f"{prefix}-dirty.png"
prefix = os.path.join(workdir, os.path.splitext(os.path.basename(nvme_ms))[0])
wsclean(
ms_list=[nvme_ms], out_dir=workdir,
filename_prefix=os.path.basename(prefix),
extra_arg_list=extra_wsclean,
num_threads=4, mem_gb=50,
)
#_plot_dirty(dirty_fits, png_out)
dirty_fits_files = glob.glob(os.path.join(workdir, "*-dirty.fits"))
png_files = []
for dfits in dirty_fits_files:
png_file = dfits.replace(".fits", ".png")
_plot_dirty(dfits, png_file)
png_files.append(png_file)
# build your desired dir name (without UUID)
final_subdir = os.path.join(final_dir, os.path.splitext(os.path.basename(nvme_ms))[0])
# create it
os.makedirs(final_subdir, exist_ok=True)
# move all individual dirty fits & png into it
for dfits in dirty_fits_files:
shutil.move(dfits, os.path.join(final_subdir, os.path.basename(dfits)))
for png in png_files:
shutil.move(png, os.path.join(final_subdir, os.path.basename(png)))
# now move *contents* of workdir (not the directory itself) into final_subdir
for item in os.listdir(workdir):
src = os.path.join(workdir, item)
dst = os.path.join(final_subdir, item)
shutil.move(src, dst)
# remove the empty workdir
shutil.rmtree(workdir, ignore_errors=True)
return {
"dirty_pngs": [os.path.join(final_subdir, os.path.basename(png)) for png in png_files],
"workspace": final_subdir,
}
'''os.makedirs(final_dir, exist_ok=True)
for dfits in dirty_fits_files:
shutil.move(dfits, os.path.join(final_dir, os.path.basename(dfits)))
for png in png_files:
shutil.move(png, os.path.join(final_dir, os.path.basename(png)))
shutil.move(workdir, os.path.join(final_dir, os.path.basename(workdir)))
return {
"dirty_pngs": [os.path.join(final_dir, os.path.basename(png)) for png in png_files],
"workspace": os.path.join(final_dir, os.path.basename(workdir)),
}
'''