"""
Functions to bin data to a specified signal/noise.
"""
from os import PathLike
from typing import Any
import astropy.visualization as astrovis
import matplotlib.pyplot as plt
import numpy as np
from astropy.io import fits
from astropy.table import Table
from astropy.wcs import WCS
from numpy.typing import ArrayLike
try:
POWERBIN_AVAIL = True
from powerbin import PowerBin
except:
POWERBIN_AVAIL = False
from niriss_tools.sed import colour_binning
__all__ = [
"hexbin",
"constrained_adaptive",
"save_binned_data_arr",
"save_binned_data_fits",
"bin_and_save",
]
[docs]
def hexbin(
x: ArrayLike,
y: ArrayLike,
signal: ArrayLike,
noise: ArrayLike,
bin_diameter: float = 10,
centre: tuple[float, float] | None = None,
) -> tuple[ArrayLike, int, ArrayLike, ArrayLike]:
"""
Hexagonally bin data, returning the binned S/N.
Parameters
----------
x : ArrayLike
A 1D array containing the x-positions of the data.
y : ArrayLike
A 1D array containing the y-positions of the data.
signal : ArrayLike
A 1D array of the signal.
noise : ArrayLike
A 1D array of the noise. All of ``signal``, ``noise``, ``x``, and
``y`` must have the same shape.
bin_diameter : float, optional
The minimum diameter of each bin (subject to pixel discretisation
effects), i.e. the long diameter of each hexagon. By default 10.
centre : tuple[float, float] | None, optional
If given, all bins will be aligned such that a hexagon is centred
on these coordinates. If not given, the highest S/N pixel will be
used instead.
Returns
-------
bin_labels : ArrayLike
The bin label assigned to each element of the input arrays.
nbins : int
The number of bins.
binned_s_n : ArrayLike
The S/N in each bin.
bin_inv : ArrayLike
A 1D array performing the inverse binning operation, i.e.
``binned_s_n[inv]`` gives an array of the same shape as ``x``.
"""
if centre is None:
centre_idx = np.argmax(signal / noise)
centre = np.array([x[centre_idx], y[centre_idx]])
xmin = np.min(x)
xmax = np.max(x)
ymin = np.min(y)
ymax = np.max(y)
x = np.asarray(x, float)
y = np.asarray(y, float)
sx = bin_diameter * 0.5 * np.sqrt(3)
sy = bin_diameter * 3 / 2
ix = ((x - xmin) + ((centre[0] - xmin) % sx)) / sx - 0.5
iy = ((y - ymin) + ((centre[1] - ymin) % sy)) / sy
nx = (np.max(ix) - np.min(ix)) + 1
ny = (np.max(iy) - np.min(iy)) + 1
nx1 = nx + 1
ny1 = ny + 1
nx2 = nx
ny2 = ny
ix1 = np.round(ix).astype(int)
iy1 = np.round(iy).astype(int)
ix2 = np.floor(ix).astype(int)
iy2 = np.floor(iy).astype(int)
# flat indices, plus one so that out-of-range points go to position 0.
i1 = np.where(
(0 <= ix1) & (ix1 < nx1) & (0 <= iy1) & (iy1 < ny1), ix1 * ny1 + iy1 + 1, 0
)
i2 = np.where(
(0 <= ix2) & (ix2 < nx2) & (0 <= iy2) & (iy2 < ny2), ix2 * ny2 + iy2 + 1, 0
)
d1 = (ix - ix1) ** 2 + 3.0 * (iy - iy1) ** 2
d2 = (ix - ix2 - 0.5) ** 2 + 3.0 * (iy - iy2 - 0.5) ** 2
bdist = d1 < d2
idxs = bdist * i1 + (~bdist * (i2 + np.nanmax(i1)))
u, bin_inv = np.unique(idxs, return_inverse=True)
nbins = len(u)
bin_labels = np.arange(len(u), dtype=int)[bin_inv]
binned_signal = np.bincount(bin_labels, weights=signal)
binned_noise = np.sqrt(np.bincount(bin_labels, weights=noise**2))
binned_s_n = binned_signal / binned_noise
return bin_labels, nbins, binned_s_n, bin_inv
[docs]
def constrained_adaptive(
signal: ArrayLike,
noise: ArrayLike,
bin_diameter: float = 4,
target_sn: float = 100,
plot: bool = True,
quiet: bool = False,
cvt: bool = True,
wvt: bool = True,
mask: ArrayLike | None = None,
use_hex: bool = True,
use_powerbin: bool = True,
) -> tuple[ArrayLike, int, ArrayLike, ArrayLike]:
"""
Perform a constrained adaptive binning method to reach a target S/N.
This function performs a 2-stage binning procedure, subject to a
minimum bin diameter. The initial stage is a hexagonal binning scheme,
which selects any bins that achieve the target S/N. This is followed
by a Voronoi tesselation on all remaining pixels, using the method of
`Cappellari & Copin 2003\
<https://ui.adsabs.harvard.edu/abs/2003MNRAS.342..345C>`_.
Parameters
----------
signal : ArrayLike
A 2D array containing the signal to be binned.
noise : ArrayLike
A 2D array containing the associated noise.
bin_diameter : float, optional
The minimum diameter of each bin (subject to pixel discretisation
effects), by default 10.
target_sn : float, optional
The desired S/N in each output bin, by default 100. This is only
guaranteed to be achieved for the hexagonal bins, as there is some
scatter on the S/N achieved through Voronoi binning.
plot : bool, optional
Show a comparison of the binned and unbinned data, alongside the
S/N and bin maps. By default ``False``.
quiet : bool, optional
Print the output of the Voronoi binning procedure, by default
``False``.
cvt : bool, optional
Attempt to achieve a centroidal Voronoi tessellation, using the
method of `Cappellari & Copin 2003 \
<https://ui.adsabs.harvard.edu/abs/2003MNRAS.342..345C>`_,
by default ``True``.
wvt : bool, optional
Use the weighted Voronoi tessellation method by
`Diehl & Statler 2006 \
<https://ui.adsabs.harvard.edu/abs/2006MNRAS.368..497D>`_.
By default True.
mask : ArrayLike | None, optional
Values in the input signal and noise to mask out, i.e. where
``mask==True`` will not be included in the binned data.
use_hex : bool, optional
Whether to perform the first stage of the binning procedure. If
``False``, then this will be skipped, and only the Voronoi binning
procedure will be attempted. By default ``True``.
use_powerbin : bool, optional
Use the PowerBin algorithm instead.
Returns
-------
bin_labels : ArrayLike
A 2D ``int`` array, containing the bin label assigned to each
element of the input arrays.
nbins : int
The number of bins.
binned_s_n : ArrayLike
A 1D array of length ``nbins``, containing the S/N in
each bin.
bin_inv : ArrayLike
A 2D array performing the inverse binning operation, i.e.
``binned_s_n[inv]`` gives an array of the same shape as ``x``.
"""
Y, X = np.mgrid[0 : signal.shape[0], 0 : signal.shape[1]]
f_signal = signal.ravel()
f_noise = noise.ravel()
if mask is None:
f_mask = np.zeros_like(f_noise)
else:
f_mask = mask.ravel()
# plt.imshow(mask)
# plt.show()
hex_idxs, hex_bin_n, hex_s_n, hex_inv = hexbin(
X.ravel(),
Y.ravel(),
f_signal,
f_noise,
bin_diameter=bin_diameter,
)
if not use_hex:
print("Skipping hex bin")
hex_s_n = np.zeros_like(hex_s_n)
good_hex_data = (hex_s_n >= target_sn)[hex_inv]
if mask is not None:
good_hex_data &= ~f_mask
num_hex_bins = len(np.unique(hex_idxs[good_hex_data]))
vorbin_idx = (
~good_hex_data & np.isfinite(f_noise) & np.isfinite(f_signal) & (f_noise > 0)
) # & (f_signal > 0)
if mask is not None:
vorbin_idx &= ~mask.ravel()
if POWERBIN_AVAIL and use_powerbin:
def capacity_spec(index):
"""Calculates (S/N)^2 for a bin from its pixel indices."""
# Standard S/N formula for uncorrelated noise
sn = np.sum(f_signal[vorbin_idx][index]) / np.sqrt(
np.sum(f_noise[vorbin_idx][index] ** 2)
)
# Example for correlated noise (see full example file for details):
# sn /= 1 + 1.07 * np.log10(len(index))
return sn**2
# print(
# f_signal[vorbin_idx],
# f_noise[vorbin_idx],
# (np.any(f_signal[vorbin_idx] <= 0)),
# (np.any(f_noise[vorbin_idx] <= 0)),
# )
pow_obj = PowerBin(
np.column_stack([X.ravel()[vorbin_idx], Y.ravel()[vorbin_idx]]),
capacity_spec,
target_capacity=target_sn**2,
# verbose=3,
pixelsize=1.0,
)
# pow_obj.plot(capacity_scale='sqrt', ylabel='S/N')
# plt.show()
vor_idxs, vor_inv = np.unique(pow_obj.bin_num, return_inverse=True)
else:
from vorbin.voronoi_2d_binning import voronoi_2d_binning
vorbin_output = voronoi_2d_binning(
x=X.ravel()[vorbin_idx],
y=Y.ravel()[vorbin_idx],
signal=f_signal[vorbin_idx],
noise=f_noise[vorbin_idx],
target_sn=target_sn,
pixelsize=1,
plot=False,
quiet=quiet,
cvt=cvt,
wvt=wvt,
)
vor_idxs, vor_inv = np.unique(vorbin_output[0], return_inverse=True)
vor_idxs += np.max(hex_idxs) + 1
num_vor_bins = len(np.unique(vor_idxs))
hex_idxs[vorbin_idx != ~good_hex_data] = -1
hex_idxs[vorbin_idx] = vor_idxs[vor_inv]
_all_unq, all_inv = np.unique(hex_idxs, return_inverse=True)
nbins = len(_all_unq)
all_idxs = np.arange(len(_all_unq))[all_inv]
binned_signal = np.bincount(all_idxs, weights=f_signal)
binned_noise = np.sqrt(np.bincount(all_idxs, weights=f_noise**2))
binned_s_n = binned_signal / binned_noise # [all_inv].reshape(signal.shape)
bin_labels = all_idxs.reshape(signal.shape)
bin_inv = all_inv.reshape(signal.shape)
print(f"Total bins: {len(_all_unq)} (Hex: {num_hex_bins}, Voronoi: {num_vor_bins})")
if plot:
fig, axs = plt.subplots(2, 2, sharex=True, sharey=True, constrained_layout=True)
img_norm = astrovis.ImageNormalize(
data=signal,
interval=astrovis.ManualInterval(
vmin=0.0, vmax=np.nanpercentile(signal, q=99.9)
),
stretch=astrovis.LogStretch(),
)
axs[0, 0].imshow(signal, norm=img_norm, origin="lower", cmap="plasma")
bin_sig_plot = (
np.bincount(all_idxs, weights=f_signal) / np.bincount(all_idxs)
)[all_inv]
bin_sig_plot[all_idxs == 0] = np.nan
axs[0, 1].imshow(
bin_sig_plot.reshape(signal.shape),
origin="lower",
norm=img_norm,
cmap="plasma",
)
sn_plot = (
np.bincount(all_idxs, weights=f_signal)
/ np.sqrt(np.bincount(all_idxs, weights=f_noise**2))
)[all_inv]
sn_plot[all_idxs == 0] = np.nan
axs[1, 0].imshow(
sn_plot.reshape(signal.shape),
origin="lower",
# norm=img_norm,
cmap="plasma",
)
rng = np.random.default_rng()
bin_plot = rng.random(size=len(_all_unq))[all_inv]
bin_plot[all_idxs == 0] = np.nan
axs[1, 1].imshow(
bin_plot.reshape(signal.shape),
origin="lower",
# norm=img_norm,
cmap="jet",
interpolation="none",
)
for a in axs.flatten():
a.set_facecolor("k")
# plt.imshow(hex_idxs.reshape(signal.shape), origin="lower")
plt.show()
return bin_labels, nbins, binned_s_n, bin_inv
[docs]
def save_binned_data_arr(
bin_labels: ArrayLike,
data_path: PathLike,
out_path: PathLike,
signal_hdu_idxs: ArrayLike,
noise_hdu_idxs: ArrayLike | None = None,
var_hdu_idxs: ArrayLike | None = None,
crop: tuple | None = None,
):
"""
Create a binned photometric catalogue from a segmentation map.
Realistically, this needs to be better thought out. Unlikely that
the input will be MEF, highly likely that it'll be combined from
multiple single FITS files.
Parameters
----------
bin_labels : ArrayLike
A 2D ``int`` array, containing the bin label assigned to each
element of the input arrays.
data_path : PathLike
The multi-extension fits file containing the data to be binned.
out_path : PathLike
The path to save the output.
signal_hdu_idxs : ArrayLike
The indices of the HDUs containing the direct images.
noise_hdu_idxs : ArrayLike | None, optional
The indices of the HDUs containing the noise images. One of
``noise_hdu_idxs`` and ``var_hdu_idxs`` must be given.
var_hdu_idxs : ArrayLike | None, optional
The indices of the HDUs containing the variance images.
crop : tuple | None, optional
A crop to be applied to the images before applying the binning
scheme, by default None.
"""
phot_cat = Table()
bin_ids, bin_inv = np.unique(bin_labels, return_inverse=True)
phot_cat["bin_id"] = bin_ids.astype(str)
phot_cat["bin_area"] = np.bincount(bin_labels.ravel())
if crop is None:
crop = (slice(0, None), slice(0, None))
with fits.open(data_path) as hdul:
for s in signal_hdu_idxs:
try:
phot_cat[s] = np.bincount(
bin_labels.ravel(), weights=hdul[s].data[crop].ravel()
)
except Exception as e:
print(e)
phot_cat[s] = np.nan
if noise_hdu_idxs is not None:
for s in noise_hdu_idxs:
try:
phot_cat[s] = np.bincount(
bin_labels.ravel(), weights=hdul[s].data[crop].ravel() ** 2
)
except Exception as e:
print(e)
phot_cat[s] = np.nan
elif var_hdu_idxs is not None:
for s in var_hdu_idxs:
try:
phot_cat[s] = np.bincount(
bin_labels.ravel(), weights=hdul[s].data[crop].ravel()
)
except Exception as e:
print(e)
phot_cat[s] = np.nan
binned_data = fits.HDUList(
[
fits.PrimaryHDU(),
fits.ImageHDU(
data=bin_labels,
# header=signal.header.copy(),
name="SEG_MAP",
),
fits.BinTableHDU(data=phot_cat, name="PHOT_CAT"),
]
)
binned_data.writeto(out_path)
[docs]
def save_binned_data_fits(
bin_labels: ArrayLike,
out_path: PathLike,
filter_keys: list,
signal_paths: list,
var_paths: list,
crop: tuple | None = None,
header: fits.Header | None = None,
overwrite: bool = True,
) -> None:
"""
Create a binned photometric catalogue from a segmentation map.
Unlike `~niriss_tools.sed.save_binned_data_arr`, this uses as input
multiple FITS files containing the signal and variance arrays.
Parameters
----------
bin_labels : ArrayLike
A 2D ``int`` array, containing the bin label assigned to each
element of the input arrays.
out_path : PathLike
The path to save the output.
filter_keys : list
The names of the filters corresponding to each of the signal and
variance image pairs. These will be used as column names in the
photometric catalogue.
signal_paths : list
A list containing the locations of the direct images.
var_paths : list
A list containing the locations of the associated variance images.
crop : tuple | None, optional
A crop to be applied to the images before applying the binning
scheme, by default None.
header : fits.Header | None = None
If supplied, this will be used to generate a header for the
segmentation map in the output file. If ``None``, a header will be
taken from the first image in ``signal_paths``.
overwrite : bool, optional
If a catalogue already exists at ``out_path``, this determines if
it should be written over. By default ``True``.
"""
phot_cat = Table()
bin_ids, bin_inv = np.unique(bin_labels, return_inverse=True)
phot_cat["bin_id"] = bin_ids.astype(str)
phot_cat["bin_area"] = np.bincount(bin_labels.ravel())
if crop is None:
crop = (slice(0, None), slice(0, None))
for i, (f, s, v) in enumerate(zip(filter_keys, signal_paths, var_paths)):
with fits.open(s) as sci_hdul:
try:
phot_cat[f"{f}_sci".lower()] = np.bincount(
bin_labels.ravel(), weights=sci_hdul[0].data[crop].ravel()
)
if header is None:
header = sci_hdul[0].header
wcs = WCS(header)[crop]
header.update(wcs.to_header())
except Exception as e:
print(e, "blah", f)
phot_cat[f"{f}_sci".lower()] = [np.nan] * len(bin_ids)
with fits.open(v) as var_hdul:
try:
phot_cat[f"{f}_var".lower()] = np.bincount(
bin_labels.ravel(), weights=var_hdul[0].data[crop].ravel()
)
except Exception as e:
print(e, "whhy")
phot_cat[f"{f}_var".lower()] = np.nan
binned_data = fits.HDUList(
[
fits.PrimaryHDU(),
fits.ImageHDU(
data=bin_labels.astype(float),
header=header,
name="SEG_MAP",
),
fits.BinTableHDU(data=phot_cat, name="PHOT_CAT"),
]
)
binned_data.writeto(out_path, overwrite=overwrite)
[docs]
def bin_and_save(
obj_id: int,
out_dir: PathLike,
seg_map: ArrayLike | PathLike,
info_dict: dict,
sn_filter: str,
target_sn: float = 100,
bin_scheme: str = "hexbin",
bin_diameter: float = 4,
seg_hdu_index: int | str = 0,
padding: int = 50,
overwrite: bool = False,
binned_name: str | None = None,
**bin_kwargs,
) -> PathLike:
"""
Rebin and save one object in a segmentation map.
This is a thin wrapper around other functions in this module, with a
slightly opinionated default set of parameters. If the user is
familiar with the lower-level functions, those will provide an
equivalent output.
Parameters
----------
obj_id : int
The unique identifier of an object in the full segmentation map.
out_dir : PathLike
The directory to which the output will be written, consisting of a
segmentation image for the binning scheme, and a photometric
catalogue.
seg_map : ArrayLike | PathLike
The segmentation map identifying the location of objects in the
individual images, where pixels with a given unique integer
correspond to the extent of a single object. This should have the
same alignment and shape as images in ``info_dict``.
info_dict : dict
A dictionary containing all information on the images to be used
when binning and generating the photometric catalogue. The keys of
the dictionary should correspond to the names of the filters (or
any desired column name in the output catalogue), and each entry
should itself contain a ``dict``, with the keys ``sci`` and
``var`` describing the locations of the PSF-matched science and
variance images.
sn_filter : str
The ``info_dict`` key to use when calculating the S/N for binning.
target_sn : float, optional
The desired S/N in each output bin, by default 100. This is only
guaranteed to be achieved for the hexagonal bins (if using this
binning scheme), as there is some scatter on the S/N achieved
through Voronoi binning.
bin_scheme : str, optional
Which binning scheme to use, by default ``"hexbin"``. In this
case, the first stage of the binning procedure will be performed.
If ``"vorbin"``, then this will be skipped, and only the Voronoi
binning procedure will be attempted. If ``"colour"``, then
pixels will be aggregated based on their colour.
bin_diameter : float, optional
The minimum diameter of each bin (subject to pixel discretisation
effects), by default 4.
seg_hdu_index : int | str, optional
The index or name of the HDU containing the segmentation map,
by default ``0``.
padding : int, optional
The extra padding to add to the object boundaries, by default 50.
overwrite : bool, optional
If a catalogue already exists in ``out_dir``, this determines if
it should be written over. By default ``False``.
binned_name : str | None, optional
Specify the exact name used for writing the output file. If
``None`` (default), this will be automatically generated from
the chosen binning parameters.
**bin_kwargs : dict, optional
Any additional parameters to be passed through to
`~niriss_tools.sed.constrained_adaptive`.
Returns
-------
PathLike
The path to the binned data, saved as a multi-extension FITS file.
"""
if isinstance(seg_map, PathLike):
seg_map = fits.getdata(seg_map, seg_hdu_index)
from niriss_tools.pipeline import seg_slice
obj_img_idxs = seg_slice(seg_map, obj_id, padding=padding)
signal = fits.getdata(info_dict[sn_filter]["sci"])[obj_img_idxs]
signal_hdr = fits.getheader(info_dict[sn_filter]["sci"])
signal_wcs = WCS(signal_hdr)[obj_img_idxs]
signal_hdr.update(signal_wcs.to_header())
# Don't forget that the noise is the square root of the variance array
noise = np.sqrt(fits.getdata(info_dict[sn_filter]["var"]))[obj_img_idxs]
if bin_scheme == "colour":
orig_images = []
for k, v in info_dict.items():
orig_images.append(fits.getdata(v["sci"])[obj_img_idxs])
bin_labels, nbins, bin_sn, bin_inv = colour_binning.colour_aggregate(
orig_images=orig_images,
signal=signal,
noise=noise,
target_sn=target_sn,
mask=seg_map[obj_img_idxs] != obj_id,
# crop=obj_img_idxs,
**bin_kwargs,
)
else:
bin_labels, nbins, bin_sn, bin_inv = constrained_adaptive(
signal=signal,
noise=noise,
target_sn=target_sn,
mask=seg_map[obj_img_idxs] != obj_id,
use_hex=use_hex,
bin_diameter=bin_diameter,
**bin_kwargs,
)
if binned_name is None:
# Give it a meaningful name - this avoids confusion if rerunning with multiple configurations
binned_name = f"{obj_id}_{bin_scheme}_{bin_diameter}_{target_sn}_{sn_filter}"
save_path = out_dir / f"{binned_name}_data.fits"
if save_path.is_file() and not overwrite:
print(
f"'{save_path.name}' already exists. Set `overwrite=True` "
"if you wish to overwrite this file."
)
else:
save_binned_data_fits(
bin_labels=bin_labels,
out_path=out_dir / f"{binned_name}_data.fits",
filter_keys=[*info_dict.keys()],
signal_paths=[info_dict[f]["sci"] for f in info_dict.keys()],
var_paths=[info_dict[f]["var"] for f in info_dict.keys()],
crop=obj_img_idxs,
header=signal_hdr,
)
return save_path