from copy import deepcopy
from webbpsf_ext.image_manip import convolve_image, _convolve_psfs_for_mp
from webbpsf_ext.image_manip import make_disk_image, distort_image
from webbpsf_ext.bandpasses import nircam_com_th
# Import libraries
from .pynrc_core import NIRCam, DetectorOps, merge_dicts
from .nrc_utils import *
from .maths.coords import gen_sgd_offsets, oversampled_coords
from .maths.image_manip import add_ipc, add_ppc, apply_pixel_diffusion
from .maths.image_manip import fractional_image_shift
from tqdm.auto import tqdm, trange
import logging
_log = logging.getLogger('pynrc')
import numpy as np
eps = np.finfo(float).eps
[docs]
class nrc_hci(NIRCam):
""" NIRCam High Contrast Imaging (HCI)
Subclass of the :class:`~pynrc.NIRCam` instrument class with updates for PSF
generation of off-axis PSFs. If a coronagraph is not present,
then this is effectively the same as the :class:`~pynrc.NIRCam` class.
"""
[docs]
def __init__(self, wind_mode='WINDOW', xpix=320, ypix=320, large_grid=True,
bar_offset=None, use_ap_info=True, autogen_coeffs=True,
sgd_type=None, slew_std=0, fsm_std=0, **kwargs):
"""Init function for NIRCam HCI class.
Parameters
----------
wind_mode : str
'FULL', 'STRIPE', or 'WINDOW'
xpix : int
Size of the detector readout along the x-axis. The detector is
assumed to be in window mode unless the user explicitly
sets wind_mode='FULL'.
ypix : int
Size of the detector readout along the y-axis. The detector is
assumed to be in window mode unless the user explicitly
sets wind_mode='FULL'.
large_grid : bool
Use a large number (high-density) of grid points to create coefficients.
If True, then produces a higher fidelity PSF variations across the FoV,
but will take much longer to generate on the first pass and requires more
disk space and memory while running.
bar_offset : float or None
Custom offset position along bar mask (-10 to +10 arcsec).
Set to None to auto-determine based on other configurations.
use_ap_info : bool
For subarray observations, the mask reference points are not
actually in the center of the array. Set this to true to
shift the sources to actual aperture reference location.
Default is to place in center of array.
autogen_coeffs : bool
Automatically generate base PSF coefficients. Equivalent to performing
:meth:`gen_psf_coeff`, :meth:`gen_wfedrift_coeff`, and :meth:`gen_wfemask_coeff`.
Default: True.
sgd_type : str or None
Small grid dither pattern. Valid types are
'9circle', '5box', '5diamond', '3bar', or '5bar'. If 'auto',
then defaults are '5diamond' for round masks, '5bar' for bar masks,
and '5diamond' for direct imaging. If None, then no FSM pointings,
but there will be a single slew.
fsm_std : float
One-sigma accuracy (mas) per axis of fine steering mirror positions.
This provides randomness to each position relative to the nominal
central position. Ignored for central SGD position.
***Values should be in units of mas***
slew_std : float
One-sigma accuracy (mas) per axis of the initial slew. This is
applied to all positions and gives a baseline offset relative
to the desired mask center.
***Values should be in units of mas***
"""
super().__init__(wind_mode=wind_mode, xpix=xpix, ypix=ypix, fov_bg_match=True,
autogen_coeffs=autogen_coeffs, **kwargs)
if autogen_coeffs:
# Enable WFE drift
self.gen_wfedrift_coeff()
# Enable mask-dependent
self.gen_wfemask_coeff(large_grid=large_grid)
log_prev = conf.logging_level
setup_logging('WARN', verbose=False)
self._use_ap_info = use_ap_info
# Set locations based on detector
self._set_xypos()
# Create mask throughput images seen by detector
self._gen_cmask()
# Create default SGD offsets
self.sgd_type = sgd_type
self.slew_std = slew_std
self.fsm_std = fsm_std
self.gen_pointing_offsets()
setup_logging(log_prev, verbose=False)
self._bar_offset = bar_offset
@property
def bar_offset(self):
"""Offset position along bar mask (arcsec)."""
if 'TA' in self.siaf_ap.AperName:
bar_offset = 0
elif self._bar_offset is None:
# Bar offset should get auto-determined based on aperture name info
bar_offset = self.get_bar_offset()
bar_offset = 0 if bar_offset is None else bar_offset # Circular masks return None
else:
bar_offset = self._bar_offset
return bar_offset
@bar_offset.setter
def bar_offset(self, value):
"""Set the bar offset position. None to auto-determine"""
# Only update if the value changes
if self.image_mask is None:
self._bar_offset = 0 #None
elif self.image_mask[-2:]=='WB':
# Value limits between -10 and 10
if (value is not None) and np.abs(value)>10:
value = 10 if value>0 else -10
msg1 = 'bar_offset value must be between -10 and 10 arcsec.'
msg2 = ' Setting to {}.'.format(value)
_log.warning('{}\n{}'.format(msg1,msg2))
self._bar_offset = value
else:
self._bar_offset = 0
[docs]
def gen_offset_psf(self, offset_r, offset_theta, sp=None, return_oversample=False,
wfe_drift=None, use_coeff=True, coron_rescale=True, use_cmask=False,
recenter=True, diffusion_sigma=None, psf_corr_over=None,
**kwargs):
"""Create a PSF offset from center of mask
NOTE: The resulting PSF is still in the center of the output image, but
the PSF morphology matches that of the off-axis location.
Generate some off-axis PSF at a given (r,theta) offset from center of mask.
The `offset_r` and `offset_theta` parameters are assumed to be in 'idl' frame.
This function is mainly for coronagraphic observations where the
off-axis PSF varies w.r.t. position. The PSF is centered in the
resulting image. The offset values are assumed to be in 'idl' coordinate
frame relative to the mask center.
WARNING: Bar offsets are added in calc_psf functions, so don't double count.
Parameters
----------
offset_r : float
Radial offset of the target from center in arcsec.
offset_theta : float
Position angle for that offset, in degrees CCW (+Y).
Keyword Args
------------
sp : None, :mod:`webbpsf_ext.synphot_ext.Spectrum`
If not specified, the default is flat in phot lam
(equal number of photons per spectral bin).
return_oversample : bool
Return either the pixel-sampled or oversampled PSF.
use_coeff : bool
If True, uses `calc_psf_from_coeff`, other STPSF's built-in `calc_psf`.
coron_rescale : bool
Rescale total flux of off-axis coronagraphic PSF to better match
analytic prediction when source overlaps coronagraphic occulting
mask. Primarily used for planetary companion and disk PSFs.
Default: True.
recenter : bool
Recenter the PSF to the center of the image where offsets are caused
by small (sub-pixel) tip/tilt values or dispersion from Lyot wedges.
Default: True.
diffusion_sigma : float
Apply pixel diffusion to the disk image. This is a Gaussian
convolution with a sigma value in pixels.
psf_corr_over : ndarray
Oversampled PSF correction factor. If not None, then apply this
correction factor to the oversampled PSF. The correction factor
should be the same size as the oversampled PSF.
"""
coords = rtheta_to_xy(offset_r, offset_theta)
kwargs['return_hdul'] = False
# FULL TA coronagraphic masks require siaf_ap to be passed
# since we want offset relative to TA position rather than
# center of the coronagraphic 20x20" aperture
apname = self.siaf_ap.AperName
if self.is_coron and ('TAMASK' in apname) and ('FULL' in apname):
kwargs['siaf_ap'] = self.siaf_ap
use_cmask = True
# Always return oversample if recentering
if recenter and (not return_oversample):
return_osamp_temp = True
else:
return_osamp_temp = return_oversample
# print(coords, 'idl')
if use_coeff:
if self.psf_coeff is None:
raise ValueError('PSF coefficients have not been generated. Use `gen_psf_coeff` method first.')
psf = self.calc_psf_from_coeff(sp=sp, return_oversample=return_osamp_temp,
wfe_drift=wfe_drift, coord_vals=coords, coord_frame='idl',
coron_rescale=coron_rescale, **kwargs)
else:
psf = self.calc_psf(sp=sp, return_oversample=return_osamp_temp,
wfe_drift=wfe_drift, coord_vals=coords, coord_frame='idl', **kwargs)
# Recenter PSF?
if recenter:
psf = self.recenter_psf(psf, sampling=self.oversample)
psf = frebin(psf, scale=1/self.oversample) if not return_oversample else psf
# Begin coronagraphic mask attenuation
# Skipped if not coronagraphic
# Determine if any throughput loss due to coronagraphic mask
# artifacts, such as the mask holder or ND squares.
# If ND_acq=True, then PSFs already included ND throughput in bandpass.
if use_cmask and self.is_coron and (not self.ND_acq) and use_coeff:
# Location of PSF in image
delx_asec, dely_asec = coords
cmask_det = self.mask_images['DETSAMP']
# 1. For the FULL TA masks, we want offsets relative to the mask reference point
# 2. For all others, we want reference points relative to the 20"x20" coronagraphic field
# TODO: This doesn't seem to be used anymore?
# if 'TAMASK' in apname:
# siaf_ap_relative = self.siaf_ap
# else:
# si_mask_apname = self._psf_coeff_mod.get('si_mask_apname')
# siaf_ap_relative = self.siaf[si_mask_apname]
# If we're use_ap_info is True, then the cmask image is setup to be w.r.t.
# the SIAF aperture. But if False, then cmask image is centered at middle of mask.
if self._use_ap_info or ('FULL' in self.det_info['wind_mode']):
xy_sci = self.siaf_ap.convert(delx_asec, dely_asec, 'idl', 'sci')
xsci, ysci = np.array(xy_sci).astype('int')
else:
# Otherwise assumed mask is in center of subarray
xcen, ycen = get_im_cen(cmask_det)
delx_pix, dely_pix = np.array([delx_asec + self.bar_offset, dely_asec]) / self.pixelscale
xsci, ysci = np.array([xcen+delx_pix, ycen+dely_pix]).astype('int')
# Extract a 5x5 region to average from the cmask
cmask_sub = cmask_det[ysci-2:ysci+3,xsci-2:xsci+3]
trans = 0 if len(cmask_sub)==0 else np.mean(cmask_sub)
# First, anything in a region around the circular mask should have trans=1
if ( (np.sqrt(delx_asec**2 + dely_asec**2) < 4.5) and
(('210R' in apname) or ('335R' in apname) or ('430R' in apname)) and
('TAMASK' not in apname) and (trans>0)):
trans = 1
# Next, anything in the SWB/LWB regions should have transmission set to 1
elif ( (np.abs(dely_asec)<4.5) and
(('SWB' in apname) or ('LWB' in apname)) and
('TAMASK' not in apname) and (trans>0)):
trans = 1
else:
# COM substrate already accounted for in filter throughput curve,
# so it will be double-counted here. Need to divide it out.
w_um = self.bandpass.avgwave().to_value('um')
com_th = nircam_com_th(wave_out=w_um)
trans /= com_th
else:
trans = 1
# Apply transmission mask
psf *= trans
# Apply diffusion
if (diffusion_sigma is not None) and (diffusion_sigma>0):
osamp = self.oversample if return_oversample else 1
psf = apply_pixel_diffusion(psf, diffusion_sigma*osamp)
# Apply PSF correction factor
if psf_corr_over is not None:
# Shift to align with PSF if not recentered
if not recenter:
psf_corr_over = psf_corr_over[::-1,::-1]
psf_corr_over = self.recenter_psf(psf_corr_over, sampling=self.oversample, pad=True, cval=1)
psf_corr_over = psf_corr_over[::-1,::-1]
# Downsample to detector pixels?
psf_corr = psf_corr_over if return_oversample else frebin(psf_corr_over, 1/self.oversample, total=False)
psf *= crop_image(psf_corr, psf.shape, fill_val=1)
return psf
def _set_xypos(self, xy=None):
"""
Set x0 and y0 subarray positions.
"""
wind_mode = self.det_info['wind_mode']
if xy is not None:
self.update_detectors(x0=xy[0], y0=xy[1])
elif self._use_ap_info:
xvert, yvert = self.siaf_ap.closed_polygon_points('det')
xy = (np.min(xvert).astype('int'), np.min(yvert).astype('int'))
self.update_detectors(x0=xy[0], y0=xy[1])
elif self.is_coron:
full = True if 'FULL' in wind_mode else False
if full:
xcen, ycen = (self.siaf_ap.XDetRef, self.siaf_ap.YDetRef)
else:
cdict = coron_ap_locs(self.module, self.channel, self.image_mask, full=full)
xcen, ycen = cdict['cen']
xpix, ypix = (self.det_info['xpix'], self.det_info['ypix'])
if full:
x0, y0 = (0,0)
else:
x0, y0 = (int(xcen-xpix/2), int(ycen-ypix/2))
# Make sure subarray sizes don't push out of bounds
if (y0 + ypix) > 2048:
y0 = 2048 - ypix
if (x0 + xpix) > 2048:
x0 = 2048 - xpix
self.update_detectors(x0=x0, y0=y0)
def _gen_cmask(self):
""" Generate coronagraphic mask transmission images.
Output images are in 'sci' coordinates.
"""
self.mask_images = gen_coron_mask(self)
[docs]
def attenuate_with_mask(self, image_oversampled, cmask=None):
""" Image attenuation from coronagraph mask features
Multiply image data by coronagraphic mask.
Excludes region already affected by observed occulting mask.
Involves mainly ND Squares and opaque COM holder.
Appropriately accounts for COM substrate wavelength-dep throughput.
WARNING: If self.ND_acq=True, then bandpass already includes
ND throughput, so be careful not to double count.
"""
cmask = self.mask_images.get('OVERSAMP') if cmask is None else cmask
# In case of imaging
if cmask is None:
return image_oversampled
return attenuate_with_coron_mask(self, image_oversampled, cmask)
[docs]
def gen_pointing_offsets(self, rand_seed=None):
"""
Create a series of x and y position offsets for a SGD pattern.
This includes the central position as the first in the series.
By default, will also add random movement errors using the
`slew_std` and `fsm_std` keywords. Returned values are in arcsec.
Uses the attributes `sgd_type`, `slew_std`, and `fsm_std`.
Attributes
==========
sgd_type : str or None
Small grid dither pattern. Valid types are
'9circle', '5box', '5diamond', '3bar', or '5bar'. If 'auto',
then defaults are '5diamond' for round masks, '5bar' for bar masks,
and '5diamond' for direct imaging. If None, then no FSM pointings,
but there will be a single slew.
fsm_std : float
One-sigma accuracy per axis of fine steering mirror positions.
This provides randomness to each position relative to the nominal
central position. Ignored for central position.
Values should be in units of mas.
slew_std : float
One-sigma accuracy per axis of the initial slew. This is applied
to all positions and gives a baseline offset relative to the
desired mask center.
Values should be in units of mas.
Parameters
==========
rand_seed : int
Input a random seed in order to make reproduceable pseudo-random
numbers.
"""
sgd_type = self.sgd_type
slew_std = self.slew_std
fsm_std = self.fsm_std
if sgd_type == 'auto':
if self.is_coron and self.image_mask[-1]=='R':
sgd_type = '5diamond'
elif self.is_coron and self.image_mask[-1]=='B':
sgd_type = '5bar'
else:
sgd_type = '5diamond'
if sgd_type is None:
rng = np.random.default_rng(seed=rand_seed)
xoff, yoff = rng.normal(scale=slew_std, size=2) / 1000
fsm_std = None
else:
xoff, yoff = gen_sgd_offsets(sgd_type, slew_std=slew_std, fsm_std=fsm_std, rand_seed=rand_seed)
_log.info("Saving SGD position info to `self.pointing_info` dictionary attribute")
self.pointing_info = {'xoff' : xoff, 'yoff' : yoff,}
[docs]
class obs_hci(nrc_hci):
"""NIRCam coronagraphic observations
Subclass of the :class:`~pynrc.nrc_hci` instrument class used to observe
stars (plus exoplanets and disks) with either a coronagraph or direct
imaging.
The main concept is to generate a science target of the primary
source along with a simulated disk structure. Planets are further
added to the astronomical scene. A separate reference source is
also defined for PSF subtraction, which contains a specified WFE.
A variety of methods exist to generate slope images and analyze
the PSF-subtracted results via images and contrast curves.
"""
[docs]
def __init__(self, sp_sci, distance, sp_ref=None, wfe_ref_drift=5, wfe_roll_drift=2,
wind_mode='WINDOW', xpix=320, ypix=320, disk_params=None, autogen_coeffs=True,
sgd_type=None, slew_std=0, fsm_std=0, **kwargs):
"""Init function for obs_hci class
Parameters
----------
sp_sci : :class:`webbpsf_ext.synphot_ext.Spectrum`
A synphot spectrum of science target (e.g., central star).
Should already be normalized to the apparent flux.
sp_ref : :class:`webbpsf_ext.synphot_ext.Spectrum` or None
A synphot spectrum of reference target.
Should already be normalized to the apparent flux.
distance : float
Distance in parsecs to the science target. This is used for
flux normalization of the planets and disk.
wfe_ref_drift: float
WFE drift in nm RMS between the science and reference targets.
Expected values are between ~3-10 nm.
wfe_roll_drift: float
WFE drift in nm RMS between science roll angles. Default=0.
wind_mode : str
'FULL', 'STRIPE', or 'WINDOW'
xpix : int
Size of the detector readout along the x-axis. The detector is
assumed to be in window mode unless the user explicitly
sets wind_mode='FULL'.
ypix : int
Size of the detector readout along the y-axis. The detector is
assumed to be in window mode unless the user explicitly
sets wind_mode='FULL'.
disk_params : dict
Arguments describing disk model information for a given FITS file:
- 'file' : Path to model file or an HDUList.
- 'pixscale' : Pixel scale for model image (arcsec/pixel).
- 'dist' : Assumed model distance in parsecs.
- 'wavelength' : Wavelength of observation in microns.
- 'units' : String of assumed flux units (ie., MJy/arcsec^2 or muJy/pixel)
- 'cen_star' : True/False. Is a star already placed in the central pixel?
autogen_coeffs : bool
Automatically generate base PSF coefficients. Equivalent to performing
`self.gen_psf_coeff()`. `gen_wfedrift_coeff`, and `gen_wfemask_coeff`.
Default: True.
use_ap_info : bool
For subarray observations, the mask reference points are not actually in the
center of the array. Set this to True to shift the sources to actual
aperture reference location. Otherwise, default will place in center of array.
sgd_type : str or None
Small grid dither pattern. Valid types are
'9circle', '5box', '5diamond', '3bar', or '5bar'. If 'auto',
then defaults are '5diamond' for round masks, '5bar' for bar masks,
and '5diamond' for direct imaging. If None, then no FSM pointings,
but there will be a single slew.
fsm_std : float
One-sigma accuracy per axis of fine steering mirror positions.
This provides randomness to each position relative to the nominal
central position. Ignored for central position.
***Values should be in units of mas***
slew_std : float
One-sigma accuracy per axis of the initial slew. This is applied
to all positions and gives a baseline offset relative to the
desired mask center.
***Values should be in units of mas***
"""
if 'FULL' in wind_mode: xpix = ypix = 2048
if 'STRIPE' in wind_mode: xpix = 2048
super().__init__(wind_mode=wind_mode, xpix=xpix, ypix=ypix, autogen_coeffs=autogen_coeffs,
sgd_type=sgd_type, slew_std=slew_std, fsm_std=fsm_std, **kwargs)
# wind_mode = self.Detector.wind_mode
# if (wind_mode=='FULL') and (self.channel=='short' or self.channel=='SW'):
# raise NotImplementedError('SW Full Frame not yet implemented.')
# Spectral models
self.sp_sci = sp_sci
self.sp_ref = sp_sci if sp_ref is None else sp_ref
self.wfe_ref_drift = wfe_ref_drift
self.wfe_roll_drift = wfe_roll_drift
# Distance to source in pc
self.distance = distance
self._planets = []
# Open and rescale input disk image to observation parameters
self._disk_params = disk_params
self.gen_disk_hdulist()
if autogen_coeffs:
self.gen_disk_psfs()
@property
def wfe_ref_drift(self):
"""WFE drift (nm) of ref obs relative to sci obs"""
return self._wfe_ref_drift
@wfe_ref_drift.setter
def wfe_ref_drift(self, value):
"""Set the WFE drift value between sci and ref observations"""
self._wfe_ref_drift = value
@property
def wfe_roll_drift(self):
"""WFE drift (nm) of Roll2 obs relative to Roll1 obs"""
return self._wfe_roll_drift
@wfe_roll_drift.setter
def wfe_roll_drift(self, value):
"""Set the WFE drift value between roll observations"""
self._wfe_roll_drift = value
[docs]
def gen_pointing_offsets(self, sgd_type=None, slew_std=None, fsm_std=None,
rand_seed=None, verbose=False):
"""
Create a series of x and y position offsets for a SGD pattern.
This includes the central position as the first in the series.
By default, will also add random position errors using the
`slew_std` and `fsm_std` keywords. Returned values are in arcsec.
This initializes a set of target acquisition offsets
for each roll position and reference observation.
Parameters
==========
sgd_type : str or None
Small grid dither pattern. Valid types are
'9circle', '5box', '5diamond', '3bar', '5bar', '5miri', and '9miri'
where the first four refer to NIRCam coronagraphic dither
positions and the last two are for MIRI coronagraphy. If 'auto',
then defaults are '5diamond' for round masks, '5bar' for bar masks,
and '5diamond' for direct imaging. If None, then no FSM pointings,
but there will be a single slew.
fsm_std : float
One-sigma accuracy per axis of fine steering mirror positions.
This provides randomness to each position relative to the nominal
central position. Ignored for central position.
Values should be in units of mas.
slew_std : float
One-sigma accuracy per axis of the initial slew. This is applied
to all positions and gives a baseline offset relative to the
desired mask center.
Values should be in units of mas.
"""
sgd_type = self.sgd_type if sgd_type is None else sgd_type
slew_std = self.slew_std if slew_std is None else slew_std
fsm_std = self.fsm_std if fsm_std is None else fsm_std
if sgd_type == 'auto':
if self.is_coron and self.image_mask[-1]=='R':
sgd_type = '5diamond'
elif self.is_coron and self.image_mask[-1]=='B':
sgd_type = '5bar'
else:
sgd_type = '5diamond'
rng = np.random.default_rng(seed=rand_seed)
if sgd_type is None:
xyoff_ref = rng.normal(scale=slew_std, size=2) / 1000
fsm_std = None
else:
xoff_ref, yoff_ref = gen_sgd_offsets(sgd_type, slew_std=slew_std,
fsm_std=fsm_std, rand_seed=rand_seed)
xyoff_ref = np.array([xoff_ref,yoff_ref]).transpose()
xyoff_roll1 = rng.normal(scale=slew_std, size=2) / 1000
xyoff_roll2 = rng.normal(scale=slew_std, size=2) / 1000
_log.info("Saving SGD position info to `self.pointing_info` dictionary attribute")
self.pointing_info = {
'sgd_type': sgd_type, 'slew_std': slew_std, 'fsm_std': fsm_std,
'roll1': xyoff_roll1, 'roll2': xyoff_roll2, 'ref': xyoff_ref,
}
if verbose:
print('Pointing Info')
for k in self.pointing_info.keys():
v = self.pointing_info[k]
print(" {:<10} :".format(k), v)
@property
def disk_params(self):
return self._disk_params
[docs]
def gen_disk_hdulist(self, file=None, pixscale=None, dist=None,
wavelength=None, units=None, cen_star=None,
shape_out=None, **kwargs):
"""Create a correctly scaled disk model image.
Rescale disk model flux to current pixel scale and distance.
If instrument bandpass is different from disk model, scales
flux assuming a grey scattering model.
Result (in photons/sec) is saved in self.disk_hdulist attribute.
"""
if self._disk_params is None:
self.disk_hdulist = None
else:
if file is not None:
self._disk_params['file'] = file
if pixscale is not None:
self._disk_params['pixscale'] = pixscale
if dist is not None:
self._disk_params['dist'] = dist
if wavelength is not None:
self._disk_params['wavelength'] = wavelength
if units is not None:
self._disk_params['units'] = units
if cen_star is not None:
self._disk_params['cen_star'] = cen_star
file = self._disk_params['file']
pixscale = self._disk_params['pixscale']
dist = self._disk_params['dist']
wavelength = self._disk_params['wavelength']
units = self._disk_params['units']
cen_star = self._disk_params['cen_star']
# TODO: Double-check 'APERNAME', 'DET_X', 'DET_Y', 'DET_V2', 'DET_V3'
# correspond to the desired observation
disk_hdul = _gen_disk_hdulist(self, file, pixscale, dist, wavelength, units, cen_star,
sp_star=self.sp_sci, dist_out=self.distance,
shape_out=shape_out, **kwargs)
self.disk_hdulist = disk_hdul
[docs]
def update_detectors(self, verbose=False, do_ref=False, **kwargs):
if do_ref:
self.update_detectors_ref(verbose=verbose, **kwargs)
else:
# Any time update_detectors is called, also call gen_ref_det()
super().update_detectors(verbose=verbose, **kwargs)
# Updates ref detector window size
self.gen_ref_det()
[docs]
def update_detectors_ref(self, verbose=False, **kwargs):
"""
An easy-to-identify shortcut to `gen_ref_det`, because I
keep forgetting to run it to independently of `update_detectors`.
Keyword Args
------------
wind_mode : str
Window mode type 'FULL', 'STRIPE', 'WINDOW'.
xpix : int
Size of window in x-pixels for frame time calculation.
ypix : int
Size of window in y-pixels for frame time calculation.
x0 : int
Lower-left x-coord position of detector window.
y0 : int
Lower-left y-coord position of detector window.
read_mode : str
NIRCam Ramp Readout mode such as 'RAPID', 'BRIGHT1', etc.
nint : int
Number of integrations (ramps).
ngroup : int
Number of groups in a integration.
nf : int
Number of frames per group.
nd1 : int
Number of drop frame after reset (before first group read).
nd2 : int
Number of drop frames within a group (ie., groupgap).
nd3 : int
Number of drop frames after final read frame in ramp.
nr1 : int
Number of reset frames within first ramp.
nr2 : int
Number of reset frames for subsequent ramps.
"""
self.gen_ref_det(verbose=verbose, **kwargs)
[docs]
def gen_ref_det(self, verbose=False, **kwargs):
"""
Function to generate and update Reference Detector class.
Used to keep track of detector and multiaccum config,
which can differ between sci and ref observations.
"""
# Check if kwargs is empty
if not kwargs:
try:
kwargs = self._det_info_ref
except AttributeError:
kwargs = {}
else:
try:
self._det_info_ref.update(kwargs)
except AttributeError:
self._det_info_ref = kwargs
kwargs = self._det_info_ref
# These should always be the same between sci and ref
kw_copy = ['wind_mode', 'xpix', 'ypix', 'x0', 'y0']
for kw in kw_copy:
kwargs[kw] = self.det_info[kw]
# Update reference detector class
try:
del self.Detector_ref
except AttributeError:
pass
self.Detector_ref = DetectorOps(detector=self.detector, **kwargs)
# Update stored kwargs
kw1 = self.Detector_ref.to_dict()
_ = kw1.pop('detector', None)
kw2 = self.Detector_ref.multiaccum.to_dict()
self._det_info_ref = merge_dicts(kw1,kw2)
if verbose:
print('New Ramp Settings')
keys = ['read_mode', 'nf', 'nd2', 'ngroup', 'nint']
for k in keys:
v = self._det_info_ref[k]
if isinstance(v,float): print("{:<9} : {:>8.0f}".format(k, v))
else: print(" {:<10} : {:>8}".format(k, v))
print('New Detector Settings')
keys = ['wind_mode', 'xpix', 'ypix', 'x0', 'y0']
for k in keys:
v = self._det_info_ref[k]
if isinstance(v,float): print("{:<9} : {:>8.0f}".format(k, v))
else: print(" {:<10} : {:>8}".format(k, v))
print('New Ramp Times')
ma = self.Detector_ref.times_to_dict()
keys = ['t_group', 't_frame', 't_int', 't_int_tot1', 't_int_tot2', 't_exp', 't_acq']
for k in keys:
print(' {:<10} : {:>8.3f}'.format(k, ma[k]))
[docs]
def gen_disk_psfs(self, wfe_drift=0, force=False, use_coeff=True, recenter=True,
diffusion_sigma=None, psf_corr_over=None, **kwargs):
"""
Save instances of NIRCam PSFs that are incrementally offset
from coronagraph center to convolve with a disk image.
"""
from webbpsf_ext.webbpsf_ext_core import _transmission_map
if (self._disk_params is None) and (force==False):
# No need to generate if no disk
self.psf_list = None
elif self.image_mask is None: # Direct imaging
# If no mask, then assume PSF looks the same at all radii
# This return a single ndarray
kwargs['return_oversample'] = True
kwargs['return_hdul'] = True
kwargs['wfe_drift'] = wfe_drift
kwargs['use_coeff'] = use_coeff
if use_coeff:
self.psf_list = self.calc_psf_from_coeff(**kwargs)
else:
hdul = self.calc_psf(**kwargs)
key = 'OVERDIST' if self.include_distortions else 'OVERSAMP'
self.psf_list = fits.HDUList(hdul[key])
elif 'WB' in self.image_mask: # Bar masks
kwargs['ysci_vals'] = 0
kwargs['xsci_vals'] = np.linspace(-8,8,9) # np.linspace(-9,9,19)
kwargs['wfe_drift'] = wfe_drift
kwargs['use_coeff'] = use_coeff
self.psf_list = self.calc_psfs_grid(osamp=self.oversample, **kwargs)
else: # Circular masks, just need single on-axis PSF
kwargs['return_oversample'] = True
kwargs['return_hdul'] = True
kwargs['wfe_drift'] = wfe_drift
if use_coeff:
self.psf_list = self.calc_psf_from_coeff(**kwargs)
else:
hdul = self.calc_psf(**kwargs)
key = 'OVERDIST' if self.include_distortions else 'OVERSAMP'
self.psf_list = fits.HDUList(hdul[key])
# Recenter PSFs and apply diffusion and PSF correction factor
if self.psf_list is not None:
for hdu in self.psf_list:
if recenter and hdu.header.get('RECENTER',False)==False:
do_recenter = True
hdu.data = self.recenter_psf(hdu.data, sampling=self.oversample)
hdu.header['RECENTER'] = (True, 'PSF was recentered')
else:
do_recenter = False
hdu.header['RECENTER'] = (False, 'PSF was not recentered')
# Apply diffusion
if (diffusion_sigma is not None) and (diffusion_sigma>0):
hdu.data = apply_pixel_diffusion(hdu.data, diffusion_sigma*self.oversample)
# Apply PSF correction factor
if psf_corr_over is not None:
# Shift to align with PSF if not recentered
if not do_recenter:
psf_corr_over = psf_corr_over[::-1,::-1]
psf_corr_over = self.recenter_psf(psf_corr_over, sampling=self.oversample, pad=True, cval=1)
psf_corr_over = psf_corr_over[::-1,::-1]
hdu.data *= crop_image(psf_corr_over, hdu.data.shape, fill_val=1)
# Generate oversampled mask transmission for mask-dependent PSFs
if (self.image_mask is not None) and (self.mask_images.get('OVERMASK') is None):
nx, ny = (self.det_info['xpix'], self.det_info['ypix'])
if 'FULL' in self.det_info['wind_mode']:
trans = build_mask_detid(self.Detector.detid, oversample=self.oversample,
pupil=self.pupil_mask, nd_squares=False, mask_holder=False)
elif self._use_ap_info:
siaf_ap = self.siaf_ap
xv = np.arange(nx)
yv = np.arange(ny)
xg, yg = np.meshgrid(xv,yv)
res = _transmission_map(self, (xg,yg), 'sci', siaf_ap=siaf_ap)
# trans = frebin(res[0]**2, scale=self.oversample, total=False)
trans = zrebin(res[0]**2, self.oversample, total=False)
else:
siaf_ap = self.siaf[self.aperturename]
xr, yr = (siaf_ap.XSciRef, siaf_ap.YSciRef)
xv = np.arange(nx) - nx/2 + xr
yv = np.arange(ny) - ny/2 + yr
xg, yg = np.meshgrid(xv,yv)
xidl, yidl = siaf_ap.convert(xg,yg,'sci','idl')
res = _transmission_map(self, (xidl,yidl), 'idl', siaf_ap=siaf_ap)
# trans = frebin(res[0]**2, scale=self.oversample, total=False)
trans = zrebin(res[0]**2, self.oversample, total=False)
self.mask_images['OVERMASK'] = trans
# renormalize all PSFs for disk convolution to 1
# for hdu in self.psf_list:
# hdu.data /= hdu.data.sum()
[docs]
def planet_spec(self, **kwargs):
"""Exoplanet spectrum
Return the planet spectrum from Spiegel & Burrows (2012) normalized
to distance of current target. Output is a :class:`webbpsf_ext.synphot_ext.Spectrum`.
See `spectra.companion_spec()` function for more details.
"""
dist = self.distance if (kwargs.get('dist') is None) else kwargs.pop('dist')
sp = companion_spec(self.bandpass, dist=dist, **kwargs)
return sp
@property
def planets(self):
"""Planet info (if any exists)"""
return self._planets
[docs]
def delete_planets(self):
"""Remove planet info"""
try: del self._planets
except: pass
self._planets = []
[docs]
def add_planet(self, name=None, model='SB12', atmo='hy3s', mass=10, age=100, entropy=10,
xy=None, rtheta=None, runits='AU', Av=0, renorm_args=None, sptype=None,
accr=False, mmdot=None, mdot=None, accr_rin=2, truncated=False, **kwargs):
"""Insert a planet into observation.
Add exoplanet information that will be used to generate a point
source image using a spectrum from Spiegel & Burrows (2012).
Use self.delete_planets() to delete them.
Coordinate convention is for +N up and +E to left.
Parameters
==========
model : str
Exoplanet model to use ('sb12', 'bex', 'cond') or
stellar spectrum model ('bosz', 'ck04models', 'phoenix').
atmo : str
A string consisting of one of four atmosphere types:
['hy1s', 'hy3s', 'cf1s', 'cf3s']. Only relevant for SB12.
mass: int
Number 1 to 15 Jupiter masses for SB12.
BEX and COND spans much lower masses (sub-Saturn, depending on age).
age: float
Age in millions of years (1-1000).
entropy: float
Initial entropy (8.0-13.0) in increments of 0.25
sptype : str
Instead of using a exoplanet spectrum, specify a stellar type.
renorm_args : tuple
Renormalization arguments in case you want
very specific luminosity in some bandpass.
Includes (value, units, bandpass).
Av : float
Extinction magnitude (assumes Rv=4.0) of the exoplanet
due to being embedded in a disk.
xy : tuple, None
(X,Y) position in sky coordinates of companion (N up, E left).
rtheta : tuple, None
Radius and position angle relative to stellar position.
Alternative to xy keyword.
runits : str
What are the spatial units? Valid values are 'AU', 'asec', or 'pix'.
accr : bool
Include accretion? default: False
mmdot : float
From Zhu et al. (2015), the Mjup^2/yr value.
If set to None then calculated from age and mass.
mdot : float
Or use mdot (Mjup/yr) instead of mmdot.
accr_rin : float
Inner radius of accretion disk (units of RJup; default: 2)
truncated: bool
Full disk or truncated (ie., MRI; default: False)?
"""
if (xy is None) and (rtheta is None):
_log.error('Either xy or rtheta must be specified.')
if (xy is not None) and (rtheta is not None):
_log.warning('Both xy and rtheta are specified. ' + \
'The xy keyword shall take priority.')
# Size of subarray image in terms of pixels
image_shape = (self.det_info['ypix'], self.det_info['xpix'])
# XY location of planets within subarray with units from runits keyword
loc = rtheta_to_xy(rtheta[0], rtheta[1]) if xy is None else xy
# Define pixel location relative to the center of the subarray
if 'AU' in runits:
au_per_pixel = self.distance*self.pixelscale
xoff, yoff = np.array(loc) / au_per_pixel
elif ('asec' in runits) or ('arcsec' in runits):
xoff, yoff = np.array(loc) / self.pixelscale
elif ('pix' in runits):
xoff, yoff = loc
else:
errstr = "Do not recognize runits='{}'".format(runits)
raise ValueError(errstr)
# Offset in terms of arcsec
xoff_asec, yoff_asec = np.array([xoff, yoff]) * self.pixelscale
_log.debug('(xoff,yoff) = {} pixels'.format((xoff,yoff)))
_log.debug('(xoff_asec,yoff_asec) = {} arcsec'.format((xoff_asec,yoff_asec)))
# Make sure planet is within image bounds
sh_diff = np.abs(np.array([yoff,xoff])) - np.array(image_shape)/2
if np.any(sh_diff>=0):
_log.warning('xoff,yoff = {} is beyond image boundaries.'.format((xoff,yoff)))
if (model.lower()=='sb12') and (mass<1):
_log.warning(f'Mass of {mass:.3f} MJup less than SB12 model minimum of 1 MJup')
# X and Y pixel offsets from center of image
# Dictionary of planet info
if sptype is None:
d = {'name': name, 'model': model, 'atmo': atmo, 'mass': mass, 'age': age,
'entropy':entropy, 'Av':Av, 'renorm_args':renorm_args,
'accr':accr, 'mmdot':mmdot, 'mdot':mdot, 'accr_rin':accr_rin,
'truncated':truncated, 'xyoff_asec':(xoff_asec, yoff_asec)}
else:
d = {'name': name, 'model':model, 'sptype':sptype, 'Av':Av,
'renorm_args':renorm_args, 'xyoff_asec':(xoff_asec, yoff_asec)}
self._planets.append(d)
[docs]
def gen_planets_image(self, PA_offset=0, xyoff_asec=(0,0), use_cmask=True,
sp=None, wfe_drift=None, use_coeff=True, return_oversample=False,
shift_method=None, interp=None, quiet=True, use_ap_info=None, **kwargs):
"""Create image of just planets.
Use info stored in self.planets to create a noiseless slope image
of just the exoplanets (no star).
Coordinate convention is for +N up and +E to left.
Parameters
----------
PA_offset : float
Rotate entire scene by some position angle.
Positive values are counter-clockwise from +Y direction.
This should be -1 times telescope V3 PA.
xyoff_asec : tuple
Offsets (dx,dy) specified in arcsec. These are meant to be
for minor shifts, such as SGD. Bar offsets are accounted
for automatically.
use_cmask : bool
Use the coronagraphic mask image to determine if any planet is
getting obscurred by a corongraphic mask feature. Default: True.
sp : :class:`webbpsf_ext.synphot_ext.Spectrum`
Specify the spectrum of the planet to be used. If None, then
uses info stored in self.planets.
wfe_drift : float
WFE drift value (in nm RMS). Not usually a concern for companion
PSFs, so default is 0.
use_coeff : bool
If True, uses `calc_psf_from_coeff`, otherwise use STPSF's
built-in `calc_psf` for stellar sources.
return_oversample : bool
Return either the detector pixel-sampled or oversampled image.
shift_method : str
Function to use for shifting image. Options are:
- 'fourier' : Shift in Fourier space
- 'fshift' : Shift using interpolation
- 'opencv' : Shift using OpenCV warpAffine
interp : str
Interpolation method to use for shifting using 'fshift' or 'opencv.
Default is 'cubic'.
For 'opencv', valid options are 'linear', 'cubic', and 'lanczos'.
for 'fshift', valid options are 'linear', 'cubic', and 'quintic'.
Keyword Args
------------
coron_rescale : bool
Rescale total flux of off-axis coronagraphic PSF to better match
analytic prediction when source overlaps coronagraphic occulting
mask. Primarily used for planetary companion and disk PSFs.
Default: True.
diffusion_sigma : float
Apply pixel diffusion to PSFs. This is the standard deviation of
the Gaussian kernel used for diffusion in terms of detector pixels.
Default: None.
psf_corr_over : ndarray
Oversampled PSF correction factor. This is used to better match
STPSF PSFs to observed PSFs. Default: None.
"""
if len(self.planets)==0:
_log.info("No planet info at self.planets")
return 0.0
PA_offset=0 if PA_offset is None else PA_offset
# Additional field offsets
offx_asec, offy_asec = xyoff_asec
# Size of final image
ypix, xpix = (self.det_info['ypix'], self.det_info['xpix'])
xpix_over = xpix * self.oversample
ypix_over = ypix * self.oversample
# Oversampled pixel scale
pixscale_over = self.pixelscale / self.oversample
image_over = np.zeros([ypix_over, xpix_over])
bar_offset = self.bar_offset
bar_offpix = bar_offset / self.pixelscale
itervals = self.planets if quiet else tqdm(self.planets, desc='Companions', leave=False)
for pl in itervals:
##################################
# Generate Image
# Create slope image (postage stamp) of planet
source = kwargs.get('source', None)
# Skip sp creation if source is provided because it gets passed
# directly to gen_offset_psf using kwargs
sp2 = self.planet_spec(**pl) if (sp is None) and (source is None) else sp
# Location relative to star
plx_asec, ply_asec = pl['xyoff_asec']
# Add in PA offset to get planet position in observed frame
if PA_offset!=0:
plx_asec, ply_asec = xy_rot(plx_asec, ply_asec, PA_offset)
# print(f'Planet offset ({plx_asec:.4f}, {ply_asec:.4f}) asec')
# bar offsets are added inside calc_psf_from_coeff
xoff_idl, yoff_idl = (plx_asec + offx_asec, ply_asec + offy_asec)
r, th = xy_to_rtheta(xoff_idl, yoff_idl)
psf_planet = self.gen_offset_psf(r, th, sp=sp2, return_oversample=True,
use_coeff=use_coeff, wfe_drift=wfe_drift,
use_cmask=use_cmask, **kwargs)
##################################
# Shift image
# Determine final shift amounts to mask location
# Shift to position relative to center of image
use_ap_info = self._use_ap_info if use_ap_info is None else use_ap_info
if (('FULL' in self.det_info['wind_mode']) and (self.image_mask is not None)) or use_ap_info:
xcen, ycen = (self.siaf_ap.XSciRef - 1, self.siaf_ap.YSciRef - 1)
delx_pix = (xcen - (xpix/2. - 0.5)) # 'sci' pixel shifts
dely_pix = (ycen - (ypix/2. - 0.5)) # 'sci' pixel shifts
delx_asec, dely_asec = np.array([delx_pix, dely_pix]) * self.pixelscale
else:
# Otherwise assumed mask is already in center of subarray
delx_pix, dely_pix = (bar_offpix, 0)
delx_asec, dely_asec = np.array([delx_pix, dely_pix]) * self.pixelscale
# Add dither offsets
delx_asec += offx_asec
dely_asec += offy_asec
# PSF shifting
delx_over, dely_over = np.array([delx_asec, dely_asec]) / pixscale_over
delx_det, dely_det = np.array([delx_asec, dely_asec]) / self.pixelscale
# print(f'delx, dely = ({delx_asec:.4f}, {dely_asec:.4f}) asec')
# print(f'delx, dely = ({delx_det:.2f}, {dely_det:.2f}) det pixels')
# Determine planet PSF shift in pixels compared to center of mask
# Convert delx and dely from 'idl' to 'sci' coords
xsci_ref, ysci_ref = (self.siaf_ap.XSciRef, self.siaf_ap.YSciRef)
xsci_pl, ysci_pl = self.siaf_ap.idl_to_sci(plx_asec, ply_asec)
delx_sci, dely_sci = (xsci_pl - xsci_ref, ysci_pl - ysci_ref)
delx_over += delx_sci * self.oversample
dely_over += dely_sci * self.oversample
# # Shift and crop maintaining original image
# if interp is None:
# interp = 'linear' if ('FULL' in self.det_info['wind_mode']) else 'cubic'
# psf_planet = pad_or_cut_to_size(psf_planet, (ypix_over, xpix_over),
# offset_vals=(dely_over, delx_over),
# shift_func=shift_func, interp=interp)
# Shift image and crop to original size
if shift_method is None:
shift_method = 'fshift' if ('FULL' in self.det_info['wind_mode']) else 'fourier'
if interp is None:
interp = 'linear' if ('FULL' in self.det_info['wind_mode']) else 'cubic'
try:
# Sometimes fourier shift fails if the source is too close to the edge
psf_planet = image_shift_with_nans(psf_planet, xshift=delx_over, yshift=dely_over,
shift_method=shift_method, interp=interp,
pad=True, return_padded=True, cval=0, **kwargs)
# psf_planet = fractional_image_shift(psf_planet, delx_over, dely_over,
# method=shift_method, interp=interp, pad=True, **kwargs)
except ValueError:
psf_planet = image_shift_with_nans(psf_planet, xshift=delx_over, yshift=dely_over,
shift_method='fshift', interp='linear',
pad=True, return_padded=True, cval=0, **kwargs)
# psf_planet = fractional_image_shift(psf_planet, delx_over, dely_over,
# method='fshift', interp='linear', pad=True, **kwargs)
psf_planet = crop_image(psf_planet, (ypix_over, xpix_over))
# Remove any NaNs or negative values
psf_planet[np.isnan(psf_planet)] = 0
psf_planet[psf_planet<0] = 0
# Add to image
image_over += psf_planet
# Remove any negative numbers
image_over[image_over<0] = 0
if return_oversample:
return image_over
else:
return frebin(image_over, scale=1/self.oversample)
[docs]
def gen_disk_image(self, PA_offset=0, xyoff_asec=(0,0), use_cmask=True,
return_oversample=False, diffusion_sigma=None, psf_corr_over=None,
apply_distortions=None, xypix=None, shift_method=None, interp=None,
use_ap_info=None, **kwargs):
"""Create image of just disk.
Generate a (noiseless) convolved image of the disk
at some PA offset. The PA offset value will rotate
the image CCW. Image units of e-/sec.
Coordinate convention is for N up and E to left.
For now, we don't perform any changes to the WFE.
Parameters
----------
PA_offset : float
Rotate entire scene by some position angle.
Positive values are counter-clockwise from +Y direction.
This should be -1 times telescope V3 PA.
xyoff_asec : tuple
Offsets (dx,dy) specified in arcsec. These are meant to be
for minor shifts, use as SGD. Bar offsets are accounted
for automatically.
use_cmask : bool
Use the coronagraphic mask image to attenuate disk regions
getting obscurred by a corongraphic mask feature
return_oversample : bool
Return either the detector pixel-sampled or oversampled image.
diffusion_sigma : float
Apply pixel diffusion to the PSF convolved with disk image.
Value is in units of detector pixels.
psf_corr_over : ndarray
Oversampled PSF correction factor to apply to disk PSF to better
match empirical data.
"""
if self.disk_hdulist is None:
return 0.0
# Final image shape
det = self.Detector
if xypix is None:
ypix, xpix = (det.ypix, det.xpix) #(self.det_info['ypix'], self.det_info['xpix'])
else:
xpix = ypix = xypix
oversample = self.oversample
pixscale_over = self.pixelscale / oversample
if apply_distortions is None:
apply_distortions = self.include_distortions
# Bar offset in arcsec
bar_offset = self.bar_offset
# In detector pixels
bar_offpix = bar_offset / self.pixelscale
# Determine final shift amounts to location along bar
# Shift to position relative to center of image
xcen_det, ycen_det = get_im_cen(np.zeros([ypix,xpix]))
use_ap_info = self._use_ap_info if use_ap_info is None else use_ap_info
if (('FULL' in self.det_info['wind_mode']) and (self.image_mask is not None)) or use_ap_info:
xref, yref = (self.siaf_ap.XSciRef - 1, self.siaf_ap.YSciRef - 1)
# Offset relative to center of image
delx_pix = xref - xcen_det # 'sci' pixel shifts
dely_pix = yref - ycen_det # 'sci' pixel shifts
# Convert to 'idl' offsets
# delx_asec, dely_asec = self.siaf_ap.convert(xcen+delx_pix, ycen+dely_pix, 'sci', 'idl')
delx_asec, dely_asec = np.array([delx_pix, dely_pix]) * self.pixelscale
else:
# Otherwise assumed mask is in center of subarray for simplicity
# For odd dimensions, this is in a pixel center.
# For even dimensions, this is at the pixel boundary.
xref, yref = xcen_det, ycen_det
# Add bar offset
xref += bar_offpix # Add bar offset
delx_pix, dely_pix = (bar_offpix, 0)
delx_asec = delx_pix * self.pixelscale
dely_asec = dely_pix * self.pixelscale
# Add dither offsets
offx_asec, offy_asec = xyoff_asec # 'idl' dither offsets
delx_asec += offx_asec
dely_asec += offy_asec
# Expand to oversampled array
hdul_disk = deepcopy(self.disk_hdulist)
# Final size of subarray
out_shape = np.array([ypix*oversample, xpix*oversample], dtype='int')
# Extend to accommodate rotations and shifts
extend = int(np.sqrt(2)*np.max(self.disk_hdulist[0].data.shape))
# Ensure even number of oversampled pixels
extend += oversample - np.mod(extend,oversample)
oversized_shape = out_shape + extend
# Don't make smaller than current size
orig_shape = hdul_disk[0].data.shape
new_shape = np.array([oversized_shape,orig_shape]).max(axis=0)
hdul_disk[0].data = crop_image(hdul_disk[0].data, new_shape)
##################################
# Shift/Rotate image
##################################
# Rotate and shift oversampled disk image
# Positive PA_offset will rotate image clockwise (PA_offset = -1*PA_V3)
# Shift image and crop to original size
if shift_method is None:
shift_method = 'fshift' if ('FULL' in self.det_info['wind_mode']) else 'fourier'
if interp is None:
interp = 'linear' if ('FULL' in self.det_info['wind_mode']) else 'cubic'
order_def = 1 if ('FULL' in self.det_info['wind_mode']) else 3
order = kwargs.get('order', order_def)
shift_func = {'fourier':fourier_imshift, 'fshift':fshift, 'opencv': cv_shift}[shift_method]
hdul_rot_shift = rotate_shift_image(hdul_disk, angle=PA_offset, order=order,
delx_asec=delx_asec, dely_asec=dely_asec,
shift_func=shift_func, interp=interp)
hdul_rot_shift[0].data = crop_image(hdul_rot_shift[0].data, out_shape)
hdul_rot_shift[0].header['XIND_REF'] = (xref*oversample, "x index of aperture reference")
hdul_rot_shift[0].header['YIND_REF'] = (yref*oversample, "y index of aperture reference")
hdul_rot_shift[0].header['CFRAME'] = 'sci'
hdul_disk.close()
del hdul_disk
##################################
# Image distortion
##################################
# Crop to reasonably small size (get rid of 0s)
if apply_distortions:
res = crop_zero_rows_cols(hdul_rot_shift[0].data, symmetric=True, return_indices=True)
im_crop, ixy_vals = res
ix1, ix2, iy1, iy2 = ixy_vals
# Get 'sci' pixel values that correspond to center of input image
xarr_sci = (np.arange(hdul_rot_shift[0].data.shape[1]) + 1) / oversample
yarr_sci = (np.arange(hdul_rot_shift[0].data.shape[0]) + 1) / oversample
cen_sci = (np.mean(xarr_sci[ix1:ix2]), np.mean(yarr_sci[iy1:iy2]))
im_orig = hdul_rot_shift[0].data.copy()
# Distory cropped image (excludes columns/rows of 0s)
hdul_rot_shift[0].data = im_crop
im_distort = distort_image(hdul_rot_shift, aper=self.siaf_ap, sci_cen=cen_sci)
im_orig[iy1:iy2, ix1:ix2] = im_distort
hdul_rot_shift[0].data = im_orig
##################################
# Mask attenuation
##################################
# Multiply raw disk data by coronagraphic mask.
# Exclude region already affected by observed mask.
# Mostly ND Squares and opaque COM holder.
# If ND_acq, then disk already multipled by ND throughput in bandpass.
cmask = self.mask_images['OVERSAMP']
if use_cmask and (cmask is not None) and (not self.ND_acq):
hdul_rot_shift[0].data = self.attenuate_with_mask(hdul_rot_shift[0].data, cmask=cmask)
##################################
# Image convolution
##################################
hdul_psfs = self.psf_list
if not self.is_coron: # Single PSF
image_conv = convolve_image(hdul_rot_shift, hdul_psfs)
else:
# For coronagraphy, assume position-dependent PSFs are a function
# of coronagraphic mask transmission, off-axis, and on-axis PSFs.
# PSF(x,y) = trans(x,y)*psf_off + (1-trans(x,y))*psf_on
trans = self.mask_images['OVERMASK']
# Off-axis component
hdul_rot_shift_off = deepcopy(hdul_rot_shift)
hdul_rot_shift_off[0].data = hdul_rot_shift_off[0].data * trans
if kwargs.get('use_coeff',True):
hdul_psf_off = self.calc_psf_from_coeff(return_oversample=True, return_hdul=True,
coord_vals=(5,5), coord_frame='idl')
else:
hdul_temp = self.calc_psf(return_hdul=True, coord_vals=(5,5), coord_frame='idl')
key = 'OVERDIST' if self.include_distortions else 'OVERSAMP'
hdul_psf_off = fits.HDUList(hdul_temp[key])
# Recenter off-axis PSF if on-axis PSFs are recentered
if hdul_psfs[0].header['RECENTER']:
do_recenter = True
hdul_psf_off[0].data = self.recenter_psf(hdul_psf_off[0].data, sampling=self.oversample)
else:
do_recenter = False
# Apply diffusion to off-axis PSF
if (diffusion_sigma is not None) and (diffusion_sigma>0):
hdul_psf_off[0].data = apply_pixel_diffusion(hdul_psf_off[0].data, diffusion_sigma*self.oversample)
# Apply PSF correction factor to off-axis PSF
if psf_corr_over is not None:
# Shift to align with PSF if not recentered
if not do_recenter:
psf_corr_over = psf_corr_over[::-1,::-1]
psf_corr_over = self.recenter_psf(psf_corr_over, sampling=self.oversample, pad=True, cval=1)
psf_corr_over = psf_corr_over[::-1,::-1]
# Downsample to detector pixels?
psf_corr = psf_corr_over if return_oversample else frebin(psf_corr_over, 1/self.oversample, total=False)
hdul_psf_off[0].data *= crop_image(psf_corr, hdul_psf_off[0].data.shape, fill_val=1)
# Perform off-axis convolution
image_conv_off = convolve_image(hdul_rot_shift_off, hdul_psf_off)
# On-axis component (closest PSF convolution)
hdul_rot_shift_on = deepcopy(hdul_rot_shift)
hdul_rot_shift_on[0].data = hdul_rot_shift_on[0].data * (1 - trans)
image_conv_on = convolve_image(hdul_rot_shift_on, hdul_psfs)
image_conv = image_conv_on + image_conv_off
if return_oversample:
return image_conv
else:
return frebin(image_conv, scale=1/oversample)
[docs]
def gen_slope_image(self, PA=0, xyoff_asec=(0,0), return_oversample=False,
exclude_disk=False, exclude_planets=False, exclude_noise=False,
zfact=None, do_ref=False, do_roll2=False, im_star=None, sat_val=0.9,
wfe_drift0=0, wfe_ref_drift=None, wfe_roll_drift=None,
shift_method=None, interp=None, apply_distortions=None,
diffusion_sigma=None, kipc=None, kppc=None, psf_corr_over=None,
use_ap_info=None, **kwargs):
"""Create slope image of observation
Beware that stellar position (centered on a pixel) will likely not
fall in the exact center of the slope image (between pixel borders)
because images are generally even while psf_fovs may be odd.
Parameters
----------
PA : float
Position angle of roll position (counter-clockwise, from West to East).
Scene will rotate in opposite direction. This should be similar to V3 PA.
xyoff_asec : None or tuple
Positional offset of scene from reference location in 'idl' coords.
Shift occurs after PA rotation. Default is (0,0), but set to None to
get the default dither position from self.pointing_info.
do_ref : bool
Slope image for reference star observation using `self.wfe_ref_drift`.
do_roll2 : bool
Slope image for observation during Roll2 using `self.wfe_roll_drift`.
Will compound with `do_ref` if both are set.
exclude_disk : bool
Do not include disk in final image (for radial contrast),
but still add Poisson noise from disk.
exclude_planets : bool
Do not include planets in final image (for radial contrast),
but still add Poisson noise from disk.
exclude_noise : bool
Don't add random Gaussian noise (detector+photon)
zfact : float
Zodiacal background factor (default=2.5)
im_star : ndarray or None
Pass a precomputed slope image of the stellar source already
positioned at it's correct location.
sat_val : float
Fraction of full well to consider as saturated. Used to determine
number of unsaturated groups for linear fit, which feeds into
estimated effective noise equation. For pixels that saturatea in a
single group, the noise is assumed to be that for a single frame.
**Final image does not flag saturated pixels!**
wfe_drift0 : float
Initial RMS WFE drift value of First PSF. Roll2 and Ref observations
will be incremented from this. Default is 0.
wfe_ref_drift : float
WFE drift between sci and ref observations (nm RMS).
wfe_roll_drift : float
WFE drift between Roll1 and Roll2 observations (nm RMS).
return_oversample : bool
Return either the detector pixel-sampled or oversampled image.
shift_method : str
Function to use for shifting image. Options are:
- 'fourier' : Shift in Fourier space
- 'fshift' : Shift using interpolation
- 'opencv' : Shift using OpenCV warpAffine
interp : str
Interpolation method to use for shifting using 'fshift' or 'opencv.
Default is 'cubic'.
For 'opencv', valid options are 'linear', 'cubic', and 'lanczos'.
for 'fshift', valid options are 'linear', 'cubic', and 'quintic'.
Keyword Args
------------
use_cmask : bool
Use the coronagraphic mask image to attenuate planet or disk that
is obscurred by a corongraphic mask feature. (Default=True)
ra : float
Right ascension in decimal degrees
dec : float
Declination in decimal degrees
thisday : int
Calendar day to use for background calculation. If not given, will use the
average of visible calendar days.
ideal_Poisson : bool
If set to True, use total signal for noise estimate,
otherwise MULTIACCUM equation is used.
use_coeff : bool
If True, uses `calc_psf_from_coeff`, other STPSF's built-in `calc_psf`
for stellar sources.
"""
# Initial WFE drift offset value
wfe_drift = wfe_drift0
# Option to override wfe_ref_drift and wfe_roll_drift
wfe_ref_drift = self.wfe_ref_drift if wfe_ref_drift is None else wfe_ref_drift
wfe_roll_drift = self.wfe_roll_drift if wfe_roll_drift is None else wfe_roll_drift
if do_ref:
wfe_drift = wfe_drift + wfe_ref_drift
det = self.Detector_ref
sp = self.sp_ref
else:
det = self.Detector
sp = self.sp_sci
# Special case of sp not specified, then output should be 0
if sp is None:
im_star = 0
# Add additional WFE drift for 2nd roll position
if do_roll2:
wfe_drift = wfe_drift + wfe_roll_drift
# oversample = 1 if not return_oversample else self.oversample
oversample = self.oversample
pixscale_over = self.pixelscale / oversample
# Final detector image shape
ypix, xpix = (det.ypix, det.xpix)
# Oversampled size
xpix_over = xpix * oversample
ypix_over = ypix * oversample
# Bar offset in arcsec
bar_offset = self.bar_offset
# In detector pixels
bar_offpix = bar_offset / self.pixelscale
# Additional field offsets
if xyoff_asec is None:
if do_ref:
xyoff_asec = self.pointing_info['ref']
# If multiple SGD points for ref, then select nominal position
if len(xyoff_asec)>1:
xyoff_asec = xyoff_asec[0]
elif do_roll2:
xyoff_asec = self.pointing_info['roll2']
else:
xyoff_asec = self.pointing_info['roll1']
offx_asec, offy_asec = xyoff_asec # 'idl' dither offsets
##################################
# Generate Image
##################################
# Default to use coronagraph mask ND square and holder attenuation
use_cmask = kwargs.get('use_cmask')
if use_cmask is None:
kwargs['use_cmask'] = True
# Get (r,th) in idl coordinates relative to mask center (arcsec) for PSF creation
# bar offsets are added inside gen_offset_psf during calc_psf_from_coeff
xoff_idl, yoff_idl = (offx_asec, offy_asec)
r, th = xy_to_rtheta(xoff_idl, yoff_idl)
# print((r, th), (xoff_idl, yoff_idl))
# Stellar PSF doesn't rotate
# Generates an oversampled stellar PSF that is centered in the detector image
if im_star is None:
_log.info(' gen_slope_image: Creating stellar PSF...')
im_star = self.gen_offset_psf(r, th, sp=sp, wfe_drift=wfe_drift, return_oversample=True,
diffusion_sigma=diffusion_sigma, psf_corr_over=psf_corr_over, **kwargs)
elif isinstance(im_star, (int,float)) and (im_star==0):
im_star = np.zeros([ypix_over, xpix_over])
# Always crop to desired size in case passed image is larger
im_star = crop_image(im_star, (ypix_over, xpix_over))
##################################
# Shift image
##################################
# Determine final shift amounts to mask location
# Shift to position relative to center of image
# if (('FULL' in self.det_info['wind_mode']) and (self.image_mask is not None)) or self._use_ap_info:
# xcen, ycen = (self.siaf_ap.XSciRef - 1, self.siaf_ap.YSciRef - 1)
# delx_pix = (xcen - (xpix/2. - 0.5)) # 'sci' pixel shifts
# dely_pix = (ycen - (ypix/2. - 0.5)) # 'sci' pixel shifts
# delx_asec, dely_asec = np.array([delx_pix, dely_pix]) * self.pixelscale
# else:
# # Otherwise assumed mask is already in center of subarray
# delx_pix, dely_pix = (bar_offpix, 0)
# delx_asec, dely_asec = np.array([delx_pix, dely_pix]) * self.pixelscale
# Get center positions of detector image and equivalent position of star position
xcen_det, ycen_det = get_im_cen(np.zeros([ypix,xpix]))
use_ap_info = self._use_ap_info if use_ap_info is None else use_ap_info
if (('FULL' in self.det_info['wind_mode']) and (self.image_mask is not None)) or use_ap_info:
xref, yref = (self.siaf_ap.XSciRef - 1, self.siaf_ap.YSciRef - 1)
# SIAF Offset relative to center of image
delx_pix = xref - xcen_det # 'sci' orientation shifts
dely_pix = yref - ycen_det # 'sci' orientation shifts
else:
# Otherwise assumed mask is in center of subarray for simplicity
delx_pix, dely_pix = (bar_offpix, 0)
# Convert to 'idl' offsets in arcsec
# delx_asec, dely_asec = self.siaf_ap.convert(xref+delx_pix, yref+dely_pix, 'sci', 'idl')
delx_asec, dely_asec = np.array([delx_pix, dely_pix]) * self.pixelscale
# Add dither offsets
delx_asec += offx_asec
dely_asec += offy_asec
# PSF shifting
delx_over, dely_over = np.array([delx_asec, dely_asec]) / pixscale_over
delx_det, dely_det = np.array([delx_asec, dely_asec]) / self.pixelscale
_log.debug(f'r, th = ({r:.4f} asec, {th:.1f} deg)')
_log.debug(f'delx, dely = ({delx_asec:.4f}, {dely_asec:.4f}) asec')
_log.debug(f'delx, dely = ({delx_det:.3f}, {dely_det:.3f}) det pixels')
# print(f'r, th = ({r:.4f} asec, {th:.1f} deg)')
# print(f'delx, dely = ({delx_asec:.4f}, {dely_asec:.4f}) asec')
# print(f'delx, dely = ({delx_det:.3f}, {dely_det:.3f}) det pixels')
# Stellar PSF doesn't rotate
# Shift and crop maintaining original image
# if interp is None:
# interp = 'linear' if ('FULL' in self.det_info['wind_mode']) else 'cubic'
# im_star = pad_or_cut_to_size(im_star, (ypix_over, xpix_over),
# offset_vals=(dely_over, delx_over),
# shift_func=shift_func, interp=interp)
if shift_method is None:
shift_method = 'fshift' if ('FULL' in self.det_info['wind_mode']) else 'fourier'
if interp is None:
interp = 'linear' if ('FULL' in self.det_info['wind_mode']) else 'cubic'
# print(delx_over, dely_over, shift_method, interp)
im_star = fractional_image_shift(im_star, delx_over, dely_over, pad=True,
method=shift_method, interp=interp, **kwargs)
im_star[im_star<0] = 0
##################################
# Disk and Planet images
##################################
if do_ref:
no_disk = True
no_planets = True
else:
no_disk = exclude_disk and exclude_noise
no_planets = exclude_planets and exclude_noise
if self.disk_hdulist is None:
no_disk = True
if len(self.planets)==0:
no_planets = True
# Make sure to include planets and disks for Poisson noise calculations
# Telescope PA is counter-clockwise, therefore image rotation is opposite direction
kwargs2 = kwargs.copy()
kwargs2['PA_offset'] = -1*PA
kwargs2['xyoff_asec'] = xyoff_asec
kwargs2['return_oversample'] = True # return_oversample
kwargs2['use_coeff'] = kwargs.get('use_coeff', True)
kwargs2['diffusion_sigma'] = diffusion_sigma
kwargs2['psf_corr_over'] = psf_corr_over
kwargs2['shift_method'] = shift_method
kwargs2['interp'] = interp
kwargs2['use_ap_info'] = use_ap_info
# Companions
if no_planets:
im_pl = 0
else:
_log.info(' gen_slope_image: Creating companion image...')
im_pl = self.gen_planets_image(wfe_drift=wfe_drift0, **kwargs2)
# Extended disk structures
if no_disk:
im_disk = 0
else:
_log.info(' gen_slope_image: Creating disk image...')
im_disk = self.gen_disk_image(apply_distortions=apply_distortions, **kwargs2)
# Zodiacal bg levels
_log.info(' gen_slope_image: Creating zodiacal background image...')
if kwargs['use_cmask']:
fzodi = self.bg_zodi_image(zfact=zfact, **kwargs)
else:
fzodi = np.ones([ypix,xpix]) * self.bg_zodi(zfact=zfact, **kwargs)
if oversample!=1:
fzodi = frebin(fzodi, scale=oversample)
# Combine components
im_final_over = im_star + im_disk + im_pl + fzodi
# Noise per detector pixel
if not exclude_noise:
_log.info(' gen_slope_image: Adding noise...')
# Get to detector sampled pixels
im_final = frebin(im_final_over, scale=1/oversample)
# For each pixel, how many groups until saturation?
ng_sat = sat_val * self.well_level / (im_final * det.time_group)
# Cap ng_sat to ngroup
ngroup = det.multiaccum.ngroup
ng_sat[ng_sat > ngroup] = ngroup
ng_sat = ng_sat.astype('int')
im_noise = det.pixel_noise(fsrc=im_final, ng=ng_sat, **kwargs)
# Fix any values due to ng<1
ind_fix = (np.isnan(im_noise)) | (ng_sat < 1)
if np.sum(ind_fix)>0:
im_noise[ind_fix] = det.pixel_noise(fsrc=im_final[ind_fix], ng=1, nf=1, **kwargs)
im_noise_over = np.sqrt(frebin(im_noise**2, scale=oversample))
# Add random Gaussian noise
im_final_over += np.random.normal(scale=im_noise_over)
# Get rid of disk and planet emission
# while keeping their noise contributions
if exclude_disk: im_final_over -= im_disk
if exclude_planets: im_final_over -= im_pl
if return_oversample and oversample>1:
if kipc is not None:
_log.warning(' gen_slope_image: Cannot apply IPC to image with oversample>1.')
if kppc is not None:
_log.warning(' gen_slope_image: Cannot apply PPC to image with oversample>1.')
return im_final_over
else:
im_final = frebin(im_final_over, scale=1/oversample)
if kipc is not None:
im_final = add_ipc(im_final, kernel=kipc)
if kppc is not None:
nchans = 4 if 'FULL' in self.det_info['wind_mode'] else 1
im_final = add_ppc(im_final, kernel=kppc, nchans=nchans)
return im_final
[docs]
def star_flux(self, fluxunit='counts', do_ref=False, sp=None):
""" Stellar flux
Return the stellar flux in synphot-supported units, such as
vegamag, counts, or Jy.
Parameters
----------
fluxunits : str
Desired output units, such as counts, vegamag, Jy, etc.
Must be a synphot supported unit string.
sp : :class:`webbpsf_ext.synphot_ext.Spectrum`
Normalized synphot spectrum.
"""
from webbpsf_ext.synphot_ext import Observation
if sp is None:
sp = self.sp_ref if do_ref else self.sp_sci
if sp is None:
raise ValueError('Stellar spectrum is undefined.')
# Create synphot observation
bp = self.bandpass
obs = Observation(sp, bp, binset=bp.wave)
return obs.effstim(fluxunit)
[docs]
def planet_flux(self, fluxunit='counts'):
""" Return planet fluxes """
if len(self.planets)==0:
_log.info("No planet info at self.planets")
return 0.0
flux_vals = []
for pl in self.planets :
# Create spectrum
sp = self.planet_spec(**pl)
flux = self.star_flux(fluxunit=fluxunit, sp=sp)
flux_vals.append(flux)
return flux_vals
def _fix_sat_im(self, image, sat_val=0.9, oversample=1, **kwargs):
"""Fix saturated region of an image
Parameters
----------
image : ndarray
Image to clean.
sav_val : float
Well level fraction to considered saturated.
Keyword Args
------------
full_size : bool
Expand (or contract) to size of detector array?
If False, returned image is fov_pix size.
ngroup : int
How many group times to determine saturation level?
If this number is higher than the total groups in ramp,
then a warning is produced. The default is ngroup=2,
A value of 0 corresponds to the so-called "zero-frame,"
which is the very first frame that is read-out and saved
separately. This is the equivalent to ngroup=1 for RAPID
and BRIGHT1 observations.
do_ref : bool
Get saturation levels assuming reference observation.
niter_max : int
Number of iterations for fixing NaNs. Default=5.
"""
# Account for possible oversampling
if oversample>1:
image_det = frebin(image, scale=1/oversample)
sat_level = self.saturation_levels(image=image_det, **kwargs)
sat_level = frebin(sat_level, scale=oversample, total=False)
else:
sat_level = self.saturation_levels(image=image, **kwargs)
sat_mask = sat_level > sat_val
image[sat_mask] = np.nan
image = fix_nans_with_med(image, **kwargs)
# If there are any leftover NaNs, make them 0.
nan_mask = np.isnan(image)
image[nan_mask] = 0
return image
[docs]
def gen_roll_image(self, PA1=-5, PA2=+5, return_oversample=False,
no_ref=False, opt_diff=False, ref_scale_all=False,
wfe_drift0=0, wfe_ref_drift=None, wfe_roll_drift=None,
xyoff_roll1=None, xyoff_roll2=None, xyoff_ref=None,
interp=None, do_sat=False, sat_val=0.95, use_ap_info=None, **kwargs):
"""Make roll-subtracted image.
Create a final roll-subtracted slope image based on current observation
settings. Coordinate convention is for N up and E to left.
Procedure:
- Create Roll 1 and Roll 2 slope images (star+exoplanets+disk)
- Create Reference Star slope image
- Add noise to all images
- Scale ref image
- Subtract ref image from both rolls
- De-rotate Roll 1 and Roll 2 to common coordinates
- Average Roll 1 and Roll 2
Returns an HDUList of final image (N rotated upwards).
Parameters
----------
PA1 : float
Position angle of first telescope roll position.
This is counter-clockwise (West to East).
PA2 : float, None
Position angle of second roll position. If set equal to PA1
(or to None), then only one roll will be performed.
Otherwise, two rolls are performed, each using the specified
MULTIACCUM settings (doubling the effective exposure time).
no_ref : bool
Exclude reference observation. Subtraction is then Roll1-Roll2.
opt_diff : bool
Optimal reference differencing (scaling only on the inner regions)
ref_scale_all : bool
Normally we just use the science and reference PSFs to calculate
scaling. However, if there is an unresolved companion or disk
emission close to the star, then we won't get the correct scale
factor for optimal reference subtraction. Instead, this option
inludes disk and companions for calculating the reference scale
factor.
wfe_drift0 : float
Initial RMS WFE drift value of First PSF. Roll2 and Ref observations
will be incremented from this. Default is 0.
wfe_ref_drift : float
WFE drift between sci and ref observations (nm RMS).
wfe_roll_drift : float
WFE drift between Roll1 and Roll2 observations (nm RMS).
return_oversample : bool
Return either the detector pixel-sampled or oversampled image.
xyoff_roll1 : tuple or None
Explicitly set pointing offset for Roll 1 (arcsec)
xyoff_roll2 : tuple or None
Explicitly set pointing offset for Roll 2 (arcsec)
xyoff_ref : tuple or None
Explicitly set pointing offset for Reference (arcsec)
do_sat : bool
Determine saturated regions and NaN them.
sat_val : float
Fraction of full well to consider as saturated within two groups.
Keyword Args
------------
exclude_disk : bool
Do not include disk in final image (for radial contrast),
but still add Poisson noise from disk.
exclude_planets : bool
Do not include planets in final image (for radial contrast),
but still add Poisson noise from disk.
exclude_noise : bool
Don't add random Gaussian noise (detector+photon)
use_coeff : bool
If True, uses `calc_psf_from_coeff`, other STPSF's built-in `calc_psf`
for stellar sources.
psf_corr_over : ndarray
PSF correction factor for oversampled PSF. Used to better match simulated
PSFs to observed PSFs.
diffusion_sigma : float
Apply pixel diffusion to PSF. Value is in units of detector pixels.
kipc : ndarray
Kernel for IPC. If not provided, then IPC is not applied.
kppc : ndarray
Kernel for PPC. If not provided, then PPC is not applied.
ideal_Poisson : bool
If set to True, use total signal for noise estimate,
otherwise MULTIACCUM equation is used.
use_cmask : bool
Use the coronagraphic mask image to attenuate planets or disk
obscurred by a corongraphic mask feature. Default is True.
zfact : float
Zodiacal background factor (default=2.5)
ra : float
Right ascension in decimal degrees
dec : float
Declination in decimal degrees
thisday : int
Calendar day to use for background calculation. If not given, will use the
average of visible calendar days.
shift_func : func
Function to use for shifting image. Typical options are `fshift` and
`fourier_imshift`.
interp : str
Interpolation used for `fshift`, either 'linear' or 'cubic'.
"""
# Final image shape
xpix, ypix = (self.det_info['xpix'], self.det_info['ypix'])
oversample = self.oversample
pixscale_over = self.pixelscale / oversample
if return_oversample:
pixscale_out = pixscale_over
osamp_out = oversample
else:
pixscale_out = self.pixelscale
osamp_out = 1
# Default to use coronagraph mask ND square and holder attenuation
use_cmask = kwargs.get('use_cmask')
if use_cmask is None:
kwargs['use_cmask'] = True
# Sub-image for determining ref star scale factor
subsize = 50 * oversample
xpix_over = xpix * oversample
ypix_over = ypix * oversample
xsub = np.min([subsize, xpix_over])
ysub = np.min([subsize, ypix_over])
sub_shape = (ysub, xsub)
# Option to override wfe_ref_drift and wfe_roll_drift
wfe_ref_drift = self.wfe_ref_drift if wfe_ref_drift is None else wfe_ref_drift
wfe_roll_drift = self.wfe_roll_drift if wfe_roll_drift is None else wfe_roll_drift
# Position angle decisions
if PA2 is None:
roll_angle = 0
else:
roll_angle = PA2 - PA1
# Bar offset in arcsec
bar_offset = self.bar_offset
bar_offpix = bar_offset / self.pixelscale
# Additional field offsets
xyoff_asec1 = self.pointing_info.get('roll1', (0,0)) if xyoff_roll1 is None else xyoff_roll1
xyoff_asec2 = self.pointing_info.get('roll2', (0,0)) if xyoff_roll2 is None else xyoff_roll2
xyoff_asec_ref = self.pointing_info.get('ref', (0,0)) if xyoff_ref is None else xyoff_ref
xyoff_asec1 = np.asarray(xyoff_asec1)
xyoff_asec2 = np.asarray(xyoff_asec2)
xyoff_asec_ref = np.asarray(xyoff_asec_ref)
if len(xyoff_asec_ref.shape)>1:
xyoff_asec_ref = xyoff_asec_ref[0]
##################################
# Generate Roll1 Image
##################################
# Add in offsets for PSF generation
# bar offsets are added inside gen_offset_psf during calc_psf_from_coeff
offx_asec, offy_asec = xyoff_asec1
r, th = xy_to_rtheta(offx_asec, offy_asec)
# Create stellar PSF centered in image
im_star = self.gen_offset_psf(r, th, sp=self.sp_sci, return_oversample=True,
wfe_drift=wfe_drift0, **kwargs)
# Stellar cut-out for reference scaling
im_star_sub = crop_image(im_star, sub_shape)
# Expand to full size
# - Not needed because this is correctly handled in gen_slope_image
# - Run into undesired cropping effects if PSF is larger than FoV and significantly shifted
# im_star = pad_or_cut_to_size(im_star, (ypix_over, xpix_over))
###################################
# Get image shifts to mask location
###################################
# Shift to position relative to center of image
use_ap_info = self._use_ap_info if use_ap_info is None else use_ap_info
kwargs['use_ap_info'] = use_ap_info
if (('FULL' in self.det_info['wind_mode']) and (self.image_mask is not None)) or use_ap_info:
xcen, ycen = (self.siaf_ap.XSciRef - 1, self.siaf_ap.YSciRef - 1)
delx_pix = (xcen - (xpix/2. - 0.5)) # 'sci' pixel shifts
dely_pix = (ycen - (ypix/2. - 0.5)) # 'sci' pixel shifts
delx_asec, dely_asec = np.array([delx_pix, dely_pix]) * self.pixelscale
xcen_baroff = xcen # Use SIAF aperture location
else:
# Otherwise assumed mask is in center of subarray for simplicity
# For even arrays, the pixel is then centered on the boundary between two pixels
xcen, ycen = (xpix/2. - 0.5, ypix/2. - 0.5)
xcen_baroff = xcen + bar_offpix # Include bar offset position
# Add bar offset
xcen += bar_offpix
delx_pix, dely_pix = (bar_offpix, 0)
delx_asec, dely_asec = np.array([delx_pix, dely_pix]) * self.pixelscale
# Create cen_over parameter to pass to de-rotate function
xcen_over, ycen_over = oversampled_coords(np.array([xcen_baroff, ycen]), oversample)
cen_over = (xcen_over, ycen_over)
# Perform shift and create slope image
if interp is None:
interp = 'linear' if ('FULL' in self.det_info['wind_mode']) else 'cubic'
# print('Roll1...', PA1, xyoff_asec1, interp)
im_roll1 = self.gen_slope_image(PA=PA1, xyoff_asec=xyoff_asec1, im_star=im_star,
return_oversample=True, interp=interp, **kwargs)
if no_ref and (roll_angle==0):
_log.warning('If no_ref=True, then PA1 must not equal PA2. Setting no_ref=False')
no_ref = False
# Include disk and companion flux for calculating the reference scale factor
# Use summed roll image; shift star to center of array and crop
if ref_scale_all:
# Get dither offsets that exist within im_roll1
offx_asec, offy_asec = xyoff_asec1
delx_asec_dith = delx_asec + offx_asec
dely_asec_dith = dely_asec + offy_asec
delx_over, dely_over = np.array([delx_asec_dith, dely_asec_dith]) / pixscale_over
# Shift image back to center and crop star+disk+planets image
im_star_sub = crop_image(im_star, sub_shape, delx=-1*delx_over, dely=-1*dely_over,
interp=interp, shift_func=fshift)
##################################################
# Pure roll subtraction (no reference PSF)
##################################################
if no_ref:
# Create Roll2 image
if (np.abs(wfe_roll_drift) < eps) and np.allclose(xyoff_asec1, xyoff_asec2):
im_star2 = im_star
else:
im_star2 = None
# print('Roll2...', PA2, xyoff_asec2, interp)
im_roll2 = self.gen_slope_image(PA=PA2, xyoff_asec=xyoff_asec2, im_star=im_star2, do_roll2=True,
wfe_drift0=wfe_drift0, wfe_roll_drift=wfe_roll_drift,
return_oversample=True, interp=interp, **kwargs)
# if oversample>1:
# kernel = Gaussian2DKernel(0.5*oversample)
# im_roll1 = convolve_fft(im_roll1, kernel, allow_huge=True)
# im_roll2 = convolve_fft(im_roll2, kernel, allow_huge=True)
# Shift roll images by dither/pointing offsets to align and subtract
dx, dy = -1 * xyoff_asec1 / pixscale_over
im_roll1_sh = fshift(im_roll1, delx=dx, dely=dy, interp=interp)
dx, dy = -1 * xyoff_asec2 / pixscale_over
im_roll2_sh = fshift(im_roll2, delx=dx, dely=dy, interp=interp)
# Flag saturated pixels in each roll image
# Only consider saturation from stellar signal
if do_sat:
# Images are oversampled, so need to rebin to detector pixels
imstar_rebin = frebin(im_star, scale=1/oversample)
sat_rebin = self.saturation_levels(image=imstar_rebin, **kwargs)
# Rebin back to oversampled size
sat_over = frebin(sat_rebin, scale=oversample, total=False)
sat_over = crop_image(sat_over, im_roll1_sh.shape, fill_val=0)
# Flag saturated pixels
im_roll1_sh[sat_over > sat_val] = np.nan
im_roll2_sh[sat_over > sat_val] = np.nan
del imstar_rebin, sat_rebin, sat_over
# Difference the two rolls
diff_r1 = im_roll1_sh - im_roll2_sh
diff_r2 = -1 * diff_r1
# De-rotate each image around center of mask
diff_r1_rot = rotate_offset(diff_r1, -PA1, cen=cen_over, reshape=True, cval=np.nan)
diff_r2_rot = rotate_offset(diff_r2, -PA2, cen=cen_over, reshape=True, cval=np.nan)
# Expand to the same size
new_shape = tuple(np.max(np.array([diff_r1_rot.shape, diff_r2_rot.shape]), axis=0))
diff_r1_rot = crop_image(diff_r1_rot, new_shape, fill_val=np.nan)
diff_r2_rot = crop_image(diff_r2_rot, new_shape, fill_val=np.nan)
# Replace NaNs with values from other differenced mask
final = np.nanmean([diff_r1_rot, diff_r2_rot], axis=0)
# Rebin if requesting detector sampled images
if not return_oversample:
final = frebin(final, scale=1/oversample)
im_roll1_sh = frebin(im_roll1_sh, scale=1/oversample)
im_roll2_sh = frebin(im_roll2_sh, scale=1/oversample)
hdu = fits.PrimaryHDU(final)
hdu.header['EXTNAME'] = ('ROLL_SUB')
hdu.header['OVERSAMP'] = (osamp_out, 'Oversample compared to detector pixels')
hdu.header['OSAMP'] = (osamp_out, 'Oversample compared to detector pixels')
hdu.header['PIXELSCL'] = (pixscale_out, 'Image pixel scale (asec/pix)')
hdu.header['FILTER'] = (self.filter, 'Filter name')
if self.is_lyot:
hdu.header['PUPIL'] = (self.pupil_mask, 'Pupil plane mask')
if self.is_coron:
hdu.header['CORONMSK'] = (self.image_mask, 'Image plane mask')
hdu.header['TEXP_SCI'] = (2*self.multiaccum_times['t_exp'], 'Total science exposure time (sec)')
hdu.header['TEXP_REF'] = (0, 'Total reference exposure time (sec)')
hdu.header['ROLL_ANG'] = (roll_angle, 'Delta roll angle (deg)')
hdu.header['WFE_ROLL'] = (wfe_roll_drift, 'WFE drift between Roll1 and Roll2 (nm RMS)')
hdulist = fits.HDUList([hdu])
roll_names = ['ROLL1', 'ROLL2']
pa_vals = [PA1, PA2]
for ii, im in enumerate([im_roll1_sh, im_roll2_sh]):
hdu = fits.ImageHDU(im, name=roll_names[ii])
# hdu.header['EXTNAME'] = (roll_names[ii])
hdu.header['OVERSAMP'] = (osamp_out, 'Oversample compared to detector pixels')
hdu.header['OSAMP'] = (osamp_out, 'Oversample compared to detector pixels')
hdu.header['PIXELSCL'] = (pixscale_out, 'Image pixel scale (asec/pix)')
hdu.header['FILTER'] = (self.filter, 'Filter name')
if self.is_lyot:
hdu.header['PUPIL'] = (self.pupil_mask, 'Pupil plane mask')
if self.is_coron:
hdu.header['CORONMSK'] = (self.image_mask, 'Image plane mask')
hdu.header['TEXP'] = (self.Detector.time_exp, 'Total exposure time (sec)')
hdu.header['PA'] = (pa_vals[ii], "Position angle (deg)")
dx, dy = xyoff_asec1 if ii==0 else xyoff_asec2
hdu.header['DX_ASEC'] = (dx, 'Pointing offset in x-ideal (asec)')
hdu.header['DY_ASEC'] = (dy, 'Pointing offset in y-ideal (asec)')
hdulist.append(hdu)
return hdulist
##################################################
# Continuing with a ref PSF subtraction algorithm
##################################################
# Reference star slope simulation
wfe_drift_ref = wfe_drift0 + wfe_ref_drift
# Add in offsets for PSF generation
# bar offsets are added inside gen_offset_psf during calc_psf_from_coeff
offx_asec, offy_asec = xyoff_asec_ref
r, th = xy_to_rtheta(offx_asec, offy_asec)
# Create stellar PSF centered in image
im_star_ref = self.gen_offset_psf(r, th, sp=self.sp_ref, return_oversample=True,
wfe_drift=wfe_drift_ref, **kwargs)
# Stellar cut-out for reference scaling
im_ref_sub = crop_image(im_star_ref, sub_shape)
# Expand to full size
# - Not needed because this is correctly handled in gen_slope_image
# - Run into undesired cropping effects if PSF is larger than FoV and significantly shifted
# im_ref = pad_or_cut_to_size(im_ref, (ypix_over, xpix_over))
# Create Reference slope image
# Essentially just adds image shifts and noise
# print('Ref...', 0, xyoff_asec_ref, interp)
im_ref = self.gen_slope_image(PA=0, xyoff_asec=xyoff_asec_ref, im_star=im_star_ref,
do_ref=True, return_oversample=True, interp=interp, **kwargs)
# Determine reference star scale factor
scale1 = scale_ref_image(im_star_sub, im_ref_sub)
_log.debug(f'scale1: {scale1:.3f}')
# print(f'scale1: {scale1:.3f}')
# print('scale1: {0:.3f}'.format(scale1), im_star_sub.sum(), im_ref_sub.sum())
# if oversample>1:
# kernel = Gaussian2DKernel(0.5*oversample)
# im_ref = convolve_fft(im_ref, kernel, allow_huge=True)
# im_roll1 = convolve_fft(im_roll1, kernel, allow_huge=True)
# Shift roll images by pointing offsets
dx, dy = -1 * xyoff_asec1 / pixscale_over
im_roll1_sh = fshift(im_roll1, delx=dx, dely=dy, interp=interp)
dx, dy = -1 * xyoff_asec_ref / pixscale_over
im_ref_sh = fshift(im_ref, delx=dx, dely=dy, interp=interp)
# Flag saturated pixels in each roll image
# Only consider saturation from stellar signal
if do_sat:
# Images are oversampled, so need to rebin to detector pixels
imstar_rebin = frebin(im_star, scale=1/oversample)
sat_rebin = self.saturation_levels(image=imstar_rebin, **kwargs)
# Rebin back to oversampled size
sat_over = frebin(sat_rebin, scale=oversample, total=False)
sat_over = crop_image(sat_over, im_roll1_sh.shape, fill_val=0)
# Flag saturated pixels
im_roll1_sh[sat_over > sat_val] = np.nan
del imstar_rebin, sat_rebin, sat_over
# Images are oversampled, so need to rebin to detector pixels
imstar_rebin = frebin(im_star_ref, scale=1/oversample)
sat_rebin = self.saturation_levels(image=imstar_rebin, do_ref=True, **kwargs)
# Rebin back to oversampled size
sat_over = frebin(sat_rebin, scale=oversample, total=False)
sat_over = crop_image(sat_over, im_ref_sh.shape, fill_val=0)
# Flag saturated pixels
im_ref_sh[sat_over > sat_val] = np.nan
del imstar_rebin, sat_rebin, sat_over
# Telescope Roll 2 with reference subtraction
if (abs(roll_angle) > eps):
# Subtraction with and without scaling
im_diff1_r1 = im_roll1_sh - im_ref_sh
im_diff2_r1 = im_roll1_sh - im_ref_sh * scale1
# WFE drift difference between rolls
wfe_drift1 = wfe_drift0
wfe_drift2 = wfe_drift1 + wfe_roll_drift
# Create roll2 image
if (np.abs(wfe_roll_drift) < eps) and np.allclose(xyoff_asec1, xyoff_asec2):
# Assume Roll2 and Roll1 have exactly the same position offset and WFE drift
im_star2 = im_star
im_star2_sub = im_star_sub
else:
##################################
# Generate Roll2 Image
# Add in offsets for PSF generation
# bar offsets are added inside gen_offset_psf during calc_psf_from_coeff
offx_asec, offy_asec = xyoff_asec2
r, th = xy_to_rtheta(offx_asec, offy_asec)
# Create stellar PSF (Roll 2) centered in image
im_star2 = self.gen_offset_psf(r, th, sp=self.sp_sci, return_oversample=True,
wfe_drift=wfe_drift2, **kwargs)
im_star2_sub = crop_image(im_star2, sub_shape)
im_star2 = crop_image(im_star2, (ypix_over, xpix_over))
# Create Roll2 slope image
# print('Roll2...', PA2, xyoff_asec2, interp)
im_roll2 = self.gen_slope_image(PA=PA2, xyoff_asec=xyoff_asec2, im_star=im_star2,
do_roll2=True, return_oversample=True, interp=interp, **kwargs)
# Include disk and companion flux for calculating the reference scale factor
if ref_scale_all:
# Get dither offsets
offx_asec, offy_asec = xyoff_asec2
delx_asec_dith = delx_asec + offx_asec
dely_asec_dith = dely_asec + offy_asec
delx_over, dely_over = np.array([delx_asec_dith, dely_asec_dith]) / pixscale_over
# Shift and crop star+disk+planets image
im_star2_sub = crop_image(im_roll2, sub_shape, delx=-1*delx_over, dely=-1*dely_over,
interp=interp, shift_func=fshift)
# Subtract reference star from Roll 2
scale2 = scale_ref_image(im_star2_sub, im_ref_sub)
_log.debug(f'scale2: {scale2:.3f}')
# print(f'scale2: {scale2:.3f}')
# print('scale2: {0:.3f}'.format(scale2), im_star2_sub.sum(), im_ref_sub.sum())
# if oversample>1:
# kernel = Gaussian2DKernel(0.5*oversample)
# im_roll2 = convolve_fft(im_roll2, kernel, allow_huge=True)
# Shift roll images by pointing offsets
dx, dy = -1 * xyoff_asec2 / pixscale_over
im_roll2_sh = fshift(im_roll2, delx=dx, dely=dy, interp=interp)
# Flag saturated pixels in each roll image
# Only consider saturation from stellar signal
if do_sat:
# Images are oversampled, so need to rebin to detector pixels
imstar_rebin = frebin(im_star, scale=1/oversample)
sat_rebin = self.saturation_levels(image=imstar_rebin, **kwargs)
# Rebin back to oversampled size
sat_over = frebin(sat_rebin, scale=oversample, total=False)
sat_over = crop_image(sat_over, im_roll2_sh.shape, fill_val=0)
# Flag saturated pixels
im_roll2_sh[sat_over > sat_val] = np.nan
del imstar_rebin, sat_rebin, sat_over
# Subtraction with and without scaling
im_diff1_r2 = im_roll2_sh - im_ref_sh
im_diff2_r2 = im_roll2_sh - im_ref_sh * scale2
# De-rotate each image
diff1_r1_rot = rotate_offset(im_diff1_r1, -PA1, cen=cen_over, reshape=True, cval=np.nan)
diff2_r1_rot = rotate_offset(im_diff2_r1, -PA1, cen=cen_over, reshape=True, cval=np.nan)
diff1_r2_rot = rotate_offset(im_diff1_r2, -PA2, cen=cen_over, reshape=True, cval=np.nan)
diff2_r2_rot = rotate_offset(im_diff2_r2, -PA2, cen=cen_over, reshape=True, cval=np.nan)
# Expand all images to the same size
new_shape = tuple(np.max(np.array([diff1_r1_rot.shape, diff1_r2_rot.shape]), axis=0))
diff1_r1_rot = crop_image(diff1_r1_rot, new_shape, fill_val=np.nan)
diff2_r1_rot = crop_image(diff2_r1_rot, new_shape, fill_val=np.nan)
diff1_r2_rot = crop_image(diff1_r2_rot, new_shape, fill_val=np.nan)
diff2_r2_rot = crop_image(diff2_r2_rot, new_shape, fill_val=np.nan)
# Replace NaNs with values from other differenced mask
# final1 has better noise in outer regions (background)
# final2 has better noise in inner regions (PSF removal)
final1 = np.nanmean([diff1_r1_rot, diff1_r2_rot], axis=0)
final2 = np.nanmean([diff2_r1_rot, diff2_r2_rot], axis=0)
if opt_diff:
rho = dist_image(final1)
binsize = 1
bins = np.arange(rho.min(), rho.max() + binsize, binsize)
nan_mask = np.isnan(final1) | np.isnan(final2)
igroups, _, rr = hist_indices(rho[~nan_mask], bins, True)
func_std = np.std #robust.medabsdev
std1 = binned_statistic(igroups, final1[~nan_mask], func=func_std)
std2 = binned_statistic(igroups, final2[~nan_mask], func=func_std)
ibin_better1 = np.where(std1 < std2)[0]
ibin_better2 = np.where(std2 < std1)[0]
if len(ibin_better1) < len(ibin_better2):
# Get all pixel indices
if len(ibin_better1)>0:
ind_all = np.array([item for ibin in ibin_better1 for item in igroups[ibin]])
ind_all.sort()
temp = final2[~nan_mask]
temp[ind_all] = final1[~nan_mask][ind_all]
final2[~nan_mask] = temp
final = final2
else:
if len(ibin_better2)>0:
ind_all = np.array([item for ibin in ibin_better2 for item in igroups[ibin]])
ind_all.sort()
temp = final1[~nan_mask]
temp[ind_all] = final2[~nan_mask][ind_all]
final1[~nan_mask] = temp
final = final1
else:
# Choose version that optmizes PSF subtraction
final = final2
texp_sci = 2 * self.multiaccum_times['t_exp']
# For only a single roll
else:
# Optimal differencing (with scaling only on the inner regions)
if opt_diff:
final = optimal_difference(im_roll1_sh, im_ref_sh, scale1)
else:
final = im_roll1_sh - im_ref_sh * scale1
final = rotate_offset(final, -PA1, cen=cen_over, reshape=True, cval=np.nan)
texp_sci = self.multiaccum_times['t_exp']
# Rebin if requesting detector sampled images
if not return_oversample:
final = frebin(final, scale=1/oversample)
im_roll1 = frebin(im_roll1, scale=1/oversample)
hdu = fits.PrimaryHDU(final)
hdu.header['EXTNAME'] = ('REF_SUB')
hdu.header['OVERSAMP'] = (osamp_out, 'Oversample compared to detector pixels')
hdu.header['OSAMP'] = (osamp_out, 'Oversample compared to detector pixels')
hdu.header['PIXELSCL'] = (pixscale_out, 'Image pixel scale (asec/pix)')
hdu.header['FILTER'] = (self.filter, 'Filter name')
if self.is_lyot:
hdu.header['PUPIL'] = (self.pupil_mask, 'Pupil plane mask')
if self.is_coron:
hdu.header['CORONMSK'] = (self.image_mask, 'Image plane mask')
hdu.header['TEXP_SCI'] = (texp_sci, 'Total science exposure time (sec)')
hdu.header['TEXP_REF'] = (self.Detector_ref.time_exp, 'Total reference exposure time (sec)')
hdu.header['ROLL_ANG'] = (roll_angle, 'Delta roll angle (deg)')
if roll_angle != 0:
hdu.header['WFE_ROLL'] = (wfe_roll_drift, 'WFE drift between Roll1 and Roll2 (nm RMS)')
if not no_ref:
hdu.header['WFE_REF'] = (wfe_ref_drift, 'WFE drift between Roll1 and Reference (nm RMS)')
hdulist = fits.HDUList([hdu])
# Add Roll1
hdu = fits.ImageHDU(im_roll1, name='ROLL1')
# hdu.header['EXTNAME'] = ('ROLL1')
hdu.header['OVERSAMP'] = (osamp_out, 'Oversample compared to detector pixels')
hdu.header['OSAMP'] = (osamp_out, 'Oversample compared to detector pixels')
hdu.header['PIXELSCL'] = (pixscale_out, 'Image pixel scale (asec/pix)')
hdu.header['FILTER'] = (self.filter, 'Filter name')
if self.is_lyot:
hdu.header['PUPIL'] = (self.pupil_mask, 'Pupil plane mask')
if self.is_coron:
hdu.header['CORONMSK'] = (self.image_mask, 'Image plane mask')
hdu.header['TEXP'] = (self.Detector.time_exp, 'Total exposure time (sec)')
hdu.header['PA'] = (PA1, "Position angle (deg)")
hdu.header['DX_ASEC'] = (xyoff_asec1[0], 'Pointing offset in x-ideal (asec)')
hdu.header['DY_ASEC'] = (xyoff_asec1[1], 'Pointing offset in y-ideal (asec)')
hdu.header['SCALEREF'] = (scale1, 'Reference star scale factor')
hdulist.append(hdu)
# Add Roll2
try:
if not return_oversample:
im_roll2 = frebin(im_roll2, scale=1/oversample)
hdu = fits.ImageHDU(im_roll2, name='ROLL2')
# hdu.header['EXTNAME'] = ('ROLL2')
hdu.header['OVERSAMP'] = (osamp_out, 'Oversample compared to detector pixels')
hdu.header['OSAMP'] = (osamp_out, 'Oversample compared to detector pixels')
hdu.header['PIXELSCL'] = (pixscale_out, 'Image pixel scale (asec/pix)')
hdu.header['FILTER'] = (self.filter, 'Filter name')
if self.is_lyot:
hdu.header['PUPIL'] = (self.pupil_mask, 'Pupil plane mask')
if self.is_coron:
hdu.header['CORONMSK'] = (self.image_mask, 'Image plane mask')
hdu.header['TEXP'] = (self.Detector.time_exp, 'Total exposure time (sec)')
hdu.header['PA'] = (PA2, "Position angle (deg)")
hdu.header['DX_ASEC'] = (xyoff_asec2[0], 'Pointing offset in x-ideal (asec)')
hdu.header['DY_ASEC'] = (xyoff_asec2[1], 'Pointing offset in y-ideal (asec)')
hdu.header['SCALEREF'] = (scale2, 'Reference star scale factor')
hdulist.append(hdu)
except:
pass
# Add Ref image
try:
if not return_oversample:
im_ref = frebin(im_ref, scale=1/oversample)
hdu = fits.ImageHDU(im_ref, name='REF')
# hdu.header['EXTNAME'] = ('REF')
hdu.header['OVERSAMP'] = (osamp_out, 'Oversample compared to detector pixels')
hdu.header['OSAMP'] = (osamp_out, 'Oversample compared to detector pixels')
hdu.header['PIXELSCL'] = (pixscale_out, 'Image pixel scale (asec/pix)')
hdu.header['FILTER'] = (self.filter, 'Filter name')
if self.is_lyot:
hdu.header['PUPIL'] = (self.pupil_mask, 'Pupil plane mask')
if self.is_coron:
hdu.header['CORONMSK'] = (self.image_mask, 'Image plane mask')
hdu.header['TEXP'] = (self.Detector_ref.time_exp, 'Total exposure time (sec)')
hdu.header['PA'] = (0, "Position angle (deg)")
hdu.header['DX_ASEC'] = (xyoff_asec_ref[0], 'Pointing offset in x-ideal (asec)')
hdu.header['DY_ASEC'] = (xyoff_asec_ref[1], 'Pointing offset in y-ideal (asec)')
hdulist.append(hdu)
except:
pass
return hdulist
[docs]
def calc_contrast(self, hdu_diff=None, roll_angle=10, nsig=1,
exclude_disk=True, exclude_planets=True, no_ref=False,
wfe_drift0=0, wfe_ref_drift=None, wfe_roll_drift=None,
func_std=np.std, **kwargs):
"""Create contrast curve.
Generate n-sigma contrast curve for the current observation settings.
Make sure that MULTIACCUM parameters are set for both the main
class (``self.update_detectors()``) as well as the reference target
class (``self.update_detectors_ref()``).
Parameters
----------
hdu_diff : HDUList
Option to pass an already pre-made differenced image.
roll_angle : float
Telescope roll angle (deg) between two observations.
If set to 0 or None, then only one roll will be performed.
If value is >0, then two rolls are performed, each using the
specified MULTIACCUM settings (doubling the effective exposure
time).
nsig : float
n-sigma contrast curve.
exclude_disk : bool
Ignore disk when generating image?
exclude_planets : bool
Ignore planets when generating image?
no_ref : bool
Exclude reference observation. Subtraction is then Roll1-Roll2.
func_std : func
The function to use for calculating the radial standard deviation.
Keyword Args
------------
zfact : float
Zodiacal background factor (default=2.5)
psf_corr_over : ndarray
PSF correction factor for oversampled PSF. Used to better match simulated
PSFs to observed PSFs.
diffusion_sigma : float
Apply pixel diffusion to PSF. Value is in units of detector pixels.
kipc : ndarray
Kernel for IPC. If not provided, then IPC is not applied.
kppc : ndarray
Kernel for PPC. If not provided, then PPC is not applied.
exclude_noise : bool
Don't add random Gaussian noise (detector+photon)?
ideal_Poisson : bool
If set to True, use total signal for noise estimate,
otherwise MULTIACCUM equation is used.
use_cmask : bool
Include coronagraphic mask elements to attenuate background, disk, companions?
opt_diff : bool
Optimal reference differencing (scaling only on the inner regions)
smooth : bool
Smooth the result by convolving with a Gaussian that has stddev=1
Default: True.
small_numbers : bool
Account for small number statistics? Default: True.
Returns
-------
tuple
Three arrays in a tuple: the radius in arcsec, n-sigma contrast,
and n-sigma magnitude sensitivity limit (vega mag).
"""
from webbpsf_ext.webbpsf_ext_core import _nrc_coron_psf_sums, nrc_mask_trans
if no_ref and (roll_angle==0):
_log.warning('If no_ref=True, roll_angle must not equal 0. Setting no_ref=False')
no_ref = False
# If no HDUList is passed, then create one
if hdu_diff is None:
roll_angle = 0 if roll_angle is None else roll_angle
if abs(roll_angle) < eps:
PA1, PA2 = (0, None)
else:
PA1, PA2 = np.array([-0.5, +0.5]) * roll_angle
hdu_diff = self.gen_roll_image(PA1=PA1, PA2=PA2, exclude_disk=exclude_disk,
exclude_planets=exclude_planets, no_ref=no_ref,
wfe_drift0=wfe_drift0, wfe_ref_drift=wfe_ref_drift,
wfe_roll_drift=wfe_roll_drift, **kwargs)
data = hdu_diff[0].data
header = hdu_diff[0].header
# Bin to detector-sampled data
xpix, ypix = (self.det_info['xpix'], self.det_info['ypix'])
pixscale = self.pixelscale
# Radial noise binned to detector pixels
rr, nstds = radial_std(data, pixscale=header['PIXELSCL'], oversample=header['OSAMP'],
supersample=False, func=func_std, nsig=nsig, **kwargs)
df_sig = kwargs.get('diffusion_sigma', None)
psf_corr_over = kwargs.get('psf_corr_over', None)
use_coeff = kwargs.get('use_coeff', True)
use_cmask = kwargs.get('use_cmask', True)
# Normalize by psf max value
if no_ref:
# No reference image subtraction; pure roll subtraction
# Generate 2 PSFs separated by roll angle to find self-subtracted PSF peak
# Store PSF max for later retrieval
try:
psf_sums_dict = self._psf_sums
except AttributeError:
psf_sums_dict = {}
self._psf_sums = psf_sums_dict
psf_max = psf_sums_dict.get('psf_max_rollsub', None)
if psf_max is None:
off_vals = []
max_vals = []
rvals_pix = np.insert(np.arange(1,xpix/2,5), 0, 0.1)
interp = 'linear' #if ('FULL' in self.det_info['wind_mode']) else 'cubic'
for roff_pix in rvals_pix:
roff_asec = roff_pix * pixscale
psf1 = self.gen_offset_psf(roff_asec, 0, return_oversample=False,
coron_rescale=True, diffusion_sigma=df_sig,
psf_corr_over=psf_corr_over,
use_coeff=use_coeff, use_cmask=use_cmask)
psf2 = self.gen_offset_psf(roff_asec, roll_angle, return_oversample=False,
coron_rescale=True, diffusion_sigma=df_sig,
psf_corr_over=psf_corr_over,
use_coeff=use_coeff, use_cmask=use_cmask)
psf1 = fshift(psf1, delx=0, dely=roff_pix, pad=False, interp=interp)
xoff, yoff = xy_rot(0, roff_pix, 10)
psf2 = fshift(psf2, delx=xoff, dely=yoff, pad=False, interp=interp)
diff = psf1 - psf2
maxv = np.max(diff)
off_vals.append(roff_pix)
max_vals.append(maxv)
if maxv >= 0.95*psf1.max():
off_vals = off_vals + [roff_pix+5, xpix/2]
max_vals = max_vals + [psf1.max(), psf1.max()]
break
max_vals = np.array(max_vals)
off_asec = np.array(off_vals) * pixscale
# Interpolate in log space
psf_max_log = np.interp(rr, off_asec, np.log10(max_vals))
psf_max = 10**psf_max_log
# Store in dictionary
psf_sums_dict['psf_max_rollsub'] = psf_max
elif not self.is_coron: # Direct imaging
psf = self.gen_offset_psf(0, 0, return_oversample=False,
diffusion_sigma=df_sig,
psf_corr_over=psf_corr_over,
use_coeff=use_coeff)
psf_max = psf.max()
elif self.image_mask[-1]=='R': # Round masks
ny, nx = (ypix, xpix)
yv = (np.arange(ny) - ny/2) * pixscale
xv = np.zeros_like(yv)
# Get mask transmission at selected points
trans = nrc_mask_trans(self.image_mask, xv, yv)
# Linear combination of min/max to determine PSF max value at given distance
# Get a and b values for each position
avals = trans**2
bvals = 1 - avals
# Init if _psf_sums dict doesn't exist
try:
_ = self._psf_sums['psf_off_max']
except:
_ = _nrc_coron_psf_sums(self, (0,0), 'idl', use_coeff=use_coeff)
psf_off_max = self._psf_sums['psf_off_max']
psf_cen_max = self._psf_sums['psf_cen_max']
# Linear combination
psf_max = avals * psf_off_max + bvals * psf_cen_max
# Interpolate values at rr locations
psf_max = 10**np.interp(rr, yv, np.log10(psf_max))
# Fix anything outside of bounds
if rr.max()>10:
psf_max[rr>10] = psf_max[(rr>4.5) & (rr<10)].max()
elif self.image_mask[-1]=='B': # Bar masks
# For off-axis PSF max values, use fiducial at bar_offset location
bar_offset = self.bar_offset
ny, nx = (ypix, xpix)
# Get mask transmission for grid of points
yv = (np.arange(ny) - ny/2) * pixscale
xv = np.ones_like(yv) * bar_offset
trans = nrc_mask_trans(self.image_mask, xv, yv)
# Linear combination of min/max to determine PSF max value at given distance
# Get a and b values for each position
avals = trans**2
bvals = 1 - avals
# Init if _psf_sums dict doesn't exist
try:
_ = self._psf_sums['psf_off_max']
except:
_ = _nrc_coron_psf_sums(self, (0,0), 'idl')
psf_off_max = self._psf_sums['psf_off_max']
# Linear interpolate psf center max value
xvals = self._psf_sums['psf_cen_xvals']
psf_cen_max_arr = self._psf_sums['psf_cen_max_arr']
finterp = interp1d(xvals, psf_cen_max_arr, kind='linear', fill_value='extrapolate')
psf_cen_max = finterp(bar_offset)
# Linear combination
psf_max = avals * psf_off_max + bvals * psf_cen_max
# Interpolate values at rr locations
psf_max = 10**np.interp(rr, yv, np.log10(psf_max))
# Fix anything outside of bounds
if rr.max()>10:
psf_max[rr>10] = np.max(psf_max[(rr>4.5) & (rr<10)])
# We also want to know the Poisson noise for the PSF values.
# For instance, even if psf_max is significantly above the
# background noise, Poisson noise could conspire to reduce
# the peak PSF value below the noise level.
# Count rate necessary to obtain some nsig
texp = self.multiaccum_times['t_exp']
p = 1 / texp
crate = (p*nsig**2 + np.sqrt((p*nsig**2)**2 + 4*nstds**2)) / 2
# Get total count rate
crate /= psf_max
# Compute contrast
contrast = crate / self.star_flux()
# Magnitude sensitivity
star_mag = self.star_flux('vegamag')
sen_mag = star_mag - 2.5*np.log10(contrast)
return (rr, contrast, sen_mag)
[docs]
def saturation_levels(self, ngroup=2, do_ref=False, image=None, charge_migration=True, **kwargs):
"""Saturation levels
Create image showing level of saturation for each pixel.
Saturation at different number of groups is possible with
ngroup keyword. Returns an array the same shape as `det_info`
[ypix,xpix] properties.
Parameters
----------
ngroup : int
How many group times to determine saturation level?
If this number is higher than the total groups in ramp,
then a warning is produced. The default is ngroup=2,
A value of 0 corresponds to the so-called "zero-frame,"
which is the very first frame that is read-out and saved
separately. This is the equivalent to ngroup=1 for RAPID
and BRIGHT1 observations.
do_ref : bool
Get saturation levels for reference source instead of science
image : ndarray
Rather than generating an image on the fly, pass a pre-computed
slope image.
charge_migration : bool
Include charge migration effects?
Keyword Args
------------
exclude_disk : bool
Do not include disk in final image (for radial contrast),
but still add Poisson noise from disk.
exclude_planets : bool
Do not include planets in final image (for radial contrast),
but still add Poisson noise from disk.
use_cmask : bool
Use the coronagraphic mask image to attenuate planet or disk that
is obscurred by a corongraphic mask feature.
satmax : float
Saturation value to limit charge migration. Default is 1.5.
niter : int
Number of iterations for charge migration. Default is 5.
corners : bool
Include corner pixels in charge migration? Default is True.
zfact : float
Zodiacal background factor (default=2.5)
ra : float
Right ascension in decimal degrees
dec : float
Declination in decimal degrees
thisday : int
Calendar day to use for background calculation. If not given, will use the
average of visible calendar days.
"""
assert ngroup >= 0
if do_ref:
det = self.Detector_ref
else:
det = self.Detector
ma = det.multiaccum
multiaccum_times = det.times_to_dict()
if ngroup > ma.ngroup:
_log.warning("Specified ngroup is greater than self.det_info['ngroup'].")
t_frame = multiaccum_times['t_frame']
if ngroup==0:
t_sat = t_frame
else:
nf = ma.nf; nd1 = ma.nd1; nd2 = ma.nd2
t_sat = (nd1 + ngroup*nf + (ngroup-1)*nd2) * t_frame
# Slope image of input source
if image is None:
image = self.gen_slope_image(do_ref=do_ref, exclude_noise=True, **kwargs)
# Well levels after "saturation time"
sat_level = image * t_sat / self.well_level
# Add in charge migration effects
if charge_migration:
sat_level = do_charge_migration(sat_level, **kwargs)
return sat_level
[docs]
def sensitivity(self, nsig=10, units=None, sp=None, verbose=False, do_ref=False, **kwargs):
# Use reference target detector config?
if do_ref:
det_temp = self.Detector
self.Detector = self.Detector_ref
if sp is None:
sp = self.sp_ref if do_ref else self.sp_sci
kwargs['nsig'] = nsig
kwargs['units'] = units
kwargs['sp'] = sp
kwargs['verbose'] = verbose
bglim = super().sensitivity(**kwargs)
# Return to original detector
if do_ref:
self.Detector = det_temp
return bglim
def _gen_disk_hdulist(inst, file, pixscale, dist, wavelength, units, cen_star,
sp_star=None, dist_out=None, shape_out=None,
remove_cen_flux=True, inner_pix=1, **kwargs):
"""Create a correctly scaled disk model image.
Rescale disk model flux to current pixel scale and distance.
If instrument bandpass is different from disk model, scales
flux assuming a grey scattering model.
Result (in photons/sec) is saved in self.disk_hdulist attribute.
"""
disk_params = {
'file' : file,
'pixscale' : pixscale,
'dist' : dist,
'wavelength' : wavelength,
'units' : units,
'cen_star' : cen_star,
}
oversample = inst.oversample
pixscale_out = inst.pixelscale / oversample
# TODO: Double-check 'APERNAME', 'DET_X', 'DET_Y', 'DET_V2', 'DET_V3'
# correspond to the desired observation
disk_hdul = make_disk_image(inst, disk_params, sp_star=sp_star, pixscale_out=pixscale_out,
dist_out=dist_out, shape_out=shape_out)
# Ensure disk image is only 2D
if len(disk_hdul[0].data.shape) > 2:
disk_hdul[0].data = disk_hdul[0].data[0]
# Get rid of the central star flux
# and anything interior to a few pixels
if remove_cen_flux:
image = disk_hdul[0].data
image_rho = dist_image(image)
ind_max = (image == image.max())
inner_pix = inner_pix * oversample
if (image[image_rho<inner_pix].max() == image.max()) and (image.max()>1000*image[~ind_max].max()):
r1 = inner_pix
r2 = r1 + 1.5
mean_inner = np.median(image[(image_rho >= r1) & (image_rho <= r2)])
# Make sure we're not adding too much flux
ind_inner = image_rho < inner_pix
if mean_inner * ind_inner.sum() > image[ind_inner].sum():
mean_inner = (image[ind_inner] - image.max()) / ind_inner.sum()
image[ind_inner] = mean_inner
# Crop image to minimum size plus some border
if shape_out is None:
_, crop_ind = crop_zero_rows_cols(disk_hdul[0].data, symmetric=True, return_indices=True)
ix1, _, iy1, _ = crop_ind
# Indices need to be multiples of oversample
ny, nx = disk_hdul[0].data.shape
ix1 = (ix1 // oversample) * oversample
iy1 = (iy1 // oversample) * oversample
ix2 = nx - ix1
iy2 = ny - iy1
# New shape (add some padding)
nx_new = (ix2 - ix1) + 4 * oversample
ny_new = (iy2 - iy1) + 4 * oversample
disk_hdul[0].data = crop_image(disk_hdul[0].data, (ny_new, nx_new), fill_val=0)
return disk_hdul
def get_cen_offsets(self, idl_offset=(0,0), PA_offset=0):
""" Determine pixel offsets relative to center of subarray
Given the 'idl' offset of some object relative to an observation's
siaf_ap reference position (e.g., mask center), determine the
relative position from the center of the subarray image.
Parameters
==========
idl_offset : tuple
Source location in arcsec from SIAF aperture position.
PA_offset : float
Rotation of scene by some position angle.
Positive values are counter-clockwise from +Y direction.
This should be -1 times telescope V3 PA.
"""
# Location relative to star
idl_offset = np.array(idl_offset)
delx_asec, dely_asec = idl_offset
# Add in PA offset
if PA_offset!=0:
delx_asec, dely_asec = xy_rot(delx_asec, dely_asec, PA_offset)
# Determine planet PSF shift in pixels compared to center of mask
# Convert delx and dely from 'idl' to 'sci' coords
xsci_ref, ysci_ref = (self.siaf_ap.XSciRef, self.siaf_ap.YSciRef)
xsci_pl, ysci_pl = self.siaf_ap.idl_to_sci(delx_asec, dely_asec)
delx_sci, dely_sci = (xsci_pl - xsci_ref, ysci_pl - ysci_ref)
# Add in bar offset
delx_asec = delx_sci * self.pixelscale + self.bar_offset
dely_asec = dely_sci * self.pixelscale
return (delx_asec, dely_asec)
def gen_coron_mask(self):
"""
Generate coronagraphic mask transmission images.
Output images are in 'sci' coordinates.
"""
pupil = self.pupil_mask
oversample = self.oversample
mask_dict = {}
if (not self.is_coron) and (not self.is_lyot):
mask_dict['DETSAMP'] = None
mask_dict['OVERSAMP'] = None
else:
detid = self.Detector.detid
x0, y0 = (self.det_info['x0'], self.det_info['y0'])
xpix, ypix = (self.det_info['xpix'], self.det_info['ypix'])
# im_det = build_mask_detid(detid, oversample=1, pupil=pupil)
im_over = build_mask_detid(detid, oversample=oversample,
pupil=pupil, filter=self.filter)
# Convert to det coords and crop
# im_det = sci_to_det(im_det, detid)
im_over = sci_to_det(im_over, detid)
im_det = frebin(im_over, scale=1/oversample, total=False)
im_det = im_det[y0:y0+ypix, x0:x0+xpix]
# Crop oversampled image
ix1, iy1 = np.array([x0, y0]) * oversample
ix2 = ix1 + xpix * oversample
iy2 = iy1 + ypix * oversample
im_over = im_over[iy1:iy2, ix1:ix2]
# Revert to sci coords
# TODO: Extent seems off by 1 pixel; Check if DETSAMP mask image is 0 or 1 indexed
mask_dict['DETSAMP'] = det_to_sci(im_det, detid)
mask_dict['OVERSAMP'] = det_to_sci(im_over, detid)
return mask_dict
def attenuate_with_coron_mask(self, image_oversampled, cmask):
""" Image attenuation from coronagraph mask features
Multiply image data by coronagraphic mask.
Excludes region already affected by observed occulting mask.
Involves mainly ND Squares and opaque COM holder.
Appropriately accounts for COM substrate wavelength-dep throughput.
WARNING: If self.ND_acq=True, then bandpass already includes
ND throughput, so be careful not to double count.
"""
# In case of imaging
if cmask is None:
return image_oversampled
w_um = self.bandpass.avgwave().to_value('um')
com_th = nircam_com_th(wave_out=w_um)
pixscale_over = self.pixelscale / self.oversample
# Exclude actual coronagraphic mask since this will be
# taken into account during PSF convolution. Not true for
# all other elements within FOV, ND squares, and mask holder.
if 'FULL' in self.det_info['wind_mode']:
cmask_temp = cmask.copy()
# center = cdict['cen_sci']
cdict = coron_ap_locs(self.module, self.channel, self.image_mask, pupil=self.pupil_mask)
center = np.array(cdict['cen_sci']) * self.oversample
# center = np.array(self.siaf_ap.reference_point('sci'))*self.oversample
r, th = dist_image(cmask_temp, pixscale=pixscale_over, center=center, return_theta=True)
x_asec, y_asec = rtheta_to_xy(r, th)
ind = (np.abs(y_asec)<4.5) & (cmask_temp>0)
cmask_temp[ind] = com_th
elif self.ND_acq:
# No modifications if ND_acq
cmask_temp = cmask
else:
cmask_temp = cmask.copy()
r, th = dist_image(cmask_temp, pixscale=pixscale_over, return_theta=True)
x_asec, y_asec = rtheta_to_xy(r, th)
# ind = (np.abs(x_asec)<10.05) & (np.abs(y_asec)<4.5)
ind = np.abs(y_asec)<4.5
cmask_temp[ind] = com_th
# COM throughput taken into account in bandpass throughput curve
# so divide out
cmask_temp = cmask_temp / com_th
return image_oversampled * cmask_temp