"""
Functions used to reduce the raw data.
"""
import os
from collections.abc import Callable
from os import PathLike
from pathlib import Path
import astropy
import numpy as np
__all__ = [
"stsci_det1",
"run_det1",
"gen_associations",
"process_using_aws",
"recursive_merge",
"load_assoc",
"grism_background_subtraction",
]
[docs]
def stsci_det1(
uncal_dir: PathLike,
raw_output_dir: PathLike,
cpu_count: int = 1,
joblib_backend: str = "loky",
**kwargs,
):
"""
Run the JWST level 1 detector pipeline.
Parameters
----------
uncal_dir : PathLike
The location of the ``_uncal.fits`` files.
raw_output_dir : PathLike
Where the output files will be saved.
cpu_count : int, optional
The number of CPU cores to use for running the pipeline. Each core
will process a single file, so note that the memory usage will
increase substantially when running in parallel. By default only
one core will be used.
joblib_backend : str, optional
The backend to use when running the pipeline in parallel, by
default ``"loky"``.
**kwargs : dict, optional
Any additional keyword arguments, to be passed to
`jwst.pipeline.Detector1Pipeline`.
"""
if cpu_count <= 0:
cpu_count = os.cpu_count()
files_to_process = []
for i, file in enumerate(uncal_dir.glob("*uncal.fits")):
output_filename = (raw_output_dir / file.name).with_stem(
file.stem.replace("_uncal", "_rate")
)
if output_filename.is_file():
# print(f"{output_filename} exists.")
continue
else:
files_to_process.append(file)
if len(files_to_process) > 0:
run_det1(files_to_process[0], raw_output_dir, **kwargs)
if len(files_to_process) > 1:
from joblib import Parallel, delayed
Parallel(
n_jobs=cpu_count,
backend=joblib_backend,
verbose=50,
)(
delayed(run_det1)(
uncal_path=file,
output_dir=raw_output_dir,
**kwargs,
)
for file in files_to_process[1:]
)
[docs]
def run_det1(uncal_path: PathLike, output_dir: PathLike, **kwargs):
"""
Run the level 1 detector pipeline on an uncalibrated image.
Applies detector-level corrections.
Parameters
----------
uncal_path : PathLike
The location of the file to be processed, with the extension
``_uncal.fits``.
output_dir : PathLike
The directory into which the output files will be saved.
**kwargs : dict, optional
Any additional keyword arguments, to be passed to
`jwst.pipeline.Detector1Pipeline`.
"""
from jwst.pipeline import Detector1Pipeline
uncal_path = Path(uncal_path)
Detector1Pipeline.call(
str(uncal_path),
output_dir=str(output_dir),
save_results=True,
output_file=uncal_path.stem.removesuffix("_uncal"),
**kwargs,
)
[docs]
def gen_associations(raw_output_dir: PathLike, field_name: str = "glass-a2744") -> dict:
"""
Generate exposure tables for each group of filters and grisms.
This is particularly useful for programmes such as PASSAGE, for which
no existing associations are available in the Dawn JWST Archive.
Parameters
----------
raw_output_dir : PathLike
Where the current ``_rate.fits`` files are located.
field_name : str, optional
The name of the field, by default "glass-a2744".
Returns
-------
dict
The keys of the dict are the name of each group (equivalent to
``"assoc"`` used by `grizli.aws`), and the values are the
exposure tables.
"""
from astropy.io import fits
from astropy.table import Table
from astropy.wcs import WCS
from grizli import utils as grizli_utils
from shapely.geometry import Polygon
assoc_dict = {}
for filepath in raw_output_dir.glob("*_rate.fits"):
main_hdr = fits.getheader(filepath)
sci_hdr = fits.getheader(filepath, "SCI")
filt_only = grizli_utils.parse_filter_from_header(main_hdr).split("-")[0]
filt_all = grizli_utils.parse_filter_from_header(main_hdr)
# There's surely a more intelligent way to do this, but grizli is happy for now
s = np.asarray(
sci_hdr["S_REGION"].removeprefix("POLYGON ICRS ").split(" "), dtype=float
)
footprint = (
f"(({s[0]}, {s[1]}), ({s[2]}, {s[3]}), ({s[4]}, {s[5]}), ({s[6]}, {s[7]}))"
)
name = filepath.stem.removesuffix("_rate")
try:
assoc_dict[f"{field_name}_assoc_{filt_only.lower()}"].add_row(
[
name,
"rate",
name,
f"{field_name}_assoc_{filt_only.lower()}",
filt_all,
main_hdr.get("INSTRUME"),
main_hdr.get("PROGRAM"),
f"dummy/{filepath.name}",
footprint,
],
)
except:
assoc_dict[f"{field_name}_assoc_{filt_only.lower()}"] = Table(
data=[
[name],
["rate"],
[name],
[f"{field_name}_assoc_{filt_only.lower()}"],
[filt_all],
[main_hdr.get("INSTRUME")],
[main_hdr.get("PROGRAM")],
[f"dummy/{filepath.name}"],
[footprint],
],
names=[
"file",
"extension",
"dataset",
"assoc",
"filter",
"instrument_name",
"proposal_id",
"dataURL",
"footprint",
],
)
return assoc_dict
[docs]
def load_assoc(
ra: float = 3.58641,
dec: float = -30.39997,
radius: int = 1,
proposal_id: int = 1324,
instrument_name: str = "NIRISS",
):
"""
Load exposure tables and association names from the DJA.
Default parameters are those used for the GLASS-JWST ERS analysis.
Parameters
----------
ra : float, optional
The right ascension of the centre of the field of interest, by
default ``3.58641``.
dec : float, optional
The declination of the centre of the field of interest, by default
``-30.39997``.
radius : int, optional
The radius in arcminutes to query, by default ``1``.
proposal_id : int, optional
The JWST proposal ID for the observations of interest, by default
``1324``.
instrument_name : str, optional
The name of the instrument, by default ``"NIRISS"``.
Returns
-------
dict
The keys of the dict are the name of each association, and the
values are the exposure tables.
"""
import shutil
from grizli import utils
QUERY_URL = (
"https://grizli-cutout.herokuapp.com/assoc"
"?coord={ra},{dec}&arcmin={radius}&output=csv"
)
assoc_query = utils.read_catalog(
QUERY_URL.format(ra=ra, dec=dec, radius=radius), format="csv"
)
nis = (assoc_query["instrument_name"] == instrument_name) & (
assoc_query["proposal_id"] == proposal_id
)
EXPOSURE_API = "https://grizli-cutout.herokuapp.com/exposures?associations={assoc}"
assoc_dict = {}
for assoc_name in assoc_query["assoc_name"][nis]:
exp = utils.read_catalog(EXPOSURE_API.format(assoc=assoc_name), format="csv")
assoc_dict[assoc_name] = exp
# assoc_dict[assoc_name] = None
return assoc_dict
[docs]
def process_using_aws(
grizli_home_dir: PathLike,
raw_output_dir: PathLike,
assoc_tab_dict: dict,
field_name: str = "glass-a2744",
process_visit_kwargs: dict = {},
ref_wcs: astropy.wcs.WCS | None = None,
mosaic_pixel_scale: float = 0.03,
mosaic_pad: float = 6,
drizzle_kernel: str = "square",
drizzle_pixfrac: float = 0.8,
cutout_mosaic_kwargs: dict = {},
proposal_id: int = 1324,
):
"""
Process WFSS data using the functions in `grizli.aws`.
Parameters
----------
grizli_home_dir : PathLike
Directory containing the usual grizli folders, e.g. ``"Prep"``,
``"visits"``.
raw_output_dir : PathLike
Where the current ``_rate.fits`` files are located.
assoc_tab_dict : dict
The keys of the dict are the name of each group of exposures,
and the values should be an exposure table.
field_name : str, optional
The name of the field, by default ``"glass-a2744"``.
process_visit_kwargs : dict, optional
Any additional arguments to pass to
`grizli.aws.visit_processor.process_visit`, by default ``{}``.
ref_wcs : astropy.wcs.WCS | None, optional
The reference WCS to be used when generating the drizzled mosaics.
By default None, in which case it will be calculated automatically
from the overlap of the individual filter images.
mosaic_pixel_scale : float, optional
The pixel scale (in arcseconds) to be used for the drizzled
mosaics, by default 0.03.
mosaic_pad : float, optional
The padding for the drizzled mosaics in arcseconds, by default 6.
drizzle_kernel : str, optional
The kernel to use for drizzling the mosaics, by default
``"square"``.
drizzle_pixfrac : float, optional
The ``pixfrac`` used for the drizzled mosaics, by default 0.8.
cutout_mosaic_kwargs : dict, optional
Any additional arguments to pass to
`grizli.aws.visit_processor.cutout_mosaic`, by default ``{}``.
proposal_id : int, optional
The JWST proposal ID for the observations of interest, by default
``1324``.
"""
visit_dir = grizli_home_dir / "visits"
os.chdir(visit_dir)
import shutil
from grizli import utils as grizli_utils
from grizli.aws import visit_processor
visit_processor.ROOT_PATH = str(visit_dir)
for assoc_name, exp in assoc_tab_dict.items():
if not (visit_dir / assoc_name / "Prep").is_dir():
# Make all the directories
assoc_dir = visit_dir / assoc_name
(assoc_dir / "RAW").mkdir(exist_ok=True, parents=True)
(assoc_dir / "Persistence").mkdir(exist_ok=True, parents=True)
(assoc_dir / "Extractions").mkdir(exist_ok=True, parents=True)
(assoc_dir / "Prep").mkdir(exist_ok=True, parents=True)
# Only copy files if this visit hasn't been processed yet
if len([*(assoc_dir / "Prep").glob("*drz_sci.fits")]) == 0:
for filename in exp["dataset"]:
try:
shutil.copy(
raw_output_dir / f"{filename}_rate.fits", assoc_dir / "RAW"
)
except Exception as e:
print(e)
print(f"{filename} not found?")
for assoc_name, tab in assoc_tab_dict.items():
if len([*(visit_dir / assoc_name / "Prep").glob("*drz_sci.fits")]) == 0:
# By default, do not clean all files afterwards
if not process_visit_kwargs.get("clean"):
process_visit_kwargs["clean"] = False
# Ensure the correct CRDS context is used, unless otherwise specified
if not process_visit_kwargs.get("other_args"):
process_visit_kwargs["other_args"] = {}
if not process_visit_kwargs["other_args"].get("CRDS_CONTEXT"):
process_visit_kwargs["other_args"]["CRDS_CONTEXT"] = os.environ[
"CRDS_CONTEXT"
]
if not process_visit_kwargs["other_args"].get("mosaic_drizzle_args"):
process_visit_kwargs["other_args"]["mosaic_drizzle_args"] = {}
if not process_visit_kwargs["other_args"]["mosaic_drizzle_args"].get(
"context"
):
process_visit_kwargs["other_args"]["mosaic_drizzle_args"]["context"] = (
os.environ["CRDS_CONTEXT"]
)
if "instrume" in tab.colnames:
tab["instrument_name"] = tab["instrume"]
tab["proposal_id"] = proposal_id
tab["dataURL"] = [f"dummy/{file}_rate.fits" for file in tab["file"]]
_ = visit_processor.process_visit(
assoc_name,
sync=False,
with_db=False,
tab=tab,
**process_visit_kwargs,
)
else:
print(f"Directory {assoc_name} found, local preprocesing complete!")
os.chdir(grizli_home_dir / "Prep")
# Symlink preprocessed exposure files here
import subprocess
for assoc_name in assoc_tab_dict.keys():
subprocess.run(f"ln -sf ../visits/{assoc_name}/Prep/*rate.fits .", shell=True)
import numpy as np
from astropy.wcs import WCS
files = [str(s) for s in (grizli_home_dir / "Prep").glob("*rate.fits")]
files.sort()
res = visit_processor.res_query_from_local(files=files)
is_grism = np.array(["GR" in filt for filt in res["filter"]])
if not ref_wcs:
# Mosaic WCS that contains all exposures
hdu = grizli_utils.make_maximal_wcs(
files=files,
pixel_scale=mosaic_pixel_scale,
pad=mosaic_pad,
get_hdu=True,
verbose=False,
)
ref_wcs = WCS(hdu.header)
# Default set of parameters for drizzled mosaics
_mosaic_kwargs = {
"rootname": field_name,
"res": res[
~is_grism
], # Pass the exposure information table for the direct images
"ir_wcs": ref_wcs,
"half_optical": (
False
), # Otherwise will make JWST exposures at half pixel scale of ref_wcs
"kernel": drizzle_kernel,
"pixfrac": drizzle_pixfrac,
"clean_flt": (
False
), # Otherwise removes "rate.fits" files from the working directory!
"s3output": None,
"make_exptime_map": True,
"expmap_sample_factor": 1,
"expmap_keep_small": False,
"weight_type": "jwst_var",
"skip_existing": False,
"context": os.environ["CRDS_CONTEXT"],
}
# Make individual drizzled images for each of the filters
_ = visit_processor.cutout_mosaic(
**recursive_merge(_mosaic_kwargs, cutout_mosaic_kwargs)
)
from astropy.table import vstack
from grizli.pipeline import auto_script
# Create a combined visits.yaml
visits, groups, info = [], [], None
for assoc_name in assoc_tab_dict.keys():
v, g, i = auto_script.load_visits_yaml(
grizli_home_dir
/ "visits"
/ assoc_name
/ "Prep"
/ f"{assoc_name}_visits.yaml"
)
for j, v_j in enumerate(v):
v[j]["footprints"] = [fp for fps in v_j["footprints"] for fp in fps]
for j, g_j in enumerate(g):
for img_type in g_j.keys():
try:
g[j][img_type]["footprints"] = [
fp for fps in g_j[img_type]["footprints"] for fp in fps
]
except:
print(f"Problem with {g[j]}")
visits.extend(v)
groups.extend(g)
if info is None:
info = i
else:
info = vstack([info, i])
auto_script.write_visit_info(
visits, groups, info, field_name, path=str(grizli_home_dir / "Prep")
)
# Make a stacked mosaic using all three filters
auto_script.make_filter_combinations(
field_name,
filter_combinations={"ir": ["F115WN-CLEAR", "F150WN-CLEAR", "F200WN-CLEAR"]},
)
[docs]
def recursive_merge(d1: dict, d2: dict) -> dict:
"""
Recursively merge two dictionaries.
Values from the second are prioritised in case of conflicts. This code
was originally posted on stackoverflow, by Aaron Hall and Bobik.
Parameters
----------
d1 : dict
The original dictionary.
d2 : dict
The new dictionary, which can overwrite values in `d1`.
Returns
-------
dict
The merged dictionary.
"""
from collections.abc import MutableMapping
for k, v in d1.items():
if k in d2:
# this next check is the only difference!
if all(isinstance(e, MutableMapping) for e in (v, d2[k])):
d2[k] = recursive_merge(v, d2[k])
# we could further check types and merge as appropriate here.
d3 = d1.copy()
d3.update(d2)
return d3
[docs]
def grism_background_subtraction(
prep_dir: PathLike = "../Prep",
field_root: str = "glass-a2744",
filters: list[str] = ["f115wn-clear", "f150wn-clear", "f200wn-clear"],
bkg_box_size: float = 3,
smooth_gauss_std: float = 1,
min_bkg_thresh: float = 0.0,
background2d_kwargs: dict = {},
diffuse_catalogue_kwargs: dict = {},
grism_prep_kwargs: dict = {},
**kwargs,
):
"""
Model and subtract a dispersed background from WFSS data.
Parameters
----------
prep_dir : PathLike, optional
The default location for all data, by default ``"../Prep"``.
field_root : str, optional
The name of the field, by default ``"glass-a2744"``.
filters : list[str], optional
The names of the filters used, by default
``["f115wn-clear", "f150wn-clear", "f200wn-clear"]``.
bkg_box_size : float, optional
The size of the boxes within which to calculate the background (in
arcseconds), by default ``3``.
smooth_gauss_std : float, optional
The standard deviation of the Gaussian used to smooth the
background (in arcseconds), by default ``1``.
min_bkg_thresh : float, optional
The minimum background value to be used. By default ``0``, which
means that only positive values of the background will be
dispersed and subtracted.
background2d_kwargs : dict, optional
Any additional arguments to pass to
`photutils.background.Background2D`, by default ``{}``.
diffuse_catalogue_kwargs : dict, optional
Any additional arguments to pass to
`grizli.pipeline.auto_script.multiband_catalog, by default ``{}``.
Parameters passed here determine the extraction of the diffuse
background, rather than the science objects in the field.
grism_prep_kwargs : dict, optional
Any additional arguments to pass to
`grizli.pipeline.auto_script.grism_prep`, by default ``{}``.
**kwargs : dict, optional
Any additional keyword arguments.
"""
from shutil import copy2
import astropy.units as u
from astropy.convolution import Gaussian2DKernel, convolve_fft
from astropy.io import fits
from astropy.wcs import WCS
from grizli.pipeline import auto_script
from photutils import background
prep_dir = Path(prep_dir)
img_backup_dir = prep_dir / "img_backup"
img_backup_dir.mkdir(exist_ok=True, parents=True)
all_bkgs = {}
sci_path = list(prep_dir.glob(f"{field_root}*ir_dr[zc]_sci.fits"))[0]
wht_path = list(prep_dir.glob(f"{field_root}*ir_dr[zc]_wht.fits"))[0]
if not (img_backup_dir / sci_path.name).is_file():
copy2(sci_path, img_backup_dir / sci_path.name)
if not (img_backup_dir / wht_path.name).is_file():
copy2(wht_path, img_backup_dir / wht_path.name)
for filt in filters:
sci_path = list(prep_dir.glob(f"{field_root}*{filt}_dr[zc]_sci.fits"))[0]
wht_path = list(prep_dir.glob(f"{field_root}*{filt}_dr[zc]_wht.fits"))[0]
if not (img_backup_dir / sci_path.name).is_file():
copy2(sci_path, img_backup_dir / sci_path.name)
if not (img_backup_dir / wht_path.name).is_file():
copy2(wht_path, img_backup_dir / wht_path.name)
# .astype(float) ensures byte order matches operating system
with fits.open(sci_path) as sci_hdul:
sci_img = sci_hdul[0].data.astype(float)
sci_wcs = WCS(sci_hdul[0])
with fits.open(wht_path) as wht_hdul:
wht_img = wht_hdul[0].data.astype(float)
from astropy import stats
sci_data = sci_img[wht_img != 0]
pixscale = (sci_wcs.proj_plane_pixel_area() ** 0.5).to(u.arcsec).value
filt_box = int(bkg_box_size / pixscale)
thresh = 5 * stats.mad_std(sci_data)
bkg = background.Background2D(
sci_img,
(filt_box, filt_box),
mask=sci_img >= thresh,
coverage_mask=wht_img == 0,
**background2d_kwargs,
)
bkg_data = bkg.background
bkg_data = convolve_fft(
bkg_data,
Gaussian2DKernel(
x_stddev=smooth_gauss_std / pixscale,
y_stddev=smooth_gauss_std / pixscale,
),
)
bkg_data[bkg_data <= min_bkg_thresh] = 0.0
all_bkgs[filt] = bkg_data
sci_hdul[0].data = bkg_data
sci_hdul.writeto(
img_backup_dir / f"{sci_path.stem}_bkg.fits", overwrite=True
)
sci_hdul.writeto(sci_path, overwrite=True)
auto_script.make_filter_combinations(
field_root,
filter_combinations={"ir": [f.upper() for f in filters]},
)
multiband_catalog_args = auto_script.get_yml_parameters()["multiband_catalog_args"]
multiband_catalog_args["run_detection"] = True
multiband_catalog_args["detection_background"] = False
multiband_catalog_args["photometry_background"] = False
multiband_catalog_args["threshold"] = 1.0
multiband_catalog_args["filters"] = filters
multiband_catalog_args["detection_params"] = {
"minarea": 9,
"filter_kernel": None,
"filter_type": "matched",
"clean": True,
"clean_param": 1,
"deblend_nthresh": 32,
"deblend_cont": 0.001,
}
multiband_catalog_args["rescale_weight"] = False
multiband_catalog_args["det_err_scale"] = 1.0
phot_cat = auto_script.multiband_catalog(
field_root=field_root,
**recursive_merge(multiband_catalog_args, diffuse_catalogue_kwargs),
)
os.chdir(prep_dir)
default_grism_kwargs = auto_script.get_yml_parameters()["grism_prep_args"]
default_grism_kwargs["files"] = [str(s) for s in Path.cwd().glob("*_rate.fits")][:]
grp = auto_script.grism_prep(
field_root=field_name,
**recursive_merge(default_grism_kwargs, grism_prep_kwargs),
)
rate_backup_dir = prep_dir / "rate_backup"
rate_backup_dir.mkdir(exist_ok=True, parents=True)
for flt_path in prep_dir.glob("*GrismFLT.fits"):
flt_model = fits.getdata(flt_path, "MODEL")
flt_hdr = fits.getheader(flt_path, "MODEL")
flt_pad = np.array(
[
flt_hdr["CRPIX1"] - flt_hdr["CRPIX1A"],
flt_hdr["CRPIX2"] - flt_hdr["CRPIX2A"],
]
).astype(int)
rate_path = prep_dir / f"{flt_path.stem.removesuffix(".01.GrismFLT")}_rate.fits"
if not (rate_backup_dir / rate_path.name).is_file():
copy2(rate_path, rate_backup_dir / rate_path.name)
with fits.open(rate_path) as rate_hdul:
rate_hdul["SCI"].data[rate_hdul["ERR"].data > 0] -= flt_model[
flt_pad[0] : -flt_pad[0],
flt_pad[1] : -flt_pad[1],
][rate_hdul["ERR"].data > 0]
rate_hdul.writeto(rate_path, overwrite=True)
background_backup_dir = prep_dir / "bkg_backups"
background_backup_dir.mkdir(exist_ok=True, parents=True)
for file in list(prep_dir.glob(f"{field_root}-ir*")) + [
prep_dir / f"{field_root}_phot.fits"
]:
file.replace(background_backup_dir / file.name)
for filt in filters:
sci_path = list(img_backup_dir.glob(f"{field_root}*{filt}_dr[zc]_sci.fits"))[0]
wht_path = list(img_backup_dir.glob(f"{field_root}*{filt}_dr[zc]_wht.fits"))[0]
bkg_path = list(
img_backup_dir.glob(f"{field_root}*{filt}_dr[zc]_sci_bkg.fits")
)[0]
bkg_data = fits.getdata(bkg_path)
# for file_path in [sci_path, wht_path]:
with fits.open(sci_path) as hdul:
hdul[0].data -= bkg_data
hdul[0].header["GRBKGSUB"] = True
hdul.writeto(prep_dir / sci_path.name, overwrite=True)
with fits.open(wht_path) as hdul:
# hdul[0].data -= bkg_data
hdul.writeto(prep_dir / wht_path.name, overwrite=True)
grism_backup_dir = prep_dir / "grism_backups"
grism_backup_dir.mkdir(exist_ok=True, parents=True)
for file in prep_dir.glob("*.GrismFLT.*"):
print(file)
file.replace(grism_backup_dir / file.name)
os.unlink(prep_dir / "../Extractions" / file.name)
os.chdir(prep_dir)
auto_script.make_filter_combinations(
field_root,
filter_combinations={"ir": [f.upper() for f in filters]},
)