# Import libraries
from operator import add
import numpy as np
import time
import os, six
from pathlib import Path
import multiprocessing as mp
import traceback
from astropy.io import fits
from astropy.table import Table
import astropy.units as u
from copy import deepcopy
# Bandpasses, PSFs, and OPDs
from .bandpasses import miri_filter, nircam_filter
from .psfs import nproc_use, gen_image_from_coeff
from .psfs import make_coeff_resid_grid, field_coeff_func
from .opds import OPDFile_to_HDUList
from .spectra import stellar_spectrum
# Coordinates and image manipulation
from .coords import NIRCam_V2V3_limits, xy_rot, xy_to_rtheta, rtheta_to_xy
from .image_manip import frebin, pad_or_cut_to_size, rotate_offset
from .image_manip import fourier_imshift, fshift
# Polynomial fitting routines
from .maths import jl_poly, jl_poly_fit
import scipy
from scipy.interpolate import interp1d
# Logging info
from . import conf
from .logging_utils import setup_logging
import logging
_log = logging.getLogger('webbpsf_ext')
from .version import __version__
# WebbPSF and Poppy stuff
from .utils import check_fitsgz, webbpsf, poppy, S
from webbpsf import MIRI as webbpsf_MIRI
from webbpsf import NIRCam as webbpsf_NIRCam
from webbpsf.opds import OTE_Linear_Model_WSS
# Program bar
from tqdm.auto import trange, tqdm
# NIRCam Subclass
[docs]class NIRCam_ext(webbpsf_NIRCam):
""" NIRCam instrument PSF coefficients
Subclass of WebbPSF's NIRCam class for generating polynomial coefficients
to cache and quickly generate PSFs for arbitrary spectral types as well
as WFE variations due to field-dependent OPDs and telescope thermal drifts.
Parameters
==========
filter : str
Name of input filter.
pupil_mask : str, None
Pupil elements such as grisms or lyot stops (default: None).
image_mask : str, None
Specify which coronagraphic occulter (default: None).
fov_pix : int
Size of the PSF FoV in pixels (real SW or LW pixels).
The defaults depend on the type of observation.
Odd number place the PSF on the center of the pixel,
whereas an even number centers it on the "crosshairs."
oversample : int
Factor to oversample during WebbPSF calculations.
Default 2 for coronagraphy and 4 otherwise.
"""
[docs] def __init__(self, filter=None, pupil_mask=None, image_mask=None,
fov_pix=None, oversample=None, **kwargs):
webbpsf_NIRCam.__init__(self)
# Initialize script
_init_inst(self, filter=filter, pupil_mask=pupil_mask, image_mask=image_mask,
fov_pix=fov_pix, oversample=oversample, **kwargs)
# No jitter for coronagraphy by default
# Otherwise, assume 5 mas / axis
self.options['jitter'] = None if self.is_coron else 'gaussian'
self.options['jitter_sigma'] = 0.005
# By default, WebbPSF has wavelength limits depending on the channel
# which can interfere with coefficient calculations, so set these to
# extreme low/high values across the board.
self.SHORT_WAVELENGTH_MIN = self.LONG_WAVELENGTH_MIN = 1e-7
self.SHORT_WAVELENGTH_MAX = self.LONG_WAVELENGTH_MAX = 10e-6
# Detector name to SCA ID
self._det2sca = {
'A1':481, 'A2':482, 'A3':483, 'A4':484, 'A5':485,
'B1':486, 'B2':487, 'B3':488, 'B4':489, 'B5':490,
}
# Option to use 1st or 2nd order for grism bandpasses
self._grism_order = 1
# Specify ice and nvr scalings
self._ice_scale = kwargs['ice_scale'] if 'ice_scale' in kwargs.keys() else None
self._nvr_scale = kwargs['nvr_scale'] if 'nvr_scale' in kwargs.keys() else None
self._ote_scale = kwargs['ote_scale'] if 'ote_scale' in kwargs.keys() else None
self._nc_scale = kwargs['nc_scale'] if 'nc_scale' in kwargs.keys() else None
# Option to calculate ND acquisition for coronagraphic obs
self._ND_acq = False
# Option to specify that coronagraphic substrate materials is present.
self._coron_substrate = None
@property
def save_dir(self):
"""Coefficient save directory"""
return _gen_save_dir(self)
@save_dir.setter
def save_dir(self, value):
self._save_dir = value
def _erase_save_dir(self):
"""Erase all instrument coefficients"""
_clear_coeffs_dir(self)
@property
def save_name(self):
"""Coefficient file name"""
if self._save_name is None:
return self.gen_save_name()
else:
return self._save_name
@save_name.setter
def save_name(self, value):
self._save_name = value
@property
def is_lyot(self):
"""
Is a Lyot mask in the pupil wheel?
"""
pupil = self.pupil_mask
return (pupil is not None) and ('LYOT' in pupil)
@property
def is_coron(self):
"""
Observation with coronagraphic mask (incl Lyot stop)?
"""
mask = self.image_mask
return self.is_lyot and ((mask is not None) and ('MASK' in mask))
@property
def is_grism(self):
pupil = self.pupil_mask
return (pupil is not None) and ('GRISM' in pupil)
@property
def is_dark(self):
pupil = self.pupil_mask
return (pupil is not None) and ('FLAT' in pupil)
@property
def ND_acq(self):
"""Use Coronagraphic ND acquisition square?"""
return self._ND_acq
@ND_acq.setter
def ND_acq(self, value):
_check_list(value, [True, False], 'ND_acq')
self._ND_acq = value
@property
def coron_substrate(self):
"""Include coronagraphic substrate material?"""
# True by default if Lyot stop is in place.
# User should override this if intention is to get
# PSF outside of substrate region.
if ((self._coron_substrate is None) and self.is_lyot):
val = True
else:
val = self._coron_substrate
return val
@coron_substrate.setter
def coron_substrate(self, value):
_check_list(value, [True, False], 'coron_substrate')
self._coron_substrate = value
@property
def fov_pix(self):
return self._fov_pix
@fov_pix.setter
def fov_pix(self, value):
self._fov_pix = value
@property
def oversample(self):
if self._oversample is None:
# Detector oversampling of 2 if coronagraphy
# Calculation will still occur w/ FFT oversampling of 4
oversample = 2 if self.is_lyot else 4
else:
oversample = self._oversample
return oversample
@oversample.setter
def oversample(self, value):
self._oversample = value
@property
def wave_fit(self):
"""Wavelength range to fit"""
if self.quick:
w1 = self.bandpass.wave.min() / 1e4
w2 = self.bandpass.wave.max() / 1e4
else:
w1, w2 = (2.4,5.2) if self.channel=='long' else (0.5,2.5)
return w1, w2
@property
def npsf(self):
"""Number of wavelengths/PSFs to fit"""
w1, w2 = self.wave_fit
npsf = self._npsf
# Default to number of PSF simulations per um
if npsf is None:
dn = 10 if self.channel=='long' else 20
npsf = int(np.ceil(dn * (w2-w1)))
# Want at least 5 monochromatic PSFs
npsf = 5 if npsf<5 else int(npsf)
# Number of points must be greater than degree of fit
npsf = self.ndeg+1 if npsf<=self.ndeg else int(npsf)
return npsf
@npsf.setter
def npsf(self, value):
"""Set number of wavelengths/PSFs to fit"""
self._npsf = value
@property
def ndeg(self):
ndeg = self._ndeg
if ndeg is None:
if self.quick:
if self.filter[-2:]=='W2':
ndeg = 9
elif self.filter[-1]=='W':
ndeg = 8
elif self.filter[-1]=='M':
ndeg = 6
elif self.filter[-1]=='N':
ndeg = 4
else:
raise ValueError(f'{self.filter} not recognized as narrow, medium, wide, or double wide.')
else:
ndeg = 9
return ndeg
@ndeg.setter
def ndeg(self, value):
self._ndeg = value
@property
def quick(self):
"""Perform quicker coeff calculation over limited bandwidth?"""
# If quick is not explicitly set by user, then default to True:
if self._quick is not None:
quick = self._quick
else:
quick = True
return quick
@quick.setter
def quick(self, value):
"""Perform quicker coeff calculation over limited bandwidth?"""
_check_list(value, [True, False], 'quick')
self._quick = value
@webbpsf_NIRCam.pupil_mask.setter
def pupil_mask(self, name):
if name != self._pupil_mask:
# only apply updates if the value is in fact new
if name=='GRISM0':
name = 'GRISMR'
elif name=='GRISM90':
name = 'GRISMC'
super(NIRCam_ext, self.__class__).pupil_mask.__set__(self, name)
@property
def siaf_ap(self):
"""SIAF Aperture object"""
if self._siaf_ap is None:
return self.siaf[self.aperturename]
else:
return self._siaf_ap
@siaf_ap.setter
def siaf_ap(self, value):
self._siaf_ap = value
@property
def scaid(self):
"""SCA ID (481, 482, ... 489, 490)"""
detid = self.detector[-2:]
return self._det2sca.get(detid, 'unknown')
@scaid.setter
def scaid(self, value):
scaid_values = np.array(list(self._det2sca.values()))
det_values = np.array(list(self._det2sca.keys()))
if value in scaid_values:
ind = np.where(scaid_values==value)[0][0]
self.detector = 'NRC'+det_values[ind]
else:
_check_list(value, scaid_values, var_name='scaid')
@webbpsf_NIRCam.detector_position.setter
def detector_position(self, position):
# Remove limits for detector position
# Values outside of [0,2047] will get transformed to the correct V2/V3 location
try:
x, y = map(float, position)
except ValueError:
raise ValueError("Detector pixel coordinates must be a pair of numbers, not {}".format(position))
self._detector_position = (x,y)
def _get_fits_header(self, result, options):
""" populate FITS Header keywords """
super(NIRCam_ext, self)._get_fits_header(result, options)
# Keep detector X and Y positions as floats
dpos = np.asarray(self.detector_position, dtype=float)
result[0].header['DET_X'] = (dpos[0], "Detector X pixel position of array center")
result[0].header['DET_Y'] = (dpos[1], "Detector Y pixel position of array center")
@property
def fastaxis(self):
"""Fast readout direction in sci coords"""
# https://jwst-pipeline.readthedocs.io/en/latest/jwst/references_general/references_general.html#orientation-of-detector-image
# 481, 3, 5, 7, 9 have fastaxis equal -1
# Others have fastaxis equal +1
fastaxis = -1 if np.mod(self.scaid,2)==1 else +1
return fastaxis
@property
def slowaxis(self):
"""Slow readout direction in sci coords"""
# https://jwst-pipeline.readthedocs.io/en/latest/jwst/references_general/references_general.html#orientation-of-detector-image
# 481, 3, 5, 7, 9 have slowaxis equal +2
# Others have slowaxis equal -2
slowaxis = +2 if np.mod(self.scaid,2)==1 else -2
return slowaxis
@property
def bandpass(self):
""" Return bandpass throughput """
try:
kwargs = {
'ice_scale': self._ice_scale,
'nvr_scale': self._nvr_scale,
'ote_scale': self._ote_scale,
'nc_scale' : self._nc_scale,
}
bp = nircam_filter(self.filter, pupil=self.pupil_mask, mask=self.image_mask,
module=self.module, grism_order=self._grism_order,
ND_acq=self.ND_acq, coron_substrate=self.coron_substrate,
**kwargs)
except AttributeError:
bp = nircam_filter(self.filter, pupil=self.pupil_mask, mask=self.image_mask,
module=self.module)
return bp
[docs] def get_bar_offset(self, narrow=False):
"""
Obtain the value of the bar offset that would be passed through to
PSF calculations for bar/wedge coronagraphic masks.
"""
from webbpsf.optics import NIRCam_BandLimitedCoron
if (self.is_coron) and (self.image_mask[-1:]=='B'):
# Determine bar offset for Wedge masks either based on filter
# or explicit specification
bar_offset = self.options.get('bar_offset', None)
if bar_offset is None:
auto_offset = 'narrow' if narrow else self.filter
else:
try:
_ = float(bar_offset)
auto_offset = None
except ValueError:
# If the "bar_offset" isn't a float, pass it to auto_offset instead
auto_offset = bar_offset
bar_offset = None
mask = NIRCam_BandLimitedCoron(name=self.image_mask, module=self.module, kind='nircamwedge',
bar_offset=bar_offset, auto_offset=auto_offset)
return mask.bar_offset
else:
return None
[docs] def gen_mask_image(self, npix=None, pixelscale=None, bar_offset=None, nd_squares=True):
"""
Return an image representation of the focal plane mask attenuation.
Output is in 'sci' coords orientation. If no image mask
is present, then returns an array of all 1s. Mask is
centered in image, while actual subarray readout has a
slight offset.
Parameters
==========
npix : int
Number of pixels in output image. If not set, then
is automatically determined based on mask FoV and
`pixelscale`
pixelscale : float
Size of output pixels in units of arcsec. If not specified,
then selects oversample pixel scale.
"""
from webbpsf.optics import NIRCam_BandLimitedCoron
shifts = {'shift_x': self.options.get('coron_shift_x', None),
'shift_y': self.options.get('coron_shift_y', None)}
if pixelscale is None:
pixelscale = self.pixelscale / self.oversample
if npix is None:
osamp = self.pixelscale / pixelscale
npix = 320 if 'long' in self.channel else 640
npix = int(npix * osamp + 0.5)
if self.is_coron:
if self.image_mask[-1:]=='B':
bar_offset = self.get_bar_offset() if bar_offset is None else bar_offset
else:
bar_offset = None
mask = NIRCam_BandLimitedCoron(name=self.image_mask, module=self.module, nd_squares=nd_squares,
bar_offset=bar_offset, auto_offset=None, **shifts)
# Create wavefront to pass through mask and obtain transmission image
wavelength = self.bandpass.avgwave() / 1e10
wave = poppy.Wavefront(wavelength=wavelength, npix=npix, pixelscale=pixelscale)
im = mask.get_transmission(wave)**2
else:
im = np.ones([npix,npix])
return im
[docs] def get_opd_info(self, opd=None, pupil=None, HDUL_to_OTELM=True):
"""
Parse out OPD information for a given OPD, which can be a file name, tuple (file,slice),
HDUList, or OTE Linear Model. Returns dictionary of some relevant information for
logging purposes. The dictionary returns the OPD as an OTE LM by default.
This outputs an OTE Linear Model. In order to update instrument class:
>>> opd_dict = inst.get_opd_info()
>>> opd_new = opd_dict['pupilopd']
>>> inst.pupilopd = opd_new
>>> inst.pupil = opd_new
"""
return _get_opd_info(self, opd=opd, pupil=pupil, HDUL_to_OTELM=HDUL_to_OTELM)
[docs] def drift_opd(self, wfe_drift, opd=None):
"""
A quick method to drift the pupil OPD. This function applies some WFE drift to input
OPD file by breaking up the wfe_drift attribute into thermal, frill, and IEC components.
If we want more realistic time evolution, then we should use the procedure in
dev_utils/WebbPSF_OTE_LM.ipynb to create a time series of OPD maps, which can then be
passed directly to create unique PSFs.
This outputs an OTE Linear Model. In order to update instrument class:
>>> opd_dict = inst.drift_opd()
>>> inst.pupilopd = opd_dict['opd']
>>> inst.pupil = opd_dict['opd']
"""
return _drift_opd(self, wfe_drift, opd=opd)
[docs] def gen_save_name(self, wfe_drift=0):
"""
Generate save name for polynomial coefficient output file.
"""
return _gen_save_name(self, wfe_drift=wfe_drift)
[docs] def gen_psf_coeff(self, bar_offset=0, **kwargs):
"""Generate PSF coefficients
Creates a set of coefficients that will generate simulated PSFs for any
arbitrary wavelength. This function first simulates a number of evenly-
spaced PSFs throughout the specified bandpass (or the full channel).
An nth-degree polynomial is then fit to each oversampled pixel using
a linear-least squares fitting routine. The final set of coefficients
for each pixel is returned as an image cube. The returned set of
coefficient are then used to produce PSF via `calc_psf_from_coeff`.
Useful for quickly generated imaging and dispersed PSFs for multiple
spectral types.
Parameters
----------
bar_offset : float
For wedge masks, option to set the PSF position across the bar.
In this framework, we generally set the default to 0, then use
the `gen_wfemask_coeff` function to determine how the PSF changes
along the wedge axis as well as perpendicular to the wedge. This
allows for more arbitrary PSFs within the mask, including small
grid dithers as well as variable PSFs for extended objects.
Default: 0.
Keyword Args
------------
wfe_drift : float
Wavefront error drift amplitude in nm.
force : bool
Forces a recalculation of PSF even if saved PSF exists. (default: False)
save : bool
Save the resulting PSF coefficients to a file? (default: True)
nproc : bool or None
Manual setting of number of processor cores to break up PSF calculation.
If set to None, this is determined based on the requested PSF size,
number of available memory, and hardware processor cores. The automatic
calculation endeavors to leave a number of resources available to the
user so as to not crash the user's machine.
return_results : bool
By default, results are saved as object the attributes `psf_coeff` and
`psf_coeff_header`. If return_results=True, results are instead returned
as function outputs and will not be saved to the attributes. This is mostly
used for successive coeff simulations to determine varying WFE drift or
focal plane dependencies.
return_extras : bool
Additionally returns a dictionary of monochromatic PSFs images and their
corresponding wavelengths for debugging purposes. Can be used with or without
`return_results`. If `return_results=False`, then only this dictionary is
returned, otherwise if `return_results=False` then returns everything as a
3-element tuple (psf_coeff, psf_coeff_header, extras_dict).
"""
# Set to input bar offset value. No effect if not a wedge mask.
bar_offset_orig = self.options.get('bar_offset', None)
self.options['bar_offset'] = bar_offset
res = _gen_psf_coeff(self, **kwargs)
self.options['bar_offset'] = bar_offset_orig
return res
[docs] def gen_wfedrift_coeff(self, force=False, save=True, **kwargs):
""" Fit WFE drift coefficients
This function finds a relationship between PSF coefficients in the
presence of WFE drift. For a series of WFE drift values, we generate
corresponding PSF coefficients and fit a polynomial relationship to
the residual values. This allows us to quickly modify a nominal set of
PSF image coefficients to generate a new PSF where the WFE has drifted
by some amplitude.
It's Legendre's all the way down...
Parameters
----------
force : bool
Forces a recalculation of coefficients even if saved file exists.
(default: False)
save : bool
Save the resulting PSF coefficients to a file? (default: True)
Keyword Args
------------
wfe_list : array-like
A list of wavefront error drift values (nm) to calculate and fit.
Default is [0,1,2,5,10,20,40], which covers the most-likely
scenarios (1-5nm) while also covering a range of extreme drift
values (10-40nm).
return_results : bool
By default, results are saved in `self._psf_coeff_mod` dictionary.
If return_results=True, results are instead returned as function outputs
and will not be saved to the dictionary attributes.
return_raw : bool
Normally, we return the relation between PSF coefficients as a function
of position. Instead this returns (as function outputs) the raw values
prior to fitting. Final results will not be saved to the dictionary attributes.
"""
# Set to input bar offset value. No effect if not a wedge mask.
bar_offset_orig = self.options.get('bar_offset', None)
try:
self.options['bar_offset'] = self.psf_coeff_header.get('BAROFF', None)
except AttributeError:
# Throws error if psf_coeff_header doesn't exist
_log.error("psf_coeff_header does not appear to exist. Run gen_psf_coeff().")
res = 0
else:
res = _gen_wfedrift_coeff(self, force=force, save=save, **kwargs)
finally:
self.options['bar_offset'] = bar_offset_orig
return res
[docs] def gen_wfemask_coeff(self, large_grid=False, force=False, save=True, **kwargs):
""" Fit WFE changes in mask position
For coronagraphic masks, slight changes in the PSF location
relative to the image plane mask can substantially alter the
PSF speckle pattern. This function generates a number of PSF
coefficients at a variety of positions, then fits polynomials
to the residuals to track how the PSF changes across the mask's
field of view. Special care is taken near the 10-20mas region
in order to provide accurate sampling of the SGD offsets.
Parameters
----------
large_grid : bool
Use a large number (high-density) of grid points to create coefficients.
If True, then a higher fidelity PSF variations across the FoV, but could
take hours to generate on the first pass. Setting to False allows for
quicker coefficient creation with a smaller memory footprint, useful for
testing and debugging.
force : bool
Forces a recalculation of coefficients even if saved file exists.
(default: False)
save : bool
Save the resulting PSF coefficients to a file? (default: True)
Keyword Args
------------
return_results : bool
By default, results are saved in `self._psf_coeff_mod` dictionary.
If return_results=True, results are instead returned as function outputs
and will not be saved to the dictionary attributes.
return_raw : bool
Normally, we return the relation between PSF coefficients as a function
of position. Instead this returns (as function outputs) the raw values
prior to fitting. Final results will not be saved to the dictionary attributes.
"""
return _gen_wfemask_coeff(self, large_grid=large_grid, force=force, save=save, **kwargs)
[docs] def gen_wfefield_coeff(self, force=False, save=True, **kwargs):
""" Fit WFE field-dependent coefficients
Find a relationship between field position and PSF coefficients for
non-coronagraphic observations and when `include_si_wfe` is enabled.
Parameters
----------
force : bool
Forces a recalculation of coefficients even if saved file exists.
(default: False)
save : bool
Save the resulting PSF coefficients to a file? (default: True)
Keyword Args
------------
return_results : bool
By default, results are saved in `self._psf_coeff_mod` dictionary.
If return_results=True, results are instead returned as function outputs
and will not be saved to the dictionary attributes.
return_raw : bool
Normally, we return the relation between PSF coefficients as a function
of position. Instead this returns (as function outputs) the raw values
prior to fitting. Final results will not be saved to the dictionary attributes.
"""
return _gen_wfefield_coeff(self, force=force, save=save, **kwargs)
[docs] def calc_psf_from_coeff(self, sp=None, return_oversample=True, wfe_drift=None,
coord_vals=None, coord_frame='tel', coron_rescale=False, return_hdul=True,
**kwargs):
""" Create PSF image from polynomial coefficients
Create a PSF image from instrument settings. The image is noiseless and
doesn't take into account any non-linearity or saturation effects, but is
convolved with the instrument throughput. Pixel values are in counts/sec.
The result is effectively an idealized slope image (no background).
Returns a single image or list of images if sp is a list of spectra.
By default, it returns only the oversampled PSF, but setting
return_oversample=False will instead return detector-sampled images.
Parameters
----------
sp : :mod:`pysynphot.spectrum`
If not specified, the default is flat in phot lam
(equal number of photons per spectral bin).
The default is normalized to produce 1 count/sec within that bandpass,
assuming the telescope collecting area and instrument bandpass.
Coronagraphic PSFs will further decrease this due to the smaller pupil
size and coronagraphic spot.
return_oversample : bool
Returns the oversampled version of the PSF instead of detector-sampled PSF.
Default: True.
wfe_drift : float or None
Wavefront error drift amplitude in nm.
coord_vals : tuple or None
Coordinates (in arcsec or pixels) to calculate field-dependent PSF.
If multiple values, then this should be an array ([xvals], [yvals]).
coord_frame : str
Type of input coordinates.
* 'tel': arcsecs V2,V3
* 'sci': pixels, in DMS axes orientation; aperture-dependent
* 'det': pixels, in raw detector read out axes orientation
* 'idl': arcsecs relative to aperture reference location.
return_hdul : bool
Return PSFs in an HDUList rather than set of arrays (default: True).
coron_rescale : bool
Rescale off-axis coronagraphic PSF to better match analytic prediction
when source overlaps coronagraphic occulting mask.
Primarily used for planetary companion PSFs.
"""
res = _calc_psf_from_coeff(self, sp=sp, return_oversample=return_oversample,
coord_vals=coord_vals, coord_frame=coord_frame,
wfe_drift=wfe_drift, return_hdul=return_hdul, **kwargs)
# Ensure correct scaling for off-axis PSFs
if self.is_coron and coron_rescale and (coord_vals is not None):
siaf_ap = kwargs.get('siaf_ap', None)
res = _nrc_coron_rescale(self, res, coord_vals, coord_frame, siaf_ap=siaf_ap, sp=sp)
return res
[docs] def calc_psf(self, add_distortion=None, fov_pixels=None, oversample=None,
wfe_drift=None, coord_vals=None, coord_frame='tel', **kwargs):
""" Compute a PSF
Slight modification of inherent WebbPSF `calc_psf` function. If add_distortion, fov_pixels,
and oversample are not specified, then we automatically use the associated attributes.
Also, add ability to directly specify wfe_drift and coordinate offset values in the same
fashion as `calc_psf_from_coeff`.
Notes
-----
Additional PSF computation options (pupil shifts, source positions, jitter, ...)
may be set by configuring the `.options` dictionary attribute of this class.
Parameters
----------
sp : :mod:`pysynphot.spectrum`
Source input spectrum. If not specified, the default is flat in phot lam.
(equal number of photons per spectral bin).
source : synphot.spectrum.SourceSpectrum or dict
TODO: synphot not yet implemented in webbpsf_ext!!
nlambda : int
How many wavelengths to model for broadband?
The default depends on how wide the filter is: (5,3,1) for types (W,M,N) respectively
monochromatic : float, optional
Setting this to a wavelength value (in meters) will compute a monochromatic PSF at that
wavelength, overriding filter and nlambda settings.
fov_arcsec : float
field of view in arcsec. Default=5
fov_pixels : int
field of view in pixels. This is an alternative to fov_arcsec.
outfile : string
Filename to write. If None, then result is returned as an HDUList
oversample, detector_oversample, fft_oversample : int
How much to oversample. Default=4. By default the same factor is used for final output
pixels and intermediate optical planes, but you may optionally use different factors
if so desired.
overwrite : bool
overwrite output FITS file if it already exists?
display : bool
Whether to display the PSF when done or not.
save_intermediates, return_intermediates : bool
Options for saving to disk or returning to the calling function the intermediate optical planes during
the propagation. This is useful if you want to e.g. examine the intensity in the Lyot plane for a
coronagraphic propagation.
normalize : string
Desired normalization for output PSFs. See doc string for OpticalSystem.calc_psf. Default is
to normalize the entrance pupil to have integrated total intensity = 1.
add_distortion : bool
If True, will add 2 new extensions to the PSF HDUlist object. The 2nd extension
will be a distorted version of the over-sampled PSF and the 3rd extension will
be a distorted version of the detector-sampled PSF.
crop_psf : bool
If True, when the PSF is rotated to match the detector's rotation in the focal
plane, the PSF will be cropped so the shape of the distorted PSF will match it's
undistorted counterpart. This will only be used for NIRCam, NIRISS, and FGS PSFs.
Keyword Args
------------
return_hdul : bool
Return PSFs in an HDUList rather than set of arrays (default: True).
return_oversample : bool
Returns the oversampled version of the PSF instead of detector-sampled PSF.
Only valid for `reaturn_hdul=False`, otherwise full HDUList returned. Default: True.
"""
calc_psf_func = super().calc_psf
res = _calc_psf_webbpsf(self, calc_psf_func, add_distortion=add_distortion,
fov_pixels=fov_pixels, oversample=oversample, wfe_drift=wfe_drift,
coord_vals=coord_vals, coord_frame=coord_frame, **kwargs)
return res
[docs] def calc_psfs_grid(self, sp=None, wfe_drift=0, osamp=1, npsf_per_full_fov=15,
xsci_vals=None, ysci_vals=None, return_coords=None,
use_coeff=True, **kwargs):
""" PSF grid across an instrumnet FoV
Create a grid of PSFs across instrument aperture FoV. By default,
imaging observations will be for full detector FoV with regularly
spaced grid. Coronagraphic observations will cover nominal
coronagraphic mask region (usually 10s of arcsec) and will have
logarithmically spaced values where appropriate.
Keyword Args
============
sp : :mod:`pysynphot.spectrum`
If not specified, the default is flat in phot lam (equal number of photons
per wavelength bin). The default is normalized to produce 1 count/sec within
that bandpass, assuming the telescope collecting area and instrument bandpass.
Coronagraphic PSFs will further decrease this due to the smaller pupil
size and suppression of coronagraphic mask.
If set, then the resulting PSF image will be scaled to generate the total
observed number of photons from the spectrum (ie., not scaled by unit response).
wfe_drift : float
Desired WFE drift value relative to default OPD.
osamp : int
Sampling of output PSF relative to detector sampling.
npsf_per_full_fov : int
Number of PSFs across one dimension of the instrument's field of
view. If a coronagraphic observation, then this is for the nominal
coronagrahic field of view (20"x20").
xsci_vals: None or ndarray
Option to pass a custom grid values along x-axis in 'sci' coords.
ysci_vals: None or ndarray
Option to pass a custom grid values along y-axis in 'sci' coords.
return_coords : None or str
Option to also return coordinate values in desired frame
('det', 'sci', 'tel', 'idl'). Output is then xvals, yvals, hdul_psfs.
use_coeff : bool
If True, uses `calc_psf_from_coeff`, other WebbPSF's built-in `calc_psf`.
coron_rescale : bool
Rescale off-axis coronagraphic PSF to better match analytic prediction
when source overlaps coronagraphic occulting mask. Only valid for use_coeff=True.
"""
res = _calc_psfs_grid(self, sp=sp, wfe_drift=wfe_drift, osamp=osamp, npsf_per_full_fov=npsf_per_full_fov,
xsci_vals=xsci_vals, ysci_vals=ysci_vals, return_coords=return_coords,
use_coeff=use_coeff, **kwargs)
return res
[docs] def calc_psfs_sgd(self, xoff_asec, yoff_asec, use_coeff=True, **kwargs):
""" Calculate small grid dither PSFs
Convenience function to calculation a series of SGD PSFs. This is
essentially a wrapper around the `calc_psf_from_coeff` and `calc_psf`
functions. Only valid for coronagraphic observations.
Parameters
==========
xoff_asec : float or array-like
Offsets in x-direction (in 'idl' coordinates).
yoff_asec : float or array-like
Offsets in y-direction (in 'idl' coordinates).
use_coeff : bool
If True, uses `calc_psf_from_coeff`, other WebbPSF's built-in `calc_psf`.
"""
res = _calc_psfs_sgd(self, xoff_asec, yoff_asec, use_coeff=use_coeff, **kwargs)
return res
# MIRI Subclass
[docs]class MIRI_ext(webbpsf_MIRI):
""" MIRI instrument PSF coefficients
Subclass of WebbPSF's MIRI class for generating polynomial coefficients
to cache and quickly generate PSFs for arbitrary spectral types as well
as WFE variations due to field-dependent OPDs and telescope thermal drifts.
Parameters
==========
filter : str
Name of input filter.
pupil_mask : str, None
Pupil elements such as grisms or lyot stops (default: None).
image_mask : str, None
Specify which coronagraphic occulter (default: None).
fov_pix : int
Size of the PSF FoV in pixels (real SW or LW pixels).
The defaults depend on the type of observation.
Odd number place the PSF on the center of the pixel,
whereas an even number centers it on the "crosshairs."
oversample : int
Factor to oversample during WebbPSF calculations.
Default 2 for coronagraphy and 4 otherwise.
"""
[docs] def __init__(self, filter=None, pupil_mask=None, image_mask=None,
fov_pix=None, oversample=None, **kwargs):
webbpsf_MIRI.__init__(self)
_init_inst(self, filter=filter, pupil_mask=pupil_mask, image_mask=image_mask,
fov_pix=fov_pix, oversample=oversample, **kwargs)
# No jitter for coronagraphy by default
# Otherwise assume 5 mas / axis
self.options['jitter'] = None if self.is_coron else 'gaussian'
self.options['jitter_sigma'] = 0.005
@property
def save_dir(self):
"""Coefficient save directory"""
return _gen_save_dir(self)
@save_dir.setter
def save_dir(self, value):
self._save_dir = value
def _erase_save_dir(self):
"""Erase all instrument coefficients"""
_clear_coeffs_dir(self)
@property
def save_name(self):
"""Coefficient file name"""
if self._save_name is None:
return self.gen_save_name()
else:
return self._save_name
@save_name.setter
def save_name(self, value):
self._save_name = value
@property
def is_coron(self):
"""
Coronagraphic observations based on pupil mask settings
"""
pupil = self.pupil_mask
return (pupil is not None) and (('LYOT' in pupil) or ('FQPM' in pupil))
@property
def is_slitspec(self):
"""
LRS observations based on pupil mask settings
"""
pupil = self.pupil_mask
return (pupil is not None) and ('LRS' in pupil)
@property
def fov_pix(self):
return self._fov_pix
@fov_pix.setter
def fov_pix(self, value):
self._fov_pix = value
@property
def oversample(self):
if self._oversample is None:
oversample = 2 if self.is_coron else 4
else:
oversample = self._oversample
return oversample
@oversample.setter
def oversample(self, value):
self._oversample = value
@property
def wave_fit(self):
"""Wavelength range to fit"""
if self.quick:
w1 = self.bandpass.wave.min() / 1e4
w2 = self.bandpass.wave.max() / 1e4
else:
w1, w2 = (5,30)
return (w1, w2)
@property
def npsf(self):
"""Number of wavelengths/PSFs to fit"""
w1, w2 = self.wave_fit
npsf = self._npsf
# Default to 10 PSF simulations per um
if npsf is None:
dn = 10
npsf = int(np.ceil(dn * (w2-w1)))
# Want at least 5 monochromatic PSFs
npsf = 5 if npsf<5 else int(npsf)
# Number of points must be greater than degree of fit
npsf = self.ndeg+1 if npsf<=self.ndeg else int(npsf)
return npsf
@npsf.setter
def npsf(self, value):
"""Set number of wavelengths/PSFs to fit"""
self._npsf = value
@property
def ndeg(self):
"""Degree of polynomial fit"""
ndeg = self._ndeg
if ndeg is None:
# TODO: Quantify these better
if self.use_legendre:
ndeg = 4 if self.quick else 7
else:
ndeg = 4 if self.quick else 7
return ndeg
@ndeg.setter
def ndeg(self, value):
"""Degree of polynomial fit"""
self._ndeg = value
@property
def quick(self):
"""Perform quicker coeff calculation over limited bandwidth?"""
if self._quick is not None:
quick = self._quick
else:
quick = True
return quick
@quick.setter
def quick(self, value):
"""Perform quicker coeff calculation over limited bandwidth?"""
_check_list(value, [True, False], 'quick')
self._quick = value
@property
def siaf_ap(self):
"""SIAF Aperture object"""
if self._siaf_ap is None:
return self.siaf[self.aperturename]
else:
return self._siaf_ap
@siaf_ap.setter
def siaf_ap(self, value):
self._siaf_ap = value
@webbpsf_MIRI.detector_position.setter
def detector_position(self, position):
try:
x, y = map(float, position)
except ValueError:
raise ValueError("Detector pixel coordinates must be a pair of numbers, not {}".format(position))
self._detector_position = (x,y)
def _get_fits_header(self, result, options):
""" populate FITS Header keywords """
super(MIRI_ext, self)._get_fits_header(result, options)
# Keep detector X and Y positions as floats
dpos = np.asarray(self.detector_position, dtype=float)
result[0].header['DET_X'] = (dpos[0], "Detector X pixel position of array center")
result[0].header['DET_Y'] = (dpos[1], "Detector Y pixel position of array center")
@property
def fastaxis(self):
"""Fast readout direction in sci coords"""
# https://jwst-pipeline.readthedocs.io/en/latest/jwst/references_general/references_general.html#orientation-of-detector-image
# MIRI always has fastaxis equal +1
return +1
@property
def slowaxis(self):
"""Slow readout direction in sci coords"""
# https://jwst-pipeline.readthedocs.io/en/latest/jwst/references_general/references_general.html#orientation-of-detector-image
# MIRI always has slowaxis equal +2
return +2
@property
def bandpass(self):
return miri_filter(self.filter)
[docs] def gen_mask_image(self, npix=None, pixelscale=None, detector_orientation=True):
"""
Return an image representation of the focal plane mask.
For 4QPM, we should the phase offsets (0 or 1), whereas
the Lyot and LRS slit masks return transmission.
Parameters
==========
npix : int
Number of pixels in output image. If not set, then
is automatically determined based on mask FoV and
`pixelscale`
pixelscale : float
Size of output pixels in units of arcsec. If not specified,
then selects nominal detector pixel scale.
detector_orientation : bool
Should the output image be rotated to be in detector coordinates?
If set to False, then output mask is rotated along V2/V3 axes.
"""
def make_fqpm_wrapper(name, wavelength):
opticslist = [poppy.IdealFQPM(wavelength=wavelength, name=self.image_mask, rotation=rot1, **offsets),
poppy.SquareFieldStop(size=24, rotation=rot2, **offsets)]
container = poppy.CompoundAnalyticOptic(name=name, opticslist=opticslist)
return container
rot1 = -1*self._rotation if detector_orientation else 0
rot2 = 0 if detector_orientation else self._rotation
offsets = {'shift_x': self.options.get('coron_shift_x', None),
'shift_y': self.options.get('coron_shift_y', None)}
if pixelscale is None:
pixelscale = self.pixelscale / self.oversample
if self.image_mask == 'FQPM1065':
full_pad = 2*np.max(np.abs(xy_rot(12, 12, rot2)))
npix = int(full_pad / pixelscale + 0.5) if npix is None else npix
wave = poppy.Wavefront(wavelength=10.65e-6, npix=npix, pixelscale=pixelscale)
mask = make_fqpm_wrapper("MIRI FQPM 1065", 10.65e-6)
im = np.real(mask.get_phasor(wave))
im /= im.max()
elif self.image_mask == 'FQPM1140':
full_pad = 2*np.max(np.abs(xy_rot(12, 12, rot2)))
npix = int(full_pad / pixelscale + 0.5) if npix is None else npix
wave = poppy.Wavefront(wavelength=11.4e-6, npix=npix, pixelscale=pixelscale)
mask = make_fqpm_wrapper("MIRI FQPM 1140", 11.40e-6)
im = np.real(mask.get_phasor(wave))
im /= im.max()
elif self.image_mask == 'FQPM1550':
full_pad = 2*np.max(np.abs(xy_rot(12, 12, rot2)))
npix = int(full_pad / pixelscale + 0.5) if npix is None else npix
wave = poppy.Wavefront(wavelength=15.5e-6, npix=npix, pixelscale=pixelscale)
mask = make_fqpm_wrapper("MIRI FQPM 1550", 15.50e-6)
im = np.real(mask.get_phasor(wave))
im /= im.max()
elif self.image_mask == 'LYOT2300':
full_pad = 2*np.max(np.abs(xy_rot(15, 15, rot2)))
npix = int(full_pad / pixelscale + 0.5) if npix is None else npix
wave = poppy.Wavefront(wavelength=23e-6, npix=npix, pixelscale=pixelscale)
opticslist = [poppy.CircularOcculter(radius=4.25 / 2, name=self.image_mask, rotation=rot1, **offsets),
poppy.BarOcculter(width=0.722, height=31, rotation=rot1, **offsets),
poppy.SquareFieldStop(size=30, rotation=rot2, **offsets)]
mask = poppy.CompoundAnalyticOptic(name="MIRI Lyot Occulter", opticslist=opticslist)
im = mask.get_transmission(wave)**2
elif self.image_mask == 'LRS slit':
full_pad = 2*np.max(np.abs(xy_rot(2.5, 2.5, rot2)))
npix = int(full_pad / pixelscale + 0.5) if npix is None else npix
wave = poppy.Wavefront(wavelength=23e-6, npix=npix, pixelscale=pixelscale)
mask = poppy.RectangularFieldStop(width=4.7, height=0.51, rotation=rot2,
name=self.image_mask, **offsets)
im = mask.get_transmission(wave)**2
else:
im = np.ones([npix,npix])
return im
[docs] def get_opd_info(self, opd=None, pupil=None, HDUL_to_OTELM=True):
"""
Parse out OPD information for a given OPD, which
can be a file name, tuple (file,slice), HDUList,
or OTE Linear Model. Returns dictionary of some
relevant information for logging purposes.
The dictionary has an OPD version as an OTE LM.
This outputs an OTE Linear Model.
In order to update instrument class:
>>> opd_dict = inst.get_opd_info()
>>> opd_new = opd_dict['pupilopd']
>>> inst.pupilopd = opd_new
>>> inst.pupil = opd_new
"""
return _get_opd_info(self, opd=opd, pupil=pupil, HDUL_to_OTELM=HDUL_to_OTELM)
[docs] def drift_opd(self, wfe_drift, opd=None):
"""
A quick method to drift the pupil OPD. This function applies
some WFE drift to input OPD file by breaking up the wfe_drift
attribute into thermal, frill, and IEC components. If we want
more realistic time evolution, then we should use the procedure
in dev_utils/WebbPSF_OTE_LM.ipynb to create a time series of OPD
maps, which can then be passed directly to create unique PSFs.
This outputs an OTE Linear Model. In order to update instrument class:
>>> opd_dict = inst.drift_opd()
>>> inst.pupilopd = opd_dict['opd']
>>> inst.pupil = opd_dict['opd']
"""
return _drift_opd(self, wfe_drift, opd=opd)
[docs] def gen_save_name(self, wfe_drift=0):
"""
Generate save name for polynomial coefficient output file.
"""
return _gen_save_name(self, wfe_drift=wfe_drift)
[docs] def gen_psf_coeff(self, **kwargs):
"""Generate PSF coefficients
Creates a set of coefficients that will generate simulated PSFs for any
arbitrary wavelength. This function first simulates a number of evenly-
spaced PSFs throughout the specified bandpass (or the full channel).
An nth-degree polynomial is then fit to each oversampled pixel using
a linear-least squares fitting routine. The final set of coefficients
for each pixel is returned as an image cube. The returned set of
coefficient are then used to produce PSF via `calc_psf_from_coeff`.
Useful for quickly generated imaging and dispersed PSFs for multiple
spectral types.
Keyword Args
------------
wfe_drift : float
Wavefront error drift amplitude in nm.
force : bool
Forces a recalculation of PSF even if saved PSF exists. (default: False)
save : bool
Save the resulting PSF coefficients to a file? (default: True)
nproc : bool or None
Manual setting of number of processor cores to break up PSF calculation.
If set to None, this is determined based on the requested PSF size,
number of available memory, and hardware processor cores. The automatic
calculation endeavors to leave a number of resources available to the
user so as to not crash the user's machine.
return_results : bool
By default, results are saved as object the attributes `psf_coeff` and
`psf_coeff_header`. If return_results=True, results are instead returned
as function outputs and will not be saved to the attributes. This is mostly
used for successive coeff simulations to determine varying WFE drift or
focal plane dependencies.
return_extras : bool
Additionally returns a dictionary of monochromatic PSFs images and their
corresponding wavelengths for debugging purposes. Can be used with or without
`return_results`. If `return_results=False`, then only this dictionary is
returned, otherwise if `return_results=False` then returns everything as a
3-element tuple (psf_coeff, psf_coeff_header, extras_dict).
"""
return _gen_psf_coeff(self, **kwargs)
[docs] def gen_wfedrift_coeff(self, force=False, save=True, **kwargs):
""" Fit WFE drift coefficients
This function finds a relationship between PSF coefficients in the
presence of WFE drift. For a series of WFE drift values, we generate
corresponding PSF coefficients and fit a polynomial relationship to
the residual values. This allows us to quickly modify a nominal set of
PSF image coefficients to generate a new PSF where the WFE has drifted
by some amplitude.
It's Legendre's all the way down...
Parameters
----------
force : bool
Forces a recalculation of coefficients even if saved file exists.
(default: False)
save : bool
Save the resulting PSF coefficients to a file? (default: True)
Keyword Args
------------
wfe_list : array-like
A list of wavefront error drift values (nm) to calculate and fit.
Default is [0,1,2,5,10,20,40], which covers the most-likely
scenarios (1-5nm) while also covering a range of extreme drift
values (10-40nm).
return_results : bool
By default, results are saved in `self._psf_coeff_mod` dictionary.
If return_results=True, results are instead returned as function outputs
and will not be saved to the dictionary attributes.
return_raw : bool
Normally, we return the relation between PSF coefficients as a function
of position. Instead this returns (as function outputs) the raw values
prior to fitting. Final results will not be saved to the dictionary attributes.
"""
return _gen_wfedrift_coeff(self, force=force, save=save, **kwargs)
[docs] def gen_wfefield_coeff(self, force=False, save=True, **kwargs):
""" Fit WFE field-dependent coefficients
Find a relationship between field position and PSF coefficients for
non-coronagraphic observations and when `include_si_wfe` is enabled.
Parameters
----------
force : bool
Forces a recalculation of coefficients even if saved file exists.
(default: False)
save : bool
Save the resulting PSF coefficients to a file? (default: True)
Keyword Args
------------
return_results : bool
By default, results are saved in `self._psf_coeff_mod` dictionary.
If return_results=True, results are instead returned as function outputs
and will not be saved to the dictionary attributes.
return_raw : bool
Normally, we return the relation between PSF coefficients as a function
of position. Instead this returns (as function outputs) the raw values
prior to fitting. Final results will not be saved to the dictionary attributes.
"""
return _gen_wfefield_coeff(self, force=force, save=save, **kwargs)
[docs] def gen_wfemask_coeff(self, large_grid=False, force=False, save=True, **kwargs):
""" Fit WFE changes in mask position
For coronagraphic masks, slight changes in the PSF location
relative to the image plane mask can substantially alter the
PSF speckle pattern. This function generates a number of PSF
coefficients at a variety of positions, then fits polynomials
to the residuals to track how the PSF changes across the mask's
field of view. Special care is taken near the 10-20mas region
in order to provide accurate sampling of the SGD offsets.
Parameters
----------
large_grid : bool
Use a large number (high-density) of grid points to create coefficients.
If True, then a higher fidelity PSF variations across the FoV, but could
take hours to genrate on the first pass. Setting to False allows for
quicker coefficient creation with a smaller memory footprint, useful for
testing and debugging.
force : bool
Forces a recalculation of coefficients even if saved file exists.
(default: False)
save : bool
Save the resulting PSF coefficients to a file? (default: True)
Keyword Args
------------
return_results : bool
By default, results are saved in `self._psf_coeff_mod` dictionary.
If return_results=True, results are instead returned as function outputs
and will not be saved to the dictionary attributes.
return_raw : bool
Normally, we return the relation between PSF coefficients as a function
of position. Instead this returns (as function outputs) the raw values
prior to fitting. Final results will not be saved to the dictionary attributes.
"""
return _gen_wfemask_coeff(self, large_grid=large_grid, force=force, save=save, **kwargs)
[docs] def calc_psf_from_coeff(self, sp=None, return_oversample=True, return_hdul=True,
wfe_drift=None, coord_vals=None, coord_frame='tel', **kwargs):
""" Create PSF image from coefficients
Create a PSF image from instrument settings. The image is noiseless and
doesn't take into account any non-linearity or saturation effects, but is
convolved with the instrument throughput. Pixel values are in counts/sec.
The result is effectively an idealized slope image (no background).
Returns a single image or list of images if sp is a list of spectra.
By default, it returns only the oversampled PSF, but setting
return_oversample=False will instead return detector-sampled images.
Parameters
----------
sp : :mod:`pysynphot.spectrum`
If not specified, the default is flat in phot lam
(equal number of photons per spectral bin).
The default is normalized to produce 1 count/sec within that bandpass,
assuming the telescope collecting area and instrument bandpass.
Coronagraphic PSFs will further decrease this due to the smaller pupil
size and coronagraphic mask.
return_oversample : bool
Returns the oversampled version of the PSF instead of detector-sampled PSF.
Default: True.
wfe_drift : float or None
Wavefront error drift amplitude in nm.
coord_vals : tuple or None
Coordinates (in arcsec or pixels) to calculate field-dependent PSF.
If multiple values, then this should be an array ([xvals], [yvals]).
coord_frame : str
Type of input coordinates.
* 'tel': arcsecs V2,V3
* 'sci': pixels, in conventional DMS axes orientation
* 'det': pixels, in raw detector read out axes orientation
* 'idl': arcsecs relative to aperture reference location.
return_hdul : bool
Return PSFs in an HDUList rather than set of arrays (default: True).
"""
return _calc_psf_from_coeff(self, sp=sp, return_oversample=return_oversample,
coord_vals=coord_vals, coord_frame=coord_frame,
wfe_drift=wfe_drift, return_hdul=return_hdul, **kwargs)
[docs] def calc_psf(self, add_distortion=None, fov_pixels=None, oversample=None,
wfe_drift=None, coord_vals=None, coord_frame='tel', **kwargs):
""" Compute a PSF
Slight modification of inherent WebbPSF `calc_psf` function. If add_distortion, fov_pixels,
and oversample are not specified, then we automatically use the associated attributes.
Notes
-----
More advanced PSF computation options (pupil shifts, source positions, jitter, ...)
may be set by configuring the `.options` dictionary attribute of this class.
Parameters
----------
sp : :mod:`pysynphot.spectrum`
Source input spectrum. If not specified, the default is flat in phot lam.
(equal number of photons per spectral bin).
source : synphot.spectrum.SourceSpectrum or dict
TODO: synphot not yet implemented in webbpsf_ext!!
nlambda : int
How many wavelengths to model for broadband?
The default depends on how wide the filter is: (5,3,1) for types (W,M,N) respectively
monochromatic : float, optional
Setting this to a wavelength value (in meters) will compute a monochromatic PSF at that
wavelength, overriding filter and nlambda settings.
fov_arcsec : float
field of view in arcsec. Default=5
fov_pixels : int
field of view in pixels. This is an alternative to fov_arcsec.
outfile : string
Filename to write. If None, then result is returned as an HDUList
oversample, detector_oversample, fft_oversample : int
How much to oversample. Default=4. By default the same factor is used for final output
pixels and intermediate optical planes, but you may optionally use different factors
if so desired.
overwrite : bool
overwrite output FITS file if it already exists?
display : bool
Whether to display the PSF when done or not.
save_intermediates, return_intermediates : bool
Options for saving to disk or returning to the calling function the intermediate optical planes during
the propagation. This is useful if you want to e.g. examine the intensity in the Lyot plane for a
coronagraphic propagation.
normalize : string
Desired normalization for output PSFs. See doc string for OpticalSystem.calc_psf. Default is
to normalize the entrance pupil to have integrated total intensity = 1.
add_distortion : bool
If True, will add 2 new extensions to the PSF HDUlist object. The 2nd extension
will be a distorted version of the over-sampled PSF and the 3rd extension will
be a distorted version of the detector-sampled PSF.
crop_psf : bool
If True, when the PSF is rotated to match the detector's rotation in the focal
plane, the PSF will be cropped so the shape of the distorted PSF will match it's
undistorted counterpart. This will only be used for NIRCam, NIRISS, and FGS PSFs.
Keyword Args
------------
return_hdul : bool
Return PSFs in an HDUList rather than set of arrays (default: True).
return_oversample : bool
Returns the oversampled version of the PSF instead of detector-sampled PSF.
Only valid for `reaturn_hdul=False`, otherwise full HDUList returned. Default: True.
"""
calc_psf_func = super().calc_psf
res = _calc_psf_webbpsf(self, calc_psf_func, add_distortion=add_distortion,
fov_pixels=fov_pixels, oversample=oversample, wfe_drift=wfe_drift,
coord_vals=coord_vals, coord_frame=coord_frame, **kwargs)
return res
[docs] def calc_psfs_grid(self, sp=None, wfe_drift=0, osamp=1, npsf_per_full_fov=15,
xsci_vals=None, ysci_vals=None, return_coords=None,
use_coeff=True, **kwargs):
""" PSF grid across an instrumnet FoV
Create a grid of PSFs across instrument aperture FoV. By default,
imaging observations will be for full detector FoV with regularly
spaced grid. Coronagraphic observations will cover nominal
coronagraphic mask region (usually 10s of arcsec) and will have
logarithmically spaced values where appropriate.
Keyword Args
============
sp : :mod:`pysynphot.spectrum`
If not specified, the default is flat in phot lam (equal number of photons
per wavelength bin). The default is normalized to produce 1 count/sec within
that bandpass, assuming the telescope collecting area and instrument bandpass.
Coronagraphic PSFs will further decrease this due to the smaller pupil
size and suppression of coronagraphic mask.
If set, then the resulting PSF image will be scaled to generate the total
observed number of photons from the spectrum (ie., not scaled by unit response).
wfe_drift : float
Desired WFE drift value relative to default OPD.
osamp : int
Sampling of output PSF relative to detector sampling.
npsf_per_full_fov : int
Number of PSFs across one dimension of the instrument's field of
view. If a coronagraphic observation, then this is for the nominal
coronagrahic field of view.
xsci_vals: None or ndarray
Option to pass a custom grid values along x-axis in 'sci' coords.
If coronagraph, this instead corresponds to coronagraphic mask axis,
which has a slight rotation relative to detector axis in MIRI.
ysci_vals: None or ndarray
Option to pass a custom grid values along y-axis in 'sci' coords.
If coronagraph, this instead corresponds to coronagraphic mask axis,
which has a slight rotation relative to detector axis in MIRI.
return_coords : None or str
Option to also return coordinate values in desired frame
('det', 'sci', 'tel', 'idl'). Output is then xvals, yvals, hdul_psfs.
use_coeff : bool
If True, uses `calc_psf_from_coeff`, other WebbPSF's built-in `calc_psf`.
"""
res = _calc_psfs_grid(self, sp=sp, wfe_drift=wfe_drift, osamp=osamp, npsf_per_full_fov=npsf_per_full_fov,
xsci_vals=xsci_vals, ysci_vals=ysci_vals, return_coords=return_coords,
use_coeff=use_coeff, **kwargs)
return res
[docs] def calc_psfs_sgd(self, xoff_asec, yoff_asec, use_coeff=True, **kwargs):
""" Calculate small grid dither PSFs
Convenience function to calculation a series of SGD PSFs. This is
essentially a wrapper around the `calc_psf_from_coeff` and `calc_psf`
functions. Only valid for coronagraphic observations.
Parameters
==========
xoff_asec : float or array-like
Offsets in x-direction (in 'idl' coordinates).
yoff_asec : float or array-like
Offsets in y-direction (in 'idl' coordinates).
use_coeff : bool
If True, uses `calc_psf_from_coeff`, other WebbPSF's built-in `calc_psf`.
"""
res = _calc_psfs_sgd(self, xoff_asec, yoff_asec, use_coeff=use_coeff, **kwargs)
return res
#############################################################
# Functions for use across instrument classes
#############################################################
def _check_list(value, temp_list, var_name=None):
"""
Helper function to test if a value exists within a list.
If not, then raise ValueError exception.
This is mainly used for limiting the allowed values of some variable.
"""
if value not in temp_list:
# Replace None value with string for printing
if None in temp_list:
temp_list[temp_list.index(None)] = 'None'
# Make sure all elements are strings
temp_list2 = [str(val) for val in temp_list]
var_name = '' if var_name is None else var_name + ' '
err_str = "Invalid {}setting: {} \n\tValid values are: {}" \
.format(var_name, value, ', '.join(temp_list2))
raise ValueError(err_str)
def _init_inst(self, filter=None, pupil_mask=None, image_mask=None,
fov_pix=None, oversample=None, **kwargs):
"""
Setup for specific instrument during init state
"""
# Add grisms as pupil options
if self.name=='NIRCam':
self.pupil_mask_list = self.pupil_mask_list + ['GRISMC', 'GRISMR', 'FLAT']
elif self.name=='NIRISS':
self.pupil_mask_list = self.pupil_mask_list + ['GR150C', 'GR150R']
# Option to use `pupil` instead of `pupil_mask`
kw_pupil = kwargs.get('pupil')
kw_mask = kwargs.get('mask')
if (pupil_mask is None) and (kw_pupil is not None) and isinstance(kw_pupil, str):
_log.warn('Deprecation warning: Use `pupil_mask` keyword rather than `pupil` to set pupil mask.')
# pupil_mask = kwargs.get('pupil')
# Option to use `mask` instead of `image_mask`
if (image_mask is None) and (kw_mask is not None):
_log.warn('Deprecation warning: Use `image_mask` keyword rather than `mask` to set image mask.')
# image_mask = kwargs.get('mask')
# Ensure CIRCLYOT or WEDGELYOT in case coron masks were specified
if self.name=='NIRCam' and (pupil_mask is not None) and ('MASK' in pupil_mask):
pupil_mask = 'WEDGELYOT' if ('LWB' in pupil_mask) else 'CIRCLYOT'
if pupil_mask is not None:
self.pupil_mask = pupil_mask
if image_mask is not None:
self.image_mask = image_mask
# Do filter last
if filter is not None:
self.filter = filter
# Don't include SI WFE error for coronagraphy
if self.name=='MIRI':
self.include_si_wfe = False if self.is_coron else True
elif self.name=='NIRCam':
self.include_si_wfe = True
else:
self.include_si_wfe = True
# SIAF aperture attribute
self._siaf_ap = None
# Options to include or exclude distortions
self.include_distortions = True
# Settings for fov_pix and oversample
# Default odd for normal imaging, even for coronagraphy
if fov_pix is None:
# TODO: Do these even/odd settings make sense?
# fov_pix = 128 if self.is_coron else 129
fov_pix = 257
self._fov_pix = fov_pix
self._oversample = oversample
# Legendre polynomials are more stable
# self.use_legendre = True
self.use_legendre = kwargs.get('use_legendre', True)
# Turning on quick perform fits over filter bandpasses independently
# The smaller wavelength range requires fewer monochromaic wavelengths
# and lower order polynomial fits
self._quick = None
self.quick = kwargs.get('quick', self.quick)
# Setting these to None to choose default values at runtime
self._npsf = None
self._ndeg = None
self.npsf = kwargs.get('npsf', self.npsf)
self.ndeg = kwargs.get('ndeg', self.ndeg)
# Set up initial OPD file info
opd_name = 'JWST_OTE_OPD_RevAA_prelaunch_predicted.fits'
try:
opd_name = check_fitsgz(opd_name)
except OSError:
opd_name = f'OPD_RevW_ote_for_{self.name}_predicted.fits'
opd_name = check_fitsgz(opd_name, self.name)
self._opd_default = (opd_name, 0)
self.pupilopd = self._opd_default
# Update telescope pupil and pupil OPD
kw_pupil = kwargs.get('pupil')
kw_pupilopd = kwargs.get('pupilopd')
# if (kw_pupil is not None) or (kw_pupilopd is not None):
# opd_dict = self.get_opd_info(opd=kw_pupilopd, pupil=kw_pupil)
# otelm = opd_dict['pupilopd']
# self.pupilopd = otelm
# self.pupil = otelm
if kw_pupil is not None:
self.pupil = kw_pupil
if kw_pupilopd is not None:
self.pupilopd = kw_pupilopd
# Name to save array of oversampled coefficients
self._save_dir = None
self._save_name = None
# Max FoVs for calculating drift and field-dependent coefficient residuals
# Any pixels beyond this size will be considered to have 0 residual difference
self._fovmax_wfedrift = 256
self._fovmax_wfefield = 128
self._fovmax_wfemask = 256
self.psf_coeff = None
self.psf_coeff_header = None
self._psf_coeff_mod = {
'wfe_drift': None, 'wfe_drift_off': None, 'wfe_drift_lxmap': None,
'si_field': None, 'si_field_v2grid': None, 'si_field_v3grid': None, 'si_field_apname': None,
'si_mask': None, 'si_mask_xgrid': None, 'si_mask_ygrid': None, 'si_mask_apname': None,
'si_mask_large': False
}
if self.image_mask is not None:
self.options['coron_shift_x'] = 0
self.options['coron_shift_y'] = 0
def _gen_save_dir(self):
"""
Generate a default save directory to store PSF coefficients.
If the directory doesn't exist, try to create it.
"""
if self._save_dir is None:
base_dir = Path(conf.WEBBPSF_EXT_PATH) / 'psf_coeffs/'
# Name to save array of oversampled coefficients
inst_name = self.name
save_dir = base_dir / f'{inst_name}/'
else:
save_dir = self._save_dir
if isinstance(save_dir, str):
save_dir = Path(save_dir)
self._save_dir = save_dir
# Create directory (and all intermediates) if it doesn't already exist
if not os.path.isdir(save_dir):
_log.info(f"Creating directory: {save_dir}")
os.makedirs(save_dir, exist_ok=True)
return save_dir
def _clear_coeffs_dir(self):
"""
Remove contents of a instrument coefficient directory.
"""
import shutil
# Should be a pathlib.Path object
save_dir = self.save_dir
if isinstance(save_dir, str):
save_dir = Path(save_dir)
if save_dir.exists() and save_dir.is_dir():
_log.warn(f"Remove contents from '{save_dir}/'?")
_log.warn("Type 'Y' to continue...")
response = input("")
if response=="Y":
# Delete directory and contents
shutil.rmtree(save_dir)
# Recreate empty directory
os.makedirs(save_dir, exist_ok=True)
_log.warn("Directory emptied.")
else:
_log.warn("Process aborted.")
else:
_log.warn(f"Directory '{save_dir}/' does not exist!")
def _gen_save_name(self, wfe_drift=0):
"""
Create save name for polynomial coefficients output file.
"""
# Prepend filter name if using quick keyword
fstr = '{}_'.format(self.filter) if self.quick else ''
# Mask and pupil names
pstr = 'CLEAR' if self.pupil_mask is None else self.pupil_mask
mstr = 'NONE' if self.image_mask is None else self.image_mask
## 9/14/2022 - PSF weighting for substrate and ND mask should not be necessary
## since these are included in bandpass throughputs, which are then
## applied to input spectrum to get flux-dependent PSFs. Therefore, the
## saved PSF coefficients are similar for all three scenario:
## 1) coron_substrate=False; 2) coron_substrate=True; 3) ND_acq=True
# Check for coron substrate if image mask is None
# if (mstr == 'NONE') and self.coron_substrate:
# mstr = 'CORONSUB'
# Only need coron substrate for PSF weighting
# if (mstr == 'NONE'):
# if self.ND_acq:
# mstr = 'NDACQ'
# elif self.coron_substrate:
# mstr = 'CORONSUB'
fmp_str = f'{fstr}{pstr}_{mstr}'
# PSF image size and sampling
fov_pix = self.fov_pix
osamp = self.oversample
if self.name=='NIRCam':
# Prepend module and channel to filter/pupil/mask
module = self.module
chan_str = 'LW' if 'long' in self.channel else 'SW'
fmp_str = f'{chan_str}{module}_{fmp_str}'
# Set bar offset if specified
# bar_offset = self.options.get('bar_offset', None)
bar_offset = self.get_bar_offset()
bar_str = '' if bar_offset is None else '_bar{:.2f}'.format(bar_offset)
else:
bar_str = ''
# Jitter settings
jitter = self.options.get('jitter')
jitter_sigma = self.options.get('jitter_sigma', 0)
if (jitter is None) or (jitter_sigma is None):
jitter_sigma = 0
jsig_mas = jitter_sigma*1000
# Source positioning
offset_r = self.options.get('source_offset_r', 0)
offset_theta = self.options.get('source_offset_theta', 0)
if offset_r is None:
offset_r = 0
if offset_theta is None:
offset_theta = 0
rth_str = f'r{offset_r:.2f}_th{offset_theta:+.1f}'
# Mask offsetting
coron_shift_x = self.options.get('coron_shift_x', 0)
coron_shift_y = self.options.get('coron_shift_y', 0)
if coron_shift_x is None:
coron_shift_x = 0
if coron_shift_y is None:
coron_shift_y = 0
moff_str1 = '' if coron_shift_x==0 else f'_mx{coron_shift_x:.3f}'
moff_str2 = '' if coron_shift_y==0 else f'_my{coron_shift_y:.3f}'
moff_str = moff_str1 + moff_str2
opd_dict = self.get_opd_info()
opd_str = opd_dict['opd_str']
if wfe_drift!=0:
opd_str = '{}-{:.0f}nm'.format(opd_str,wfe_drift)
fname = f'{fmp_str}_pix{fov_pix}_os{osamp}_jsig{jsig_mas:.0f}_{rth_str}{moff_str}{bar_str}_{opd_str}'
# Add SI WFE tag if included
if self.include_si_wfe:
fname = fname + '_siwfe'
# Add distortions tag if included
if self.include_distortions:
fname = fname + '_distort'
if self.use_legendre:
fname = fname + '_legendre'
fname = fname + '.fits'
return fname
def _get_opd_info(self, opd=None, pupil=None, HDUL_to_OTELM=True):
"""
Parse out OPD information for a given OPD, which can be a
file name, tuple (file,slice), HDUList, or OTE Linear Model.
Returns dictionary of some relevant information for logging purposes.
The dictionary has an OPD version as an OTE_Linear_Model_WSS object.
This outputs an OTE Linear Model.
In order to update instrument class:
>>> opd_dict = inst.get_opd_info()
>>> opd_new = opd_dict['pupilopd']
>>> inst.pupilopd = opd_new
>>> inst.pupil = opd_new
"""
# Pupil OPD file name
if opd is None:
opd = self.pupilopd
if pupil is None:
pupil = self.pupil
# If OPD is None or a string, make into tuple
if opd is None: # Default OPD
opd = self._opd_default
elif isinstance(opd, six.string_types):
opd = (opd, 0)
# Change log levels to WARNING
log_prev = conf.logging_level
setup_logging('WARN', verbose=False)
# Parse OPD info
if isinstance(opd, tuple):
if not len(opd)==2:
raise ValueError("opd passed as tuple must have length of 2.")
# Filename info
opd_name = opd[0] # OPD file name
opd_num = opd[1] # OPD slice
rev = [s for s in opd_name.split('_') if "Rev" in s]
rev = '' if len(rev)==0 else rev[0]
opd_str = '{}slice{:.0f}'.format(rev,opd_num)
opd = OPDFile_to_HDUList(opd_name, opd_num)
elif isinstance(opd, fits.HDUList):
# A custom OPD is passed.
opd_name = 'OPD from FITS HDUlist'
opd_num = 0
opd_str = 'OPDcustomFITS'
elif isinstance(opd, poppy.OpticalElement):
# OTE Linear Model
# opd_name = 'OPD from OTE LM'
opd_name = opd.name
opd_num = 0
opd_str = 'OPDcustomLM'
else:
raise ValueError("OPD must be a string, tuple, HDUList, or OTE LM.")
# OPD should now be an HDUList or OTE LM
# Convert to OTE LM if HDUList
if HDUL_to_OTELM and isinstance(opd, fits.HDUList):
hdul = opd
header = hdul[0].header
header['ORIGINAL'] = (opd_name, "Original OPD source")
header['SLICE'] = (opd_num, "Slice index of original OPD")
#header['WFEDRIFT'] = (self.wfe_drift, "WFE drift amount [nm]")
name = 'Modified from ' + opd_name
opd = OTE_Linear_Model_WSS(name=name, transmission=pupil,
opd=hdul, opd_index=opd_num,
v2v3=self._tel_coords())
setup_logging(log_prev, verbose=False)
out_dict = {'opd_name':opd_name, 'opd_num':opd_num, 'opd_str':opd_str, 'pupilopd':opd}
return out_dict
def _drift_opd(self, wfe_drift, opd=None, wfe_therm=None, wfe_frill=None, wfe_iec=None):
"""
A quick method to drift the pupil OPD. This function applies
some WFE drift to input OPD file by breaking up the wfe_drift
parameter into thermal, frill, and IEC components. If we want
more realistic time evolution, then we should use the procedure
in dev_utils/WebbPSF_OTE_LM.ipynb to create a time series of OPD
maps, which can then be passed directly to create unique PSFs.
This outputs an OTE Linear Model. In order to update instrument class:
>>> opd_dict = inst.drift_opd()
>>> inst.pupilopd = opd_dict['opd']
>>> inst.pupil = opd_dict['opd']
Parameters
----------
wfe_drift : float
Desired WFE drift (delta RMS) in nm.
opd : Various
file name, tuple (file,slice), HDUList, or OTE Linear Model
of the OPD map.
wfe_therm : None or float
Option to specify thermal component of WFE drift (nm RMS).
`wfe_drift` is ignored.
wfe_frill : None or float
Option to specify frill component of WFE drift (nm RMS).
`wfe_drift` is ignored.
wfe_iec : None or float
Option to specify IEC component of WFE drift (nm RMS).
`wfe_drift` is ignored.
"""
# Get Pupil OPD info and convert to OTE LM
opd_dict = self.get_opd_info(opd)
opd_name = opd_dict['opd_name']
opd_num = opd_dict['opd_num']
opd_str = opd_dict['opd_str']
opd = deepcopy(opd_dict['pupilopd'])
# Apply drift components
wfe_dict = {'therm':0, 'frill':0, 'iec':0, 'opd':opd}
if (wfe_therm is not None) or (wfe_frill is not None) or (wfe_iec is not None):
wfe_therm = 0 if wfe_therm is None else wfe_therm
wfe_frill = 0 if wfe_frill is None else wfe_frill
wfe_iec = 0 if wfe_iec is None else wfe_iec
# Apply IEC
opd.apply_iec_drift(amplitude=wfe_iec, delay_update=True)
# Apply frill
opd.apply_frill_drift(amplitude=wfe_frill, delay_update=True)
# Apply OTE thermal slew amplitude
# This is slightly different due to how thermal slews are specified
delta_time = 14*24*60 * u.min
wfe_scale = (wfe_therm / 24)
if wfe_scale == 0:
delta_time = 0
opd.thermal_slew(delta_time, case='BOL', scaling=wfe_scale)
wfe_dict['therm'] = wfe_therm
wfe_dict['frill'] = wfe_frill
wfe_dict['iec'] = wfe_iec
wfe_dict['opd'] = opd
elif (wfe_drift != 0):
_log.info('Performing WFE drift of {}nm'.format(wfe_drift))
# Apply WFE drift to OTE Linear Model (Amplitude of frill drift)
# self.pupilopd = opd
# self.pupil = opd
# Split WFE drift amplitude between three processes
# 1) IEC Heaters; 2) Frill tensioning; 3) OTE Thermal perturbations
# Give IEC heaters 1 nm
wfe_iec = 1 if np.abs(wfe_drift) > 2 else 0
# Split remainder between frill and OTE thermal slew
wfe_remain_var = wfe_drift**2 - wfe_iec**2
wfe_frill = np.sqrt(0.8*wfe_remain_var)
wfe_therm = np.sqrt(0.2*wfe_remain_var)
# wfe_th_frill = np.sqrt((wfe_drift**2 - wfe_iec**2) / 2)
# Negate amplitude if supplying negative wfe_drift
if wfe_drift < 0:
wfe_frill *= -1
wfe_therm *= -1
wfe_iec *= -1
# Apply IEC
opd.apply_iec_drift(amplitude=wfe_iec, delay_update=True)
# Apply frill
opd.apply_frill_drift(amplitude=wfe_frill, delay_update=True)
# Apply OTE thermal slew amplitude
# This is slightly different due to how thermal slews are specified
delta_time = 14*24*60 * u.min
wfe_scale = (wfe_therm / 24)
if wfe_scale == 0:
delta_time = 0
opd.thermal_slew(delta_time, case='BOL', scaling=wfe_scale)
wfe_dict['therm'] = wfe_therm
wfe_dict['frill'] = wfe_frill
wfe_dict['iec'] = wfe_iec
wfe_dict['opd'] = opd
else: # No drift
# Apply IEC
opd.apply_iec_drift(amplitude=0, delay_update=True)
# Apply frill
opd.apply_frill_drift(amplitude=0, delay_update=True)
# Apply OTE thermal slew amplitude
opd.thermal_slew(0*u.min, scaling=0)
wfe_dict['opd'] = opd
return wfe_dict
def _update_mask_shifts(self):
"""
Restrict mask offsets to whole pixel shifts. Update source_offset_r/theta
to accommodate sub-pixel shifts. Save sub-pixel shift values to temporary
source_offset_xsub/ysub keys in the options dict.
"""
# Restrict mask offsets to whole pixel shifts
# Subpixel offsets should be handled with source offsets
xv = self.options.get('coron_shift_x', 0)
yv = self.options.get('coron_shift_y', 0)
if (not self.is_coron) or (xv==yv==0):
return
# Whole pixel offsets
pixscale = self.pixelscale
xv_pix = int(xv / pixscale) * pixscale
yv_pix = int(yv / pixscale) * pixscale
self.options['coron_shift_x'] = xv_pix # arcsec
self.options['coron_shift_y'] = yv_pix # arcsec
# Subpixel residuals
xv_subpix = xv - xv_pix
yv_subpix = yv - yv_pix
rotation = 0 if self._rotation is None else -1*self._rotation
# Equivalent source offsetting
xoff_sub, yoff_sub = xy_rot(-1*xv_subpix, -1*yv_subpix, rotation)
# Get initial values if they exist
r0 = self.options.get('source_offset_r', 0)
th0 = self.options.get('source_offset_theta', 0)
x0, y0 = rtheta_to_xy(r0, th0)
# Update (r, th) offsets
r, th = xy_to_rtheta(x0+xoff_sub, y0+yoff_sub)
self.options['source_offset_r'] = r
self.options['source_offset_theta'] = th
# print(xv, yv)
# print('coron_shift_x: ', xv_pix, 'coron_shift_y: ', yv_pix)
# print(xv_subpix, yv_subpix, rotation)
# print(xoff_sub, yoff_sub)
# print(r0, th0, x0, y0)
# print('source_offset_r: ', r, 'source_offset_theta: ', th)
# Store the xsub and ysub to back out later
self.options['source_offset_xsub'] = xoff_sub # arcsec
self.options['source_offset_ysub'] = yoff_sub # arcsec
def _calc_psf_with_shifts(self, calc_psf_func, **kwargs):
"""
Mask shifting in webbpsf does not have as high of a precision as source offsetting
for a given oversample setting.
When performing mask shifting, coron_shift_x/y should be detector pixel integers.
Sub-pixel shifting should occur using source_offset_r/theta, but we must shift
the final images in the opposite direction in order to reposition the PSF at the
proper source location. If the user already set _r/theta, the PSF is shifted to
that position using temporary source_offset_xsub/ysub keys in the options dict.
"""
from .utils import S
# Only shift coronagraphic masks by pixel integers
# sub-pixels shifts are handled by source offsetting
if self.is_coron:
options_orig = self.options.copy()
_update_mask_shifts(self)
# Get spectrum; flat in photlam if not specified
sp = kwargs.pop('sp', None)
if sp is None:
sp = stellar_spectrum('flat')
do_counts = False
else:
do_counts = True
# Create source weights
bp = self.bandpass
obs = S.Observation(sp, bp, binset=bp.wave)
obs.convert('photlam')
# Decide on wavelengths and weights
npsf = self.npsf
wave_um = np.linspace(obs.binwave.min(), obs.binwave.max(), npsf) / 1e4
weights = np.interp(wave_um, obs.binwave/1e4, obs.binflux)
weights /= weights.sum()
src = {'wavelengths': wave_um*1e-6, 'weights': weights}
# NIRCam grism pupils aren't recognized by WebbPSF
if (self.name.upper()=='NIRCAM') and self.is_grism:
grism_temp = self.pupil_mask
self.pupil_mask = None
# Perform PSF calculation
kw_list = [
'nlambda', 'monochromatic',
'fov_arcsec', 'fov_pixels', 'oversample',
'detector_oversample', 'fft_oversample',
'outfile', 'overwrite', 'display',
'save_intermediates', 'return_intermediates',
'normalize', 'add_distortion', 'crop_psf'
]
kwargs2 = {}
for kw in kw_list:
if kw in kwargs.keys(): kwargs2[kw] = kwargs[kw]
hdul = calc_psf_func(source=src, **kwargs2)
# Return grism
if (self.name.upper()=='NIRCAM') and self.is_grism:
self.pupil_mask = grism_temp
# Specify image oversampling relative to detector sampling
for hdu in hdul:
if 'DET' in hdu.header['EXTNAME']:
osamp = 1
else:
osamp = hdu.header['DET_SAMP']
hdu.header['OSAMP'] = (osamp, 'Image oversample vs det')
# Scale PSF by total incident source flux
if do_counts:
for hdu in hdul:
hdu.data *= obs.countrate()
# Perform sub-pixel shifting to reposition PSF at requested source location
if self.is_coron:
for hdu in hdul:
pix_scale = hdu.header['PIXELSCL']
xoff_pix = self.options.get('source_offset_xsub',0) / pix_scale
yoff_pix = self.options.get('source_offset_ysub',0) / pix_scale
# hdu.data = fshift(hdu.data, -1*xoff_pix, -1*yoff_pix)
hdu.data = fourier_imshift(hdu.data, -1*xoff_pix, -1*yoff_pix)
# Return to previous values
self.options = options_orig
return hdul
def _calc_psf_webbpsf(self, calc_psf_func, add_distortion=None, fov_pixels=None, oversample=None,
wfe_drift=None, coord_vals=None, coord_frame='tel', **kwargs):
""" Compute a WebbPSF PSF
Slight modification of inherent WebbPSF `calc_psf` function. If add_distortion, fov_pixels,
and oversample are not specified, then we automatically use the associated attributes.
"""
# Automatically use already defined properties
add_distortion = self.include_distortions if add_distortion is None else add_distortion
fov_pixels = self.fov_pix if fov_pixels is None else fov_pixels
kwargs['add_distortion'] = add_distortion
kwargs['fov_pixels'] = fov_pixels
# If there are distortion, compute some fov_pixels, which will get cropped later
npix_extra = 5
if add_distortion:
kwargs['fov_pixels'] += 2 * npix_extra
# Figure out sampling (always want >=4 for Lyot/coronagraphy)
try: # is_lyot may not always be a valid attribute
is_lyot = self.is_lyot
except AttributeError:
is_lyot = False
try: # is_coron may not always be a valid attribute
is_coron = self.is_coron
except AttributeError:
is_coron = False
if is_lyot or is_coron:
if oversample is None:
if self.oversample>=4: # we're good!
oversample = self.oversample
else: # different oversample and detector_oversample
_log.info(f'For coronagraphy, setting oversample=4 and detector_oversample={oversample}')
kwargs['detector_oversample'] = self.oversample
oversample = 4
elif oversample<4: # no changes, but send informational message
_log.warn('oversample={oversample} may produce imprecise results for coronagraphy. Suggest >=4.')
else:
oversample = self.oversample if oversample is None else oversample
# Note: output image oversampling is overriden by 'detector_oversample'
# This simply specifies FFT ovesampling
kwargs['oversample'] = oversample
# Drift OPD
wfe_drift = 0 if wfe_drift is None else wfe_drift
if wfe_drift != 0:
# Create copies
pupilopd_orig = deepcopy(self.pupilopd)
pupil_orig = deepcopy(self.pupil)
# Get OPD info and convert to OTE LM
opd_dict = self.get_opd_info(HDUL_to_OTELM=True)
opd = opd_dict['pupilopd']
# Perform OPD drift and store in pupilopd and pupil attributes
wfe_dict = self.drift_opd(wfe_drift, opd=opd)
self.pupilopd = wfe_dict['opd']
self.pupil = wfe_dict['opd']
# Get new sci coord
if coord_vals is not None:
siaf_ap = self.siaf[self.aperturename]
xorig, yorig = self.detector_position
xnew, ynew = coord_vals
coron_shift_x_orig = self.options.get('coron_shift_x', 0)
coron_shift_y_orig = self.options.get('coron_shift_y', 0)
bar_offset_orig = self.options.get('bar_offset', None)
self.options['bar_offset'] = 0
# bar_offset_orig = self.options.get('b')
xidl, yidl = siaf_ap.convert(xnew, ynew, coord_frame, 'idl')
xsci, ysci = siaf_ap.convert(xnew, ynew, coord_frame, 'sci')
self.detector_position = (xsci, ysci)
# For coronagraphy, perform mask shift
if self.is_coron:
# Mask shift information
field_rot = 0 if self._rotation is None else self._rotation
# Convert to mask shifts (arcsec)
# xoff_asec = (xsci - siaf_ap.XSciRef) * siaf_ap.XSciScale # asec offset from reference
# yoff_asec = (ysci - siaf_ap.YSciRef) * siaf_ap.YSciScale # asec offset from reference
xoff_asec, yoff_asec = (xidl, yidl)
xoff_mask, yoff_mask = xy_rot(-1*xoff_asec, -1*yoff_asec, field_rot)
# print(xnew, ynew, coord_frame)
# print(xsci, ysci, siaf_ap.AperName)
# print(siaf_ap.XSciRef, siaf_ap.YSciRef)
# print(xoff_asec, yoff_asec)
# print(xoff_mask, yoff_mask)
# Shift mask in opposite direction
self.options['coron_shift_x'] = xoff_mask
self.options['coron_shift_y'] = yoff_mask
# Perform PSF calculation
return_hdul = kwargs.pop('return_hdul', True)
return_oversample = kwargs.pop('return_oversample', True)
hdul = _calc_psf_with_shifts(self, calc_psf_func, **kwargs)
# Reset pupil and OPD
if wfe_drift != 0:
self.pupilopd = pupilopd_orig
self.pupil = pupil_orig
# Return options
if coord_vals is not None:
self.detector_position = (xorig, yorig)
self.options['coron_shift_x'] = coron_shift_x_orig
self.options['coron_shift_y'] = coron_shift_y_orig
self.options['bar_offset'] = bar_offset_orig
# Crop distorted borders
if add_distortion:
osamp = hdul[0].header['DET_SAMP']
npix_over = npix_extra * osamp
hdul[0].data = hdul[0].data[npix_over:-npix_over,npix_over:-npix_over]
hdul[2].data = hdul[2].data[npix_over:-npix_over,npix_over:-npix_over]
hdul[1].data = hdul[1].data[npix_extra:-npix_extra,npix_extra:-npix_extra]
hdul[3].data = hdul[3].data[npix_extra:-npix_extra,npix_extra:-npix_extra]
# Check if we set return_hdul=False
if return_hdul:
res = hdul
else:
# If just returning a single image, determine oversample and distortion
res = hdul[2].data if add_distortion else hdul[0].data
if not return_oversample:
res = frebin(res, scale=1/self.oversample)
return res
def _inst_copy(self):
""" Return a copy of the current instrument class. """
# Change log levels to WARNING for webbpsf_ext, WebbPSF, and POPPY
log_prev = conf.logging_level
setup_logging('WARN', verbose=False)
init_params = {
'filter' : self.filter,
'pupil_mask': self.pupil_mask,
'image_mask': self.image_mask,
'fov_pix' : self.fov_pix,
'oversample': self.oversample,
'auto_gen_coeffs': False
}
# Init same subclass
if self.name=='NIRCam':
inst = NIRCam_ext(**init_params)
elif self.name=='MIRI':
inst = MIRI_ext(**init_params)
# Get OPD info
inst.pupilopd = deepcopy(self.pupilopd)
inst.pupil = deepcopy(self.pupil)
# Detector and aperture info
inst._detector = self._detector
inst._detector_position = self._detector_position
inst._aperturename = self._aperturename
# Other options
inst.options = self.options
# PSF coeff info
inst.use_legendre = self.use_legendre
inst._ndeg = self._ndeg
inst._npsf = self._npsf
inst._quick = self._quick
# SI WFE and distortions
inst.include_si_wfe = self.include_si_wfe
inst.include_distortions = self.include_distortions
### Instrument-specific parameters
# Grism order for NIRCam
try: inst._grism_order = self._grism_order
except: pass
# ND square for NIRCam
try: inst._ND_acq = self._ND_acq
except: pass
setup_logging(log_prev, verbose=False)
return inst
def _wrap_coeff_for_mp(args):
"""
Internal helper routine for parallelizing computations across multiple processors
for multiple WebbPSF monochromatic calculations.
args => (inst,w,fov_pix,oversample)
"""
# No multiprocessing for monochromatic wavelengths
mp_prev = poppy.conf.use_multiprocessing
poppy.conf.use_multiprocessing = False
inst, w = args
try:
hdu_list = inst.calc_psf(monochromatic=w*1e-6, crop_psf=True)
except Exception as e:
print('Caught exception in worker thread (w = {}):'.format(w))
# This prints the type, value, and stack trace of the
# current exception being handled.
traceback.print_exc()
print('')
#raise e
poppy.conf.use_multiprocessing = mp_prev
return None
# Return to previous setting
poppy.conf.use_multiprocessing = mp_prev
# Return distorted PSF
if inst.include_distortions:
hdu = hdu_list[2]
else:
hdu = hdu_list[0]
# Rather than storing
hdu.header['OSAMP'] = (inst.oversample, 'Image oversample vs det')
return hdu
def _gen_psf_coeff(self, nproc=None, wfe_drift=0, force=False, save=True,
return_results=False, return_extras=False, **kwargs):
"""Generate PSF coefficients
Creates a set of coefficients that will generate simulated PSFs for any
arbitrary wavelength. This function first simulates a number of evenly-
spaced PSFs throughout the specified bandpass (or the full channel).
An nth-degree polynomial is then fit to each oversampled pixel using
a linear-least squares fitting routine. The final set of coefficients
for each pixel is returned as an image cube. The returned set of
coefficient are then used to produce PSF via `calc_psf_from_coeff`.
Useful for quickly generated imaging and dispersed PSFs for multiple
spectral types.
Parameters
----------
nproc : bool or None
Manual setting of number of processor cores to break up PSF calculation.
If set to None, this is determined based on the requested PSF size,
number of available memory, and hardware processor cores. The automatic
calculation endeavors to leave a number of resources available to the
user so as to not crash the user's machine.
wfe_drift : float
Wavefront error drift amplitude in nm.
force : bool
Forces a recalculation of PSF even if saved PSF exists. (default: False)
save : bool
Save the resulting PSF coefficients to a file? (default: True)
return_results : bool
By default, results are saved as object the attributes `psf_coeff` and
`psf_coeff_header`. If return_results=True, results are instead returned
as function outputs and will not be saved to the attributes. This is mostly
used for successive coeff simulations to determine varying WFE drift or
focal plane dependencies.
return_extras : bool
Additionally returns a dictionary of monochromatic PSFs images and their
corresponding wavelengths for debugging purposes. Can be used with or without
`return_results`. If `return_results=False`, then only this dictionary is
returned, otherwise if `return_results=False` then returns everything as a
3-element tuple (psf_coeff, psf_coeff_header, extras_dict).
"""
save_name = self.save_name
outfile = str(self.save_dir / save_name)
# Load data from already saved FITS file
if os.path.exists(outfile) and (not force):
if return_extras:
_log.warn("return_extras only valid if coefficient files does not exist or force=True")
_log.info(f'Loading {outfile}')
hdul = fits.open(outfile)
data = hdul[0].data.astype(np.float)
hdr = hdul[0].header
hdul.close()
# Output if return_results=True, otherwise save to attributes
if return_results:
return data, hdr
else:
del self.psf_coeff, self.psf_coeff_header
self.psf_coeff = data
self.psf_coeff_header = hdr
return
temp_str = 'and saving' if save else 'but not saving'
_log.info(f'Generating {temp_str} PSF coefficient')
# Change log levels to WARNING for webbpsf_ext, WebbPSF, and POPPY
log_prev = conf.logging_level
setup_logging('WARN', verbose=False)
# w1 = self.bandpass.wave.min() / 1e4
# w2 = self.bandpass.wave.max() / 1e4
w1, w2 = self.wave_fit
npsf = self.npsf
waves = np.linspace(w1, w2, npsf)
fov_pix = self.fov_pix
oversample = self.oversample
# Get OPD info and convert to OTE LM
opd_dict = self.get_opd_info(HDUL_to_OTELM=True)
opd_name = opd_dict['opd_name']
opd_num = opd_dict['opd_num']
opd_str = opd_dict['opd_str']
opd = opd_dict['pupilopd']
# Drift OPD
if wfe_drift!=0:
wfe_dict = self.drift_opd(wfe_drift, opd=opd)
else:
wfe_dict = {'therm':0, 'frill':0, 'iec':0, 'opd':opd}
opd_new = wfe_dict['opd']
# Save copies
pupilopd_orig = deepcopy(self.pupilopd)
pupil_orig = deepcopy(self.pupil)
self.pupilopd = opd_new
self.pupil = opd_new
# How many processors to split into?
if nproc is None:
nproc = nproc_use(fov_pix, oversample, npsf)
_log.debug('nprocessors: {}; npsf: {}'.format(nproc, npsf))
# Make a paired down copy of self with limited data for
# copying to multiprocessor theads. This reduces memory
# swapping overheads and limitations.
inst_copy = _inst_copy(self) if nproc > 1 else self
t0 = time.time()
# Setup the multiprocessing pool and arguments to pass to each pool
worker_arguments = [(inst_copy, wlen) for wlen in waves]
if nproc > 1:
hdu_arr = []
try:
with mp.Pool(nproc) as pool:
for res in tqdm(pool.imap(_wrap_coeff_for_mp, worker_arguments),
total=npsf, desc='Monochromatic PSFs', leave=False):
hdu_arr.append(res)
pool.close()
if hdu_arr[0] is None:
raise RuntimeError('Returned None values. Issue with multiprocess or WebbPSF??')
except Exception as e:
_log.error('Caught an exception during multiprocess.')
_log.info('Closing multiprocess pool.')
pool.terminate()
pool.close()
raise e
else:
_log.info('Closing multiprocess pool.')
else:
# Pass arguments to the helper function
hdu_arr = []
for wa in tqdm(worker_arguments, desc='Monochromatic PSFs', leave=False):
hdu = _wrap_coeff_for_mp(wa)
if hdu is None:
raise RuntimeError('Returned None values. Issue with WebbPSF??')
hdu_arr.append(hdu)
del inst_copy, worker_arguments
t1 = time.time()
# Ensure PSF sum is not larger than 1.0
# This can sometimes occur for distorted PSFs near edges
for hdu in hdu_arr:
data_sum = hdu.data.sum()
# print(data_sum)
if data_sum>1:
hdu.data /= data_sum
# Reset pupils
self.pupilopd = pupilopd_orig
self.pupil = pupil_orig
# Reset to original log levels
setup_logging(log_prev, verbose=False)
time_string = 'Took {:.2f} seconds to generate WebbPSF images'.format(t1-t0)
_log.info(time_string)
# Extract image data from HDU array
images = []
for hdu in hdu_arr:
images.append(hdu.data)
# Turn results into a numpy array (npsf,ny,nx)
images = np.asarray(images)
# Simultaneous polynomial fits to all pixels using linear least squares
use_legendre = self.use_legendre
ndeg = self.ndeg
coeff_all = jl_poly_fit(waves, images, deg=ndeg, use_legendre=use_legendre, lxmap=[w1,w2])
################################
# Create HDU and header
################################
hdu = fits.PrimaryHDU(coeff_all)
hdr = hdu.header
head_temp = hdu_arr[0].header
hdr['DESCR'] = ('PSF Coeffecients', 'File Description')
hdr['NWAVES'] = (npsf, 'Number of wavelengths used in calculation')
copy_keys = [
'EXTNAME', 'OSAMP', 'OVERSAMP', 'DET_SAMP', 'PIXELSCL', 'FOV',
'INSTRUME', 'FILTER', 'PUPIL', 'CORONMSK',
'WAVELEN', 'DIFFLMT', 'APERNAME', 'MODULE', 'CHANNEL', 'PILIN',
'DET_NAME', 'DET_X', 'DET_Y', 'DET_V2', 'DET_V3',
'GRATNG14', 'GRATNG23', 'FLATTYPE', 'CCCSTATE', 'TACQNAME',
'PUPILINT', 'PUPILOPD', 'OPD_FILE', 'OPDSLICE', 'TEL_WFE',
'SI_WFE', 'SIWFETYP', 'SIWFEFPT',
'ROTATION', 'DISTORT', 'SIAF_VER', 'MIR_DIST', 'KERN_AMP', 'KERNFOLD',
'NORMALIZ', 'FFTTYPE', 'AUTHOR', 'DATE', 'VERSION', 'DATAVERS'
]
for key in copy_keys:
try:
hdr[key] = (head_temp[key], head_temp.comments[key])
except (AttributeError, KeyError):
pass
# hdr[key] = ('none', 'No key found')
hdr['WEXTVERS'] = (__version__, "webbpsf_ext version")
# Update keywords
hdr['PUPILOPD'] = (opd_name, 'Original Pupil OPD source')
hdr['OPDSLICE'] = (opd_num, 'OPD slice index')
# Source positioning
offset_r = self.options.get('source_offset_r', 'None')
offset_theta = self.options.get('source_offset_theta', 'None')
# Mask offsetting
coron_shift_x = self.options.get('coron_shift_x', 'None')
coron_shift_y = self.options.get('coron_shift_y', 'None')
bar_offset = self.options.get('bar_offset', 'None')
# Jitter settings
jitter = self.options.get('jitter')
jitter_sigma = self.options.get('jitter_sigma', 0)
# gen_psf_coeff() Keyword Values
hdr['FOVPIX'] = (fov_pix, 'WebbPSF pixel FoV')
hdr['NPSF'] = (npsf, 'Number of wavelengths to calc')
hdr['NDEG'] = (ndeg, 'Polynomial fit degree')
hdr['WAVE1'] = (w1, 'First wavelength in calc')
hdr['WAVE2'] = (w2, 'Last of wavelength in calc')
hdr['LEGNDR'] = (use_legendre, 'Legendre polynomial fit?')
hdr['OFFR'] = (offset_r, 'Radial offset')
hdr['OFFTH'] = (offset_theta, 'Position angle OFFR (CCW)')
if (self.image_mask is not None) and ('WB' in self.image_mask):
hdr['BAROFF'] = (bar_offset, 'Image mask shift along wedge (arcsec)')
hdr['MASKOFFX'] = (coron_shift_x, 'Image mask shift in x (arcsec)')
hdr['MASKOFFY'] = (coron_shift_y, 'Image mask shift in y (arcsec)')
if jitter is None:
hdr['JITRTYPE'] = ('None', 'Type of jitter applied')
else:
hdr['JITRTYPE'] = (jitter, 'Type of jitter applied')
hdr['JITRSIGM'] = (jitter_sigma, 'Jitter sigma [mas]')
if opd is None:
hdr['OPD'] = ('None', 'Telescope OPD')
elif isinstance(opd, fits.HDUList):
hdr['OPD'] = ('HDUList', 'Telescope OPD')
elif isinstance(opd, six.string_types):
hdr['OPD'] = (opd, 'Telescope OPD')
elif isinstance(opd, poppy.OpticalElement):
hdr['OPD'] = ('OTE Linear Model', 'Telescope OPD')
else:
hdr['OPD'] = ('UNKNOWN', 'Telescope OPD')
hdr['WFEDRIFT'] = (wfe_drift, "WFE drift amount [nm]")
hdr['OTETHMDL'] = (opd._thermal_model.case, "OTE Thermal slew model case")
hdr['OTETHSTA'] = ("None", "OTE Starting pitch angle for thermal slew model")
hdr['OTETHEND'] = ("None", "OTE Ending pitch angle for thermal slew model")
hdr['OTETHRDT'] = ("None", "OTE Thermal slew model delta time after slew")
hdr['OTETHRWF'] = (wfe_dict['therm'], "OTE WFE amplitude from 'thermal slew' term")
hdr['OTEFRLWF'] = (wfe_dict['frill'], "OTE WFE amplitude from 'frill tension' term")
hdr['OTEIECWF'] = (wfe_dict['iec'], "OTE WFE amplitude from 'IEC thermal cycling'")
hdr['SIWFE'] = (self.include_si_wfe, "Was SI WFE included?")
hdr['FORCE'] = (force, "Forced calculations?")
hdr['SAVE'] = (save, "Was file saved to disk?")
hdr['FILENAME'] = (save_name, "File save name")
hdr.insert('WEXTVERS', '', after=True)
hdr.insert('WEXTVERS', ('','gen_psf_coeff() Parameters'), after=True)
hdr.insert('WEXTVERS', '', after=True)
hdr.add_history(time_string)
if save:
# Catch warnings in case header comments too long
from astropy.utils.exceptions import AstropyWarning
import warnings
_log.info(f'Saving to {outfile}')
with warnings.catch_warnings():
warnings.simplefilter('ignore', AstropyWarning)
hdu.writeto(outfile, overwrite=True)
if return_results==False:
del self.psf_coeff, self.psf_coeff_header
self.psf_coeff = coeff_all
self.psf_coeff_header = hdr
extras_dict = {'images' : images, 'waves': waves}
# Options to return results from function
if return_results:
if return_extras:
return coeff_all, hdr, extras_dict
else:
return coeff_all, hdr
elif return_extras:
return extras_dict
else:
return
def _gen_wfedrift_coeff(self, force=False, save=True, wfe_list=[0,1,2,5,10,20,40],
return_results=False, return_raw=False, **kwargs):
""" Fit WFE drift coefficients
This function finds a relationship between PSF coefficients in the
presence of WFE drift. For a series of WFE drift values, we generate
corresponding PSF coefficients and fit a polynomial relationship to
the residual values. This allows us to quickly modify a nominal set of
PSF image coefficients to generate a new PSF where the WFE has drifted
by some amplitude.
It's Legendre's all the way down...
Parameters
----------
force : bool
Forces a recalculation of coefficients even if saved file exists.
(default: False)
save : bool
Save the resulting PSF coefficients to a file? (default: True)
Keyword Args
------------
wfe_list : array-like
A list of wavefront error drift values (nm) to calculate and fit.
Default is [0,1,2,5,10,20,40], which covers the most-likely
scenarios (1-5nm) while also covering a range of extreme drift
values (10-40nm).
return_results : bool
By default, results are saved in `self._psf_coeff_mod` dictionary.
If return_results=True, results are instead returned as function outputs
and will not be saved to the dictionary attributes.
return_raw : bool
Normally, we return the relation between PSF coefficients as a function
of position. Instead this returns (as function outputs) the raw values
prior to fitting. Final results will not be saved to the dictionary attributes.
Example
-------
Generate PSF coefficient, WFE drift modifications, then
create an undrifted and drifted PSF. (pseudo-code)
>>> fpix, osamp = (128, 4)
>>> coeff0 = gen_psf_coeff()
>>> wfe_cf = gen_wfedrift_coeff()
>>> psf0 = gen_image_from_coeff(coeff=coeff0)
>>> # Drift the coefficients
>>> wfe_drift = 5 # nm
>>> cf_fit = wfe_cf.reshape([wfe_cf.shape[0], -1])
>>> cf_mod = jl_poly(np.array([wfe_drift]), cf_fit).reshape(coeff0.shape)
>>> coeff5nm = coeff + cf_mod
>>> psf5nm = gen_image_from_coeff(coeff=coeff5nm)
"""
# fov_pix should not be more than some size to preserve memory
fov_max = self._fovmax_wfedrift if self.oversample<=4 else self._fovmax_wfedrift / 2
fov_pix_orig = self.fov_pix
if self.fov_pix>fov_max:
self.fov_pix = fov_max if (self.fov_pix % 2 == 0) else fov_max + 1
# Name to save array of oversampled coefficients
save_dir = self.save_dir
save_name = os.path.splitext(self.save_name)[0] + '_wfedrift.npz'
outname = str(save_dir / save_name)
# Load file if it already exists
if (not force) and os.path.exists(outname):
# Return fov_pix to original size
self.fov_pix = fov_pix_orig
_log.info(f"Loading {outname}")
out = np.load(outname)
wfe_drift = out.get('wfe_drift')
# Account for possibility that wfe_drift_off is None
try:
wfe_drift_off = out.get('wfe_drift_off')
except ValueError:
wfe_drift_off = None
wfe_drift_lxmap = out.get('wfe_drift_lxmap')
if return_results:
return wfe_drift, wfe_drift_off, wfe_drift_lxmap
else:
self._psf_coeff_mod['wfe_drift'] = wfe_drift
self._psf_coeff_mod['wfe_drift_off'] = wfe_drift_off
self._psf_coeff_mod['wfe_drift_lxmap'] = wfe_drift_lxmap
return
_log.warn('Generating WFE Drift coefficients. This may take some time...')
# Cycle through WFE drifts for fitting
wfe_list = np.array(wfe_list)
npos = len(wfe_list)
# Warn if mask shifting is currently enabled (only when an image mask is present)
for off_pos in ['coron_shift_x','coron_shift_y']:
val = self.options.get(off_pos)
if (self.image_mask is not None) and (val is not None) and (val != 0):
_log.warn(f'{off_pos} is set to {val:.3f} arcsec. Should this be 0?')
log_prev = conf.logging_level
setup_logging('WARN', verbose=False)
# Calculate residuals
cf_wfe = []
for wfe_drift in tqdm(wfe_list, leave=False, desc='WFE Drift'):
cf, _ = self.gen_psf_coeff(wfe_drift=wfe_drift, force=True, save=False, return_results=True, **kwargs)
cf_wfe.append(cf)
cf_wfe = np.asarray(cf_wfe)
# For coronagraphic observations, produce an off-axis PSF by turning off mask
cf_fit_off = cf_wfe_off = None
if self.is_coron:
image_mask_orig = self.image_mask
apername_orig = self._aperturename
self.image_mask = None
cf_wfe_off = []
for wfe_drift in tqdm(wfe_list, leave=False, desc="Off-Axis"):
cf, _ = self.gen_psf_coeff(wfe_drift=wfe_drift, force=True, save=False, return_results=True, **kwargs)
cf_wfe_off.append(cf)
cf_wfe_off = np.asarray(cf_wfe_off)
# Return to original values
self.image_mask = image_mask_orig
self.aperturename = apername_orig
# Return fov_pix to original size
self.fov_pix = fov_pix_orig
setup_logging(log_prev, verbose=False)
if return_raw:
return cf_wfe, cf_wfe_off, wfe_list
# Get residuals
cf_wfe_off = cf_wfe_off - cf_wfe_off[0]
# Fit each pixel with a polynomial and save the coefficient
cf_shape = cf_wfe_off.shape[1:]
cf_wfe_off = cf_wfe_off.reshape([npos, -1])
lxmap = np.array([np.min(wfe_list), np.max(wfe_list)])
cf_fit_off = jl_poly_fit(wfe_list, cf_wfe_off, deg=4, use_legendre=True, lxmap=lxmap)
cf_fit_off = cf_fit_off.reshape([-1, cf_shape[0], cf_shape[1], cf_shape[2]])
del cf_wfe_off
cf_wfe_off = None
else:
# Return fov_pix to original size
self.fov_pix = fov_pix_orig
setup_logging(log_prev, verbose=False)
if return_raw:
return cf_wfe, cf_wfe_off, wfe_list
# Get residuals
cf_wfe = cf_wfe - cf_wfe[0]
# Fit each pixel with a polynomial and save the coefficient
cf_shape = cf_wfe.shape[1:]
cf_wfe = cf_wfe.reshape([npos, -1])
lxmap = np.array([np.min(wfe_list), np.max(wfe_list)])
cf_fit = jl_poly_fit(wfe_list, cf_wfe, deg=4, use_legendre=True, lxmap=lxmap)
cf_fit = cf_fit.reshape([-1, cf_shape[0], cf_shape[1], cf_shape[2]])
if save:
_log.info(f"Saving to {outname}")
np.savez(outname, wfe_drift=cf_fit, wfe_drift_off=cf_fit_off, wfe_drift_lxmap=lxmap)
_log.info('Done.')
# Options to return results from function
if return_results:
return cf_fit, cf_fit_off, lxmap
else:
self._psf_coeff_mod['wfe_drift'] = cf_fit
self._psf_coeff_mod['wfe_drift_off'] = cf_fit_off
self._psf_coeff_mod['wfe_drift_lxmap'] = lxmap
def _gen_wfefield_coeff(self, force=False, save=True, return_results=False, return_raw=False, **kwargs):
""" Fit WFE field-dependent coefficients
Find a relationship between field position and PSF coefficients for
non-coronagraphic observations and when `include_si_wfe` is enabled.
Parameters
----------
force : bool
Forces a recalculation of coefficients even if saved file exists.
(default: False)
save : bool
Save the resulting PSF coefficients to a file? (default: True)
Keyword Args
------------
return_results : bool
By default, results are saved in `self._psf_coeff_mod` dictionary.
If return_results=True, results are instead returned as function outputs
and will not be saved to the dictionary attributes.
return_raw : bool
Normally, we return the relation between PSF coefficients as a function
of position. Instead this returns (as function outputs) the raw values
prior to fitting. Final results will not be saved to the dictionary attributes.
"""
if (self.include_si_wfe==False) or (self.is_coron):
_log.info("Skipping WFE field dependence...")
if self.include_si_wfe==False:
_log.info(" `include_si_wfe` attribute is set to False.")
if self.is_coron:
_log.info(f" {self.name} coronagraphic image mask is in place.")
del self._psf_coeff_mod['si_field'] # Delete potentially large array
self._psf_coeff_mod['si_field'] = None
self._psf_coeff_mod['si_field_v2grid'] = None
self._psf_coeff_mod['si_field_v3grid'] = None
self._psf_coeff_mod['si_field_apname'] = None
return
# Delete potentially large array
if (not return_raw) and (not return_results):
del self._psf_coeff_mod['si_field']
# fov_pix should not be more than some size to preserve memory
fov_max = self._fovmax_wfefield if self.oversample<=4 else self._fovmax_wfefield / 2
fov_pix_orig = self.fov_pix
if self.fov_pix>fov_max:
self.fov_pix = fov_max if (self.fov_pix % 2 == 0) else fov_max + 1
# Name to save array of oversampled coefficients
save_dir = self.save_dir
save_name = os.path.splitext(self.save_name)[0] + '_wfefields.npz'
outname = str(save_dir / save_name)
# Load file if it already exists
if (not force) and os.path.exists(outname):
# Return fov_pix to original size
self.fov_pix = fov_pix_orig
_log.info(f"Loading {outname}")
out = np.load(outname)
if return_results:
return out['arr_0'], out['arr_1'], out['arr_2'], out['arr_3']
else:
self._psf_coeff_mod['si_field'] = out['arr_0']
self._psf_coeff_mod['si_field_v2grid'] = out['arr_1']
self._psf_coeff_mod['si_field_v3grid'] = out['arr_2']
self._psf_coeff_mod['si_field_apname'] = out['arr_3'].flatten()[0]
return
_log.warn('Generating field-dependent coefficients. This may take some time...')
# Cycle through a list of field points
# These are the measured CV3 field positions
zfile = 'si_zernikes_isim_cv3.fits'
if self.name=='NIRCam':
channel = 'LW' if 'long' in self.channel else 'SW'
module = self.module
# Check if NIRCam Lyot wedges are in place
if self.is_lyot:
if module=='B':
raise NotImplementedError("There are no Full Frame SIAF apertures defined for Mod B coronagraphy")
# These are extracted from Zemax models
zfile = 'si_zernikes_coron_wfe.fits'
# Read in measured SI Zernike data
data_dir = self._WebbPSF_basepath
zernike_file = os.path.join(data_dir, zfile)
ztable_full = Table.read(zernike_file)
if self.name=="NIRCam":
inst_name = self.name + channel + module
else:
inst_name = self.name
ind_inst = [inst_name in row['instrument'] for row in ztable_full]
ind_inst = np.where(ind_inst)
# Grab measured V2 and V3 positions
v2_all = np.array(ztable_full[ind_inst]['V2'].tolist())
v3_all = np.array(ztable_full[ind_inst]['V3'].tolist())
# Add detector corners
# Want full detector footprint, not just subarray aperture
if self.name=='NIRCam':
pupil = self.pupil_mask
v2_min, v2_max, v3_min, v3_max = NIRCam_V2V3_limits(module, channel=channel, pupil=pupil, rederive=True, border=1)
igood = v3_all > v3_min
v2_all = np.append(v2_all[igood], [v2_min, v2_max, v2_min, v2_max])
v3_all = np.append(v3_all[igood], [v3_min, v3_min, v3_max, v3_max])
npos = len(v2_all)
else: # Other instrument detector fields are perhaps a little simpler
# Specify the full frame apertures for grabbing corners of FoV
if self.name=='MIRI':
ap = self.siaf['MIRIM_FULL']
else:
raise NotImplementedError("Field Variations not implemented for {}".format(self.name))
v2_ref, v3_ref = ap.corners('tel', False)
# Add border margin of 1"
v2_avg = np.mean(v2_ref)
v2_ref[v2_ref<v2_avg] -= 1
v2_ref[v2_ref>v2_avg] += 1
v3_avg = np.mean(v3_ref)
v3_ref[v3_ref<v3_avg] -= 1
v3_ref[v3_ref>v3_avg] += 1
# V2/V3 min and max convert to arcmin and add to v2_all/v3_all
v2_min, v2_max = np.array([v2_ref.min(), v2_ref.max()]) / 60.
v3_min, v3_max = np.array([v3_ref.min(), v3_ref.max()]) / 60.
v2_all = np.append(v2_all, [v2_min, v2_max, v2_min, v2_max])
v3_all = np.append(v3_all, [v3_min, v3_min, v3_max, v3_max])
npos = len(v2_all)
# Convert V2/V3 positions to sci coords for specified aperture
apname = self.aperturename
ap = self.siaf[apname]
xsci_all, ysci_all = ap.convert(v2_all*60, v3_all*60, 'tel', 'sci')
log_prev = conf.logging_level
setup_logging('WARN', verbose=False)
# Initial settings
coeff0 = self.psf_coeff
x0, y0 = self.detector_position
# Calculate new coefficients at each position
cf_fields = []
# Create progress bar object
pbar = tqdm(zip(xsci_all, ysci_all), total=npos, desc='Field Points')
for xsci, ysci in pbar:
# Update progress bar description
pbar.set_description(f"xsci, ysci = ({xsci:.0f}, {ysci:.0f})")
# Update saved detector position and calculate PSF coeff
self.detector_position = (xsci, ysci)
cf, _ = self.gen_psf_coeff(force=True, save=False, return_results=True, **kwargs)
cf_fields.append(cf)
cf_fields = np.asarray(cf_fields)
# Reset to initial values
self.detector_position = (x0,y0)
self.fov_pix = fov_pix_orig
setup_logging(log_prev, verbose=False)
# Return raw results for further analysis
if return_raw:
return cf_fields, v2_all, v3_all
# Get residuals
new_shape = cf_fields.shape[-2:]
if coeff0.shape[-2:] != new_shape:
coeff0_resize = np.asarray([pad_or_cut_to_size(im, new_shape) for im in coeff0])
coeff0 = coeff0_resize
cf_fields_resid = cf_fields - coeff0
del cf_fields
# Create an evenly spaced grid of V2/V3 coordinates
nv23 = 8
v2grid = np.linspace(v2_min, v2_max, num=nv23)
v3grid = np.linspace(v3_min, v3_max, num=nv23)
# Interpolate onto an evenly space grid
res = make_coeff_resid_grid(v2_all, v3_all, cf_fields_resid, v2grid, v3grid)
if save:
_log.info(f"Saving to {outname}")
np.savez(outname, res, v2grid, v3grid, apname)
if return_results:
return res, v2grid, v3grid, apname
else:
self._psf_coeff_mod['si_field'] = res
self._psf_coeff_mod['si_field_v2grid'] = v2grid
self._psf_coeff_mod['si_field_v3grid'] = v3grid
self._psf_coeff_mod['si_field_apname'] = apname
def _gen_wfemask_coeff(self, force=False, save=True, large_grid=None,
return_results=False, return_raw=False, **kwargs):
if (not self.is_coron):
_log.info("Skipping WFE mask dependence...")
_log.info(" Coronagraphic image mask not in place")
del self._psf_coeff_mod['si_mask'] # Delete potentially large array
self._psf_coeff_mod['si_mask'] = None
self._psf_coeff_mod['si_mask_xgrid'] = None
self._psf_coeff_mod['si_mask_ygrid'] = None
self._psf_coeff_mod['si_mask_apname'] = None
return
# Delete potentially large array
if (not return_raw) and (not return_results):
del self._psf_coeff_mod['si_mask']
large_grid = self._psf_coeff_mod['si_mask_large'] if large_grid is None else large_grid
# fov_pix should not be more than some size to preserve memory
fov_max = self._fovmax_wfemask if self.oversample<=4 else self._fovmax_wfemask / 2
fov_pix_orig = self.fov_pix
if self.fov_pix>fov_max:
self.fov_pix = fov_max if (self.fov_pix % 2 == 0) else fov_max + 1
# Modify bar_offset to always be 0 (center of wedge FOV)
# Do this here so that it is captured in the save name
bar_offset_orig = self.options.get('bar_offset', None)
if (self.name=='NIRCam'):
self.options['bar_offset'] = 0
# Name to save array of oversampled coefficients
save_dir = self.save_dir
file_ext = '_large_grid_wfemask.npz' if large_grid else '_wfemask.npz'
save_name = os.path.splitext(self.save_name)[0] + file_ext
outname = str(save_dir / save_name)
# Load file if it already exists
if (not force) and os.path.exists(outname):
# Return parameter to original
self.fov_pix = fov_pix_orig
if self.name=='NIRCam':
self.options['bar_offset'] = bar_offset_orig
_log.info(f"Loading {outname}")
out = np.load(outname)
if return_results:
return out['arr_0'], out['arr_1'], out['arr_2'], out['arr_3']
else:
self._psf_coeff_mod['si_mask'] = out['arr_0']
self._psf_coeff_mod['si_mask_xgrid'] = out['arr_1']
self._psf_coeff_mod['si_mask_ygrid'] = out['arr_2']
self._psf_coeff_mod['si_mask_apname'] = out['arr_3'].flatten()[0]
self._psf_coeff_mod['si_mask_large'] = large_grid
return
if large_grid:
_log.warn('Generating mask position-dependent coeffs (large grid). This may take some time...')
else:
_log.warn('Generating mask position-dependent coeffs (small grid). This may take some time...')
# Current mask positions to return to at end
coron_shift_x_orig = self.options.get('coron_shift_x', 0)
coron_shift_y_orig = self.options.get('coron_shift_y', 0)
detector_position_orig = self.detector_position
apname = self.aperturename
# Cycle through a list of field points
if self.name=='MIRI':
# Series of x and y mask shifts (in mask coordinates)
# Negative shifts will place source in upper right quadrant
# Depend on PSF symmetries for other three quadrants
if 'FQPM' in self.image_mask:
if large_grid:
xy_offsets = -1 * np.array([0, 0.005, 0.01, 0.08, 0.10, 0.2, 0.5, 1, 5, 11])
else:
xy_offsets = -1 * np.array([0, 0.01, 0.10, 1, 10])
xy_offsets[0] = 0
elif 'LYOT' in self.image_mask:
# TODO: Update offsets to optimize for Lyot mask
if large_grid:
xy_offsets = -1 * np.array([0, 0.01, 0.1, 0.36, 0.5, 1, 2.1, 5, 11])
else:
xy_offsets = -1 * np.array([0, 0.01, 0.10, 1, 10])
xy_offsets[0] = 0
else:
raise NotImplementedError(f'{self.name} with {self.image_mask} not implemented.')
x_offsets = y_offsets = np.sort(xy_offsets) # Ascending order
grid_vals = np.array(np.meshgrid(y_offsets,x_offsets))
xy_list = [(x,y) for x,y in grid_vals.reshape([2,-1]).transpose()]
xoff, yoff = np.array(xy_list).transpose()
# Small grid dithers indices
# Always calculate these explicitly
ind_zero = (np.abs(xoff)==0) & (np.abs(yoff)==0)
iwa = 0.01
ind_sgd = (np.abs(xoff)<=iwa) & (np.abs(yoff)<=iwa) & ~ind_zero
elif self.name=='NIRCam':
# Turn off ND square calculations
# Such PSFs will be attenuated later
nd_squares_orig = self.options.get('nd_squares', True)
self.options['nd_squares'] = False
# Build position offsets
if self.image_mask[-1]=='R': # Round masks
if large_grid:
# Include SGD points
xy_inner = np.array([0.015, 0.02, 0.05, 0.1])
# M430R sampling; scale others
# xy_mid = np.array([0.6, 1.2, 2, 2.5])
xy_mid = np.array([0.6, 1.2, 2.5])
if '210R' in self.image_mask:
xy_mid *= 0.488
elif '335R' in self.image_mask:
xy_mid *= 0.779
# xy_outer = np.array([5.0, 8.0])
xy_outer = np.array([8.0])
# Sort offsets [-], 0, [+]
xy_pos = np.concatenate((xy_inner, xy_mid, xy_outer))
xy_neg = -1 * xy_pos[::-1]
xy_offsets = np.concatenate((xy_neg, [0], xy_pos))
else:
# Assume symmetries for round mask
xy_offsets = np.array([-8, -1.5, -0.1, -0.01, 0])
# Create grid spacing
x_offsets = y_offsets = np.sort(xy_offsets)
grid_vals = np.array(np.meshgrid(x_offsets,y_offsets))
xy_list = [(x,y) for x,y in grid_vals.reshape([2,-1]).transpose()]
xoff, yoff = np.array(xy_list).transpose()
# Small grid dithers indices and close IWA
# Always calculate these explicitly
# Exclude x,y=(0,0) since we want to calc this early
ind_zero = (np.abs(xoff)==0) & (np.abs(yoff)==0)
iwa = 0.1
ind_sgd = (np.abs(xoff)<=iwa) & (np.abs(yoff)<=iwa) & ~ind_zero
else: # Bar masks
if large_grid:
# Include SGD points
y_inner = np.array([0.01, 0.02, 0.05, 0.1])
# LWB sampling of wedge gradient
y_mid = np.array([0.6, 1.2, 2, 2.5])
if 'SW' in self.image_mask:
y_mid *= 0.488
y_outer = np.array([5,8])
y_offsets = np.concatenate([y_inner, y_mid, y_outer])
x_offsets = np.array([-8, -6, -4, -2, 0, 2, 4, 6, 8], dtype='float')
# Mask offset values in ascending order
x_offsets = np.sort(-1*x_offsets)
y_offsets = np.concatenate([-1*y_offsets,[0],y_offsets])
y_offsets = np.sort(-1*y_offsets)
else:
y_offsets = np.array([-8, -1.5, -0.1, -0.01, 0])
x_offsets = np.array([-8, -5, -2, 0, 2, 5, 8], dtype='float')
grid_vals = np.array(np.meshgrid(x_offsets,y_offsets))
xy_list = [(x,y) for x,y in grid_vals.reshape([2,-1]).transpose()]
xoff, yoff = np.array(xy_list).transpose()
# Small grid dithers indices and close IWA
# Always calculate these explicitly
ind_zero = (np.abs(xoff)==0) & (np.abs(yoff)==0)
iwa = 0.1
ind_sgd = (np.abs(yoff)<=iwa) & ~ind_zero
# Get PSF coefficients for each specified position
log_prev = conf.logging_level
setup_logging('WARN', verbose=False)
npos = len(xoff)
fov_pix_over = self.fov_pix * self.oversample
cf_all = np.zeros([npos, self.ndeg+1, fov_pix_over, fov_pix_over], dtype='float')
# Create progress bar object
pbar = trange(npos, leave=False, desc="Mask Offsets")
for i in pbar:
xv, yv = (xoff[i], yoff[i])
# Update descriptive label
pbar.set_description(f"xoff, yoff = ({xv:.2f}, {yv:.2f})")
self.options['coron_shift_x'] = xv
self.options['coron_shift_y'] = yv
# Pixel offset information
field_rot = 0 if self._rotation is None else self._rotation
xyoff_pix = np.array(xy_rot(-1*xv, -1*yv, -1*field_rot)) / self.pixelscale
self.detector_position = np.array(detector_position_orig) + xyoff_pix
# Skip SGD locations until later
if ind_sgd[i]==False:
cf, _ = self.gen_psf_coeff(return_results=True, force=True, save=False, **kwargs)
cf_all[i] = cf
# Save central coefficient to it's own variable
if (xv==0) and (yv==0):
coeff0 = cf
setup_logging(log_prev, verbose=False)
# Return raw results for further analysis
# Excludes concatenation of symmetric PSFs and SGD calculations
if return_raw:
# Return to previous values
self.options['coron_shift_x'] = coron_shift_x_orig
self.options['coron_shift_y'] = coron_shift_y_orig
self.detector_position = detector_position_orig
self.fov_pix = fov_pix_orig
if self.name=='NIRCam':
self.options['nd_squares'] = nd_squares_orig
self.options['bar_offset'] = bar_offset_orig
return cf_all, xoff, yoff
# Get residuals
cf_all -= coeff0
cf_resid = cf_all
# cf_resid = cf_all - coeff0
# del cf_all
# Reshape into cf_resid into [nypos, nxpos, ncf, nypix, nxpix]
nxpos = len(x_offsets)
nypos = len(y_offsets)
xoff = xoff.reshape([nypos,nxpos])
yoff = yoff.reshape([nypos,nxpos])
sh = cf_resid.shape
cf_resid = cf_resid.reshape([nypos,nxpos,sh[1],sh[2],sh[3]])
ncf, nypix, nxpix = cf_resid.shape[-3:]
# MIRI quadrant symmetries
# This doesn't work for NIRCam, because of chromatic PSF shifts in y-direction
if self.name=='MIRI':
field_rot = 0 if self._rotation is None else 2*self._rotation
# Assuming that x=y=0 are in the final index (i=-1)
# Add same x, but -1*y
x_negy = xoff[:-1,:]
y_negy = -1*yoff[:-1,:][::-1,:]
cf_negy = cf_resid[:-1,:][::-1,:]
sh_ret = cf_negy.shape
# Flip the PSF coeff image in the y-axis and rotate
cf_negy = cf_negy[:,:,:,::-1,:].reshape([-1,nypix,nxpix])
cf_negy = rotate_offset(cf_negy, field_rot, reshape=False, order=2, mode='mirror')
cf_negy = cf_negy.reshape(sh_ret)
# Add same y, but -1*x
x_negx = -1*xoff[:,:-1][:,::-1]
y_negx = yoff[:,:-1]
cf_negx = cf_resid[:,:-1][:,::-1]
sh_ret = cf_negx.shape
# Flip the PSF coeff image in the x-axis and rotate
cf_negx = cf_negx[:,:,:,:,::-1].reshape([-1,nypix,nxpix])
cf_negx = rotate_offset(cf_negx, field_rot, reshape=False, order=2, mode='mirror')
cf_negx = cf_negx.reshape(sh_ret)
# Add -1*y, -1*x; exclude all x=0 and y=0 coords
x_negxy = -1*xoff[:-1,:-1][::-1,::-1]
y_negxy = -1*yoff[:-1,:-1][::-1,::-1]
cf_negxy = cf_resid[:-1,:-1][::-1,::-1]
# Flip the PSF coeff image in both x-axis and y-axis
# No rotation necessary
cf_negxy = cf_negxy[:,:,:,::-1,::-1]
# Combine quadrants
xoff1 = np.concatenate((xoff, x_negy), axis=0)
xoff2 = np.concatenate((x_negx, x_negxy), axis=0)
xoff_all = np.concatenate((xoff1, xoff2), axis=1)
yoff1 = np.concatenate((yoff, y_negy), axis=0)
yoff2 = np.concatenate((y_negx, y_negxy), axis=0)
yoff_all = np.concatenate((yoff1, yoff2), axis=1)
# Get rid of unnecessary and potentially large arrays
del xoff1, xoff2, yoff1, yoff2
cf1 = np.concatenate((cf_resid, cf_negy), axis=0)
del cf_resid, cf_negy
cf2 = np.concatenate((cf_negx, cf_negxy), axis=0)
del cf_negx, cf_negxy
cf_resid_all = np.concatenate((cf1, cf2), axis=1)
del cf1, cf2
# Get all SGD positions now that we've combined all x/y positions
# For SGD regions, we want to calculate actual PSFs, not take
# the shortcuts that were done above
ind_zero = (np.abs(xoff_all)==0) & (np.abs(yoff_all)==0)
ind_sgd = (np.abs(xoff_all)<=iwa) & (np.abs(yoff_all)<=iwa) & ~ind_zero
elif (self.name=='NIRCam') and (self.image_mask[-1]=='R') and (large_grid==True):
# Round Masks
xoff_all = xoff
yoff_all = yoff
cf_resid_all = cf_resid
# SGD positions
ind_zero = (np.abs(xoff_all)==0) & (np.abs(yoff_all)==0)
ind_sgd = (np.abs(xoff_all)<=iwa) & (np.abs(yoff_all)<=iwa) & ~ind_zero
elif (self.name=='NIRCam') and (self.image_mask[-1]=='B') and (large_grid==True):
# Bar Masks
xoff_all = xoff
yoff_all = yoff
cf_resid_all = cf_resid
# SGD positions
ind_zero = (np.abs(xoff_all)==0) & (np.abs(yoff_all)==0)
ind_sgd = (np.abs(yoff_all)<=iwa) & ~ind_zero
# Short cuts for quicker creation
elif (self.name=='NIRCam') and (self.image_mask[-1]=='R') and (large_grid==False):
# No need to rotate NIRcam, because self._rotation is None
field_rot = 0 if self._rotation is None else 2*self._rotation
# Assuming that x=y=0 are in the final index (i=-1)
# Add same x, but -1*y
x_negy = xoff[:-1,:]
y_negy = -1*yoff[:-1,:][::-1,:]
cf_negy = cf_resid[:-1,:][::-1,:]
# Don't Flip the PSF coeff image in the y-axis
# No rotation necessary
# Add same y, but -1*x
x_negx = -1*xoff[:,:-1][:,::-1]
y_negx = yoff[:,:-1]
cf_negx = cf_resid[:,:-1][:,::-1]
# Flip the PSF coeff image in the x-axis and rotate
cf_negx = cf_negx[:,:,:,:,::-1]
if np.abs(field_rot) < 0.01:
sh_ret = cf_negx.shape
cf_negx = cf_negx.reshape([-1,nypix,nxpix])
cf_negx = rotate_offset(cf_negx, field_rot, reshape=False, order=2, mode='mirror')
cf_negx = cf_negx.reshape(sh_ret)
# Add -1*y, -1*x; exclude all x=0 and y=0 coords
x_negxy = -1*xoff[:-1,:-1][::-1,::-1]
y_negxy = -1*yoff[:-1,:-1][::-1,::-1]
cf_negxy = cf_resid[:-1,:-1][::-1,::-1]
# Flip the PSF coeff image only along x-axis
# Rotate if necessary
cf_negxy = cf_negxy[:,:,:,:,::-1]
if np.abs(field_rot) < 0.01:
sh_ret = cf_negxy.shape
cf_negxy = cf_negxy.reshape([-1,nypix,nxpix])
cf_negxy = rotate_offset(cf_negxy, field_rot, reshape=False, order=2, mode='mirror')
cf_negxy = cf_negxy.reshape(sh_ret)
# Combine quadrants
xoff1 = np.concatenate((xoff, x_negy), axis=0)
xoff2 = np.concatenate((x_negx, x_negxy), axis=0)
xoff_all = np.concatenate((xoff1, xoff2), axis=1)
yoff1 = np.concatenate((yoff, y_negy), axis=0)
yoff2 = np.concatenate((y_negx, y_negxy), axis=0)
yoff_all = np.concatenate((yoff1, yoff2), axis=1)
# Get rid of unnecessary and potentially large arrays
del xoff1, xoff2, yoff1, yoff2
cf1 = np.concatenate((cf_resid, cf_negy), axis=0)
del cf_resid, cf_negy
cf2 = np.concatenate((cf_negx, cf_negxy), axis=0)
del cf_negx, cf_negxy
cf_resid_all = np.concatenate((cf1, cf2), axis=1)
del cf1, cf2
# Get all SGD positions now that we've combined all x/y positions
# For SGD regions, we want to calculate actual PSFs, not take
# the shortcuts that were done above
ind_zero = (np.abs(xoff_all)==0) & (np.abs(yoff_all)==0)
ind_sgd = (np.abs(xoff_all)<=iwa) & (np.abs(yoff_all)<=iwa) & ~ind_zero
# Short cuts for quicker creation
elif (self.name=='NIRCam') and (self.image_mask[-1]=='B') and (large_grid==False):
# Bar masks
# No need to rotate NIRcam, because self._rotation is None
field_rot = 0 if self._rotation is None else 2*self._rotation
# Assuming that y=0 are in the final index (i=-1)
# Add same x, but -1*y
x_negy = xoff[:-1,:]
y_negy = -1*yoff[:-1,:][::-1,:]
cf_negy = cf_resid[:-1,:][::-1,:]
# Flip the PSF coeff image in the y-axis
cf_negy = cf_negy[:,:,:,::-1,:]
# Rotation
if np.abs(field_rot) < 0.01:
sh_ret = cf_negy.shape
cf_negy = cf_negy.reshape([-1,nypix,nxpix])
cf_negy = rotate_offset(cf_negy, field_rot, reshape=False, order=2, mode='mirror')
cf_negy = cf_negy.reshape(sh_ret)
# Combine halves
xoff_all = np.concatenate((xoff, x_negy), axis=0)
yoff_all = np.concatenate((yoff, y_negy), axis=0)
cf_resid_all = np.concatenate((cf_resid, cf_negy), axis=0)
# Get rid of unnecessary and potentially large arrays
del xoff, x_negy, yoff, y_negy, cf_resid, cf_negy
# Get all SGD positions now that we've combined all x/y positions
# For SGD regions, we want to calculate actual PSFs, not take
# the shortcuts that were done above
ind_zero = (np.abs(xoff_all)==0) & (np.abs(yoff_all)==0)
ind_sgd = (np.abs(yoff_all)<=iwa) & ~ind_zero
else:
msg = f'{self.name} not implemented for different WFE mask modifications.'
raise NotImplementedError(msg)
# Get PSF coefficients for each SGD position
log_prev = conf.logging_level
setup_logging('WARN', verbose=False)
# Set to fov calculation size
xsgd, ysgd = (xoff_all[ind_sgd], yoff_all[ind_sgd])
nsgd = len(xsgd)
if nsgd>0:
# Create progress bar object
pbar = tqdm(zip(xsgd, ysgd), total=nsgd, desc='SGD', leave=False)
for xv, yv in pbar:
# Update descriptive label
pbar.set_description(f"xsgd, ysgd = ({xv:.2f}, {yv:.2f})")
self.options['coron_shift_x'] = xv
self.options['coron_shift_y'] = yv
# Pixel offset information
field_rot = 0 if self._rotation is None else self._rotation
xyoff_pix = np.array(xy_rot(-1*xv, -1*yv, -field_rot)) / self.pixelscale
self.detector_position = np.array(detector_position_orig) + xyoff_pix
cf, _ = self.gen_psf_coeff(return_results=True, force=True, save=False, **kwargs)
ind = (xoff_all==xv) & (yoff_all==yv)
cf_resid_all[ind] = cf - coeff0
setup_logging(log_prev, verbose=False)
# Return to previous values
self.options['coron_shift_x'] = coron_shift_x_orig
self.options['coron_shift_y'] = coron_shift_y_orig
self.detector_position = detector_position_orig
self.fov_pix = fov_pix_orig
if self.name=='NIRCam':
self.options['nd_squares'] = nd_squares_orig
self.options['bar_offset'] = bar_offset_orig
# x and y grid values to return
xvals = xoff_all[0]
yvals = yoff_all[:,0]
# Use field_coeff_func with x and y shift values
# xvals_new = np.array([-1.2, 4.0, 0.1, -3])
# yvals_new = np.array([ 3.0, 2.3, -0.1, 0])
# test = field_coeff_func(xvals, yvals, cf_resid_all, xvals_new, yvals_new)
if save:
_log.info(f"Saving to {outname}")
np.savez(outname, cf_resid_all, xvals, yvals, apname)
if return_results:
return cf_resid_all, xvals, yvals
else:
self._psf_coeff_mod['si_mask'] = cf_resid_all
self._psf_coeff_mod['si_mask_xgrid'] = xvals
self._psf_coeff_mod['si_mask_ygrid'] = yvals
self._psf_coeff_mod['si_mask_apname'] = apname
self._psf_coeff_mod['si_mask_large'] = large_grid
def _calc_psf_from_coeff(self, sp=None, return_oversample=True, return_hdul=True,
wfe_drift=None, coord_vals=None, coord_frame='tel', break_iter=True,
siaf_ap=None, **kwargs):
"""PSF Image from polynomial coefficients
Create a PSF image from instrument settings. The image is noiseless and
doesn't take into account any non-linearity or saturation effects, but is
convolved with the instrument throughput. Pixel values are in counts/sec.
The result is effectively an idealized slope image (no background).
If no spectral dispersers (grisms or DHS), then this returns a single
image or list of images if sp is a list of spectra. By default, it returns
only the oversampled PSF, but setting return_oversample=False will
instead return a set of detector-sampled images.
Parameters
----------
sp : :mod:`pysynphot.spectrum`
If not specified, the default is flat in phot lam (equal number of photons
per wavelength bin). The default is normalized to produce 1 count/sec within
that bandpass, assuming the telescope collecting area and instrument bandpass.
Coronagraphic PSFs will further decrease this due to the smaller pupil
size and suppression of coronagraphic mask.
If set, then the resulting PSF image will be scaled to generate the total
observed number of photons from the spectrum.
return_oversample : bool
If True, then also returns the oversampled version of the PSF (default: True).
wfe_drift : float or None
Wavefront error drift amplitude in nm.
coord_vals : tuple or None
Coordinates (in arcsec or pixels) to calculate field-dependent PSF.
If multiple values, then this should be an array ([xvals], [yvals]).
Relative to self.aperturename aperture.
coord_frame : str
Type of input coordinates relative to self.aperturename aperture.
* 'tel': arcsecs V2,V3
* 'sci': pixels, in conventional DMS axes orientation
* 'det': pixels, in raw detector read out axes orientation
* 'idl': arcsecs relative to aperture reference location.
return_hdul : bool
Return PSFs in an HDUList rather than set of arrays (default: True).
break_iter : bool
For multiple field points, break up and generate PSFs one-by-one rather
than simultaneously, which can save on memory for large PSFs.
"""
psf_coeff_hdr = self.psf_coeff_header
psf_coeff = self.psf_coeff
if psf_coeff is None:
_log.warning("You must first run `gen_psf_coeff` in order to calculate PSFs.")
return
# Spectrographic Mode?
is_spec = False
if (self.name=='NIRCam') or (self.name=='NIRISS'):
is_spec = True if self.is_grism else False
elif (self.name=='MIRI') or (self.name=='NIRSpec'):
is_spec = True if self.is_slitspec else False
# Make sp a list of spectral objects if it already isn't
if sp is None:
nspec = 0
elif (sp is not None) and (not isinstance(sp, list)):
sp = [sp]
nspec = 1
else:
nspec = len(sp)
if coord_vals is not None:
coord_vals = np.array(coord_vals)
# If large number of requested field points, then break into single event calls
if coord_vals is not None:
c1_all, c2_all = coord_vals
nfield_init = np.size(c1_all)
break_iter = False if nfield_init<=1 else break_iter
if break_iter:
kwargs['sp'] = sp
kwargs['return_oversample'] = return_oversample
kwargs['return_hdul'] = return_hdul
kwargs['wfe_drift'] = wfe_drift
kwargs['coord_frame'] = coord_frame
kwargs['siaf_ap'] = siaf_ap
psf_all = fits.HDUList() if return_hdul else []
for ii in trange(nfield_init, leave=False, desc='PSFs'):
kwargs['coord_vals'] = (c1_all[ii], c2_all[ii])
# Just a single spectrum? Or unique spectrum at each field point?
kwargs['sp'] = sp[ii] if((sp is not None) and (nspec==nfield_init)) else sp
res = _calc_psf_from_coeff(self, **kwargs)
if return_hdul:
# For grisms (etc), the wavelength solution is the same for each field point
psf = res[0]
wave = res[1] if is_spec else None
# if ii>0:
# psf = fits.ImageHDU(data=psf.data, header=psf.header)
else:
# For grisms (etc), the wavelength solution is the same for each field point
wave, psf = res if is_spec else (None, res)
psf_all.append(psf)
if return_hdul:
output = fits.HDUList(psf_all)
if is_spec:
output.append(wave)
else:
output = np.asarray(psf_all)
output = (wave, psf_all) if is_spec else output
return output
# Coeff modification variable
psf_coeff_mod = 0
if wfe_drift is None:
wfe_drift = 0
# Modify PSF coefficients based on field-dependence
# Ignore if there is a focal plane mask
# No need for SI WFE field dependence if coronagraphy, but this allows us
# to enable `include_si_wfe` for NIRCam PSF calculation
nfield = None
if self.image_mask is None:
cf_mod, nfield = _coeff_mod_wfe_field(self, coord_vals, coord_frame,
siaf_ap=siaf_ap)
psf_coeff_mod += cf_mod
nfield = 1 if nfield is None else nfield
# Modify PSF coefficients based on field-dependence with a focal plane mask
nfield_mask = None
if self.image_mask is not None:
cf_mod, nfield_mask = _coeff_mod_wfe_mask(self, coord_vals, coord_frame,
siaf_ap=siaf_ap)
psf_coeff_mod += cf_mod
nfield = nfield if nfield_mask is None else nfield_mask
# Modify PSF coefficients based on WFE drift
# TODO: Allow negative WFE drift, but subtract delta WFE?
assert wfe_drift>=0, '`wfe_drift` should not be negative'
wfe_drift_keys = _wfe_drift_key(self, coord_vals, coord_frame)
ukeys = np.unique(wfe_drift_keys)
nkeys = len(ukeys)
if nkeys==1:
psf_coeff_mod += _coeff_mod_wfe_drift(self, wfe_drift, key=ukeys[0])
else:
# Create coefficients for each unique key
cf_mod_list = []
for key in ukeys:
cf_mod = _coeff_mod_wfe_drift(self, wfe_drift, key=key)
cf_mod_list.append(cf_mod)
# At each field position, find the corresponding coeff modification
for i in range(nfield):
ind = np.where(ukeys==wfe_drift_keys[i])[0][0]
cf_mod = cf_mod_list[ind]
psf_coeff_mod[i] += cf_mod
# return psf_coeff, psf_coeff_mod
# Add modifications to coefficients
psf_coeff_mod += psf_coeff
del psf_coeff
psf_coeff = psf_coeff_mod
# if multiple field points were present, we want to return PSF for each location
if nfield>1:
psf_all = []
for ii in trange(nfield, leave=True, desc='PSFs'):
# Just a single spectrum? Or unique spectrum at each field point?
sp_norm = sp[ii] if((sp is not None) and (nspec==nfield)) else sp
# Delete coefficients as they are used to reduce memory usage
cf_ii = psf_coeff[ii]
res = gen_image_from_coeff(self, cf_ii, psf_coeff_hdr, sp_norm=sp_norm,
return_oversample=return_oversample)
# For grisms (etc), the wavelength solution is the same for each field point
wave, psf = res if is_spec else (None, res)
psf_all.append(psf)
if return_hdul:
xvals, yvals = coord_vals
hdul = fits.HDUList()
for ii, psf in enumerate(psf_all):
hdr = psf_coeff_hdr.copy()
if return_oversample:
hdr['EXTNAME'] = ('OVERDIST', 'This extension is oversampled')
hdr['OSAMP'] = (self.oversample, 'Image oversampling rel to det')
else:
hdr['EXTNAME'] = ('DET_DIST', 'This extension is detector sampled')
hdr['OSAMP'] = (1, 'Image oversampling rel to det')
hdr['PIXELSCL'] = self.pixelscale
cunits = 'pixels' if ('sci' in coord_frame) or ('det' in coord_frame) else 'arcsec'
hdr['XVAL'] = (xvals[ii], f'[{cunits}] Input X coordinate')
hdr['YVAL'] = (yvals[ii], f'[{cunits}] Input Y coordinate')
hdr['CFRAME'] = (coord_frame, 'Specified coordinate frame')
hdr['WFEDRIFT'] = (wfe_drift, '[nm] WFE drift amplitude')
hdul.append(fits.ImageHDU(data=psf, header=hdr))
# Append wavelength solution
if wave is not None:
hdul.append(fits.ImageHDU(data=wave, name='Wavelengths'))
output = hdul
else:
psf_all = np.asarray(psf_all)
output = (wave, psf_all) if is_spec else psf_all
else:
res = gen_image_from_coeff(self, psf_coeff, psf_coeff_hdr, sp_norm=sp,
return_oversample=return_oversample)
if return_hdul:
# For grisms (etc), the wavelength solution is the same for each field point
wave, psf = res if is_spec else (None, res)
hdr = psf_coeff_hdr.copy()
if return_oversample:
hdr['EXTNAME'] = ('OVERDIST', 'This extension is oversampled')
hdr['OSAMP'] = (self.oversample, 'Image oversampling rel to det')
else:
hdr['EXTNAME'] = ('DET_DIST', 'This extension is detector sampled')
hdr['OSAMP'] = (1, 'Image oversampling rel to det')
hdr['PIXELSCL'] = self.pixelscale
if coord_vals is not None:
cunits = 'pixels' if ('sci' in coord_frame) or ('det' in coord_frame) else 'arcsec'
hdr['XVAL'] = (coord_vals[0], f'[{cunits}] Input X coordinate')
hdr['YVAL'] = (coord_vals[1], f'[{cunits}] Input Y coordinate')
hdr['CFRAME'] = (coord_frame, 'Specified coordinate frame')
else:
cunits = 'pixels'
hdr['XVAL'] = (hdr['DET_X'], f'[{cunits}] Input X coordinate')
hdr['YVAL'] = (hdr['DET_Y'], f'[{cunits}] Input Y coordinate')
hdr['CFRAME'] = ('sci', 'Specified coordinate frame')
hdr['WFEDRIFT'] = (wfe_drift, '[nm] WFE drift amplitude')
hdul = fits.HDUList()
# Append each spectrum
if nspec<=1:
hdul.append(fits.ImageHDU(data=psf, header=hdr))
else:
for ii in range(nspec):
hdul.append(fits.ImageHDU(data=psf[ii], header=hdr))
# Append wavelength solution
if wave is not None:
hdul.append(fits.ImageHDU(data=wave, name='Wavelengths'))
output = hdul
else:
output = res
return output
def _wfe_drift_key(self, coord_vals, coord_frame):
"""
Choose which WFE drift coefficient key to use based on coordinate position.
Only for coronagraphic observations (on-axis vs off-axis PSFs).
"""
xsci = ysci = None
# Preceed only if coronagraphic observation
if not self.is_coron:
return 'wfe_drift'
# Modify PSF coefficients based on position
if coord_vals is None:
return 'wfe_drift'
else:
# Determine sci coordinates
cframe = coord_frame.lower()
if cframe=='sci':
xsci, ysci = coord_vals # pixels
elif cframe in ['det', 'tel', 'idl']:
x = np.array(coord_vals[0])
y = np.array(coord_vals[1])
try:
apname = self.aperturename
siaf_ap = self.siaf[apname]
xsci, ysci = siaf_ap.convert(x,y, cframe, 'sci')
except:
# apname = self.get_siaf_apname()
self._update_aperturename()
apname = self.aperturename
if apname is None:
_log.warning('No suitable aperture name defined to determine (xsci,ysci) coordiantes')
else:
_log.warning('self.siaf_ap not defined; assuming {}'.format(apname))
siaf_ap = self.siaf[apname]
xsci, ysci = siaf_ap.convert(x,y, cframe, 'sci') # arcsec
_log.warning('Update self.siaf_ap for more specific conversions to (xsci,ysci).')
else:
# Can't figure out coordinate frames, so assume 0
return 'wfe_drift'
# Assume we successfully found (xsci,ysci)
if (xsci is not None):
nfield = np.size(xsci)
field_rot = 0 if self._rotation is None else self._rotation
apname = self.aperturename
siaf_ap = self.siaf[apname]
# Convert to mask shifts (arcsec)
xoff_asec = (xsci - siaf_ap.XSciRef) * siaf_ap.XSciScale
yoff_asec = (ysci - siaf_ap.YSciRef) * siaf_ap.YSciScale
xoff, yoff = xy_rot(-1*xoff_asec, -1*yoff_asec, field_rot)
roff = np.sqrt(xoff**2 + yoff**2)
# For bar masks, evaluate based on vertical distance
if self.is_coron and self.image_mask[-1]=='B':
roff = np.abs(yoff)
if nfield==1:
roff = [roff]
# Choose either wfe_drift or wfe_drift_off for each position
res = []
for r in roff:
key = 'wfe_drift' if r<0.1 else 'wfe_drift_off'
res.append(key)
return res
else:
return 'wfe_drift'
def _coeff_mod_wfe_drift(self, wfe_drift, key='wfe_drift'):
"""
Modify PSF polynomial coefficients as a function of WFE drift.
"""
# Modify PSF coefficients based on WFE drift
if wfe_drift==0:
cf_mod = 0 # Don't modify coefficients
elif (self._psf_coeff_mod[key] is None):
_log.warning("You must run `gen_wfedrift_coeff` first before setting the wfe_drift parameter.")
_log.warning("Will continue assuming `wfe_drift=0`.")
cf_mod = 0
else:
_log.info("Generating WFE drift modifications...")
psf_coeff = self.psf_coeff
cf_fit = self._psf_coeff_mod[key]
lxmap = self._psf_coeff_mod['wfe_drift_lxmap']
# Fit function
cf_fit_shape = cf_fit.shape
cf_fit = cf_fit.reshape([cf_fit.shape[0], -1])
cf_mod = jl_poly(np.array([wfe_drift]), cf_fit, use_legendre=True, lxmap=lxmap)
cf_mod = cf_mod.reshape(cf_fit_shape[1:])
# Pad cf_mod array with 0s if undersized
if not np.allclose(psf_coeff.shape, cf_mod.shape):
new_shape = psf_coeff.shape[1:]
cf_mod_resize = np.asarray([pad_or_cut_to_size(im, new_shape) for im in cf_mod])
cf_mod = cf_mod_resize
return cf_mod
def _coeff_mod_wfe_field(self, coord_vals, coord_frame, siaf_ap=None):
"""
Modify PSF polynomial coefficients as a function of V2/V3 position.
"""
v2 = v3 = None
cf_mod = 0
nfield = None
psf_coeff_hdr = self.psf_coeff_header
psf_coeff = self.psf_coeff
cf_fit = self._psf_coeff_mod['si_field']
v2grid = self._psf_coeff_mod['si_field_v2grid']
v3grid = self._psf_coeff_mod['si_field_v3grid']
apname = self.aperturename # self._psf_coeff_mod['si_field_apname']
siaf_ap = self.siaf[apname] if apname is not None else None
# Modify PSF coefficients based on position
if coord_vals is None:
pass
elif self._psf_coeff_mod['si_field'] is None:
si_wfe_str = 'True' if self.include_si_wfe else 'False'
_log.info(f"Skipping WFE field dependence: self._psf_coeff_mod['si_field']=None and self.include_si_wfe={si_wfe_str}")
# _log.warning("You must run `gen_wfefield_coeff` first before setting the coord_vals parameter.")
# _log.warning("`calc_psf_from_coeff` will continue with default PSF.")
cf_mod = 0
else:
si_field_apname = self._psf_coeff_mod.get('si_field_apname')
siaf_ap_field = self.siaf[si_field_apname]
# Assume cframe corresponds to siaf_ap input
siaf_ap = siaf_ap_field if siaf_ap is None else siaf_ap
cframe = coord_frame.lower()
# Determine V2/V3 coordinates
# Convert to common 'tel' coordinates
if (siaf_ap.AperName != siaf_ap_field.AperName):
x = np.array(coord_vals[0])
y = np.array(coord_vals[1])
v2, v3 = siaf_ap.convert(x,y, cframe, 'tel')
v2, v3 = (v2/60., v3/60.) # convert to arcmin
elif cframe=='tel':
v2, v3 = coord_vals
v2, v3 = (v2/60., v3/60.) # convert to arcmin
elif cframe in ['det', 'sci', 'idl']:
x = np.array(coord_vals[0])
y = np.array(coord_vals[1])
v2, v3 = siaf_ap.convert(x,y, cframe, 'tel')
v2, v3 = (v2/60., v3/60.) # convert to arcmin
else:
_log.warning("coord_frame setting '{}' not recognized.".format(coord_frame))
_log.warning("`calc_psf_from_coeff` will continue with default PSF.")
# PSF Modifications assuming we successfully found v2/v3
if (v2 is not None):
_log.info("Generating field-dependent modifications...")
# print(v2,v3)
nfield = np.size(v2)
cf_mod = field_coeff_func(v2grid, v3grid, cf_fit, v2, v3)
cf_shape = psf_coeff.shape
# cf_mod = np.zeros([nfield, cf_shape[0], cf_shape[1], cf_shape[2]])
# Pad cf_mod array with 0s if undersized
psf_cf_dim = len(cf_shape)
if not np.allclose(cf_shape, cf_mod.shape[-psf_cf_dim:]):
new_shape = cf_shape[1:]
cf_mod_resize = np.asarray([pad_or_cut_to_size(im, new_shape) for im in cf_mod])
del cf_mod
cf_mod = cf_mod_resize
return cf_mod, nfield
def _coeff_mod_wfe_mask(self, coord_vals, coord_frame, siaf_ap=None):
"""
Modify PSF polynomial coefficients as a function of V2/V3 position.
Parameters
----------
coord_vals : tuple or None
Coordinates (in arcsec or pixels) to calculate field-dependent PSF.
coord_frame : str
Type of desired output coordinates.
* 'tel': arcsecs V2,V3
* 'sci': pixels, in conventional DMS axes orientation
* 'det': pixels, in raw detector read out axes orientation
* 'idl': arcsecs relative to aperture reference location.
"""
xidl = yidl = None
cf_mod = 0
nfield = None
psf_coeff_hdr = self.psf_coeff_header
psf_coeff = self.psf_coeff
cf_fit = self._psf_coeff_mod.get('si_mask', None)
# Coord values are set, but not coefficients supplied
if (coord_vals is not None) and (cf_fit is None):
_log.warning("You must run `gen_wfemask_coeff` first before setting the coord_vals parameter for masked focal planes.")
_log.info("`calc_psf_from_coeff` will continue without mask field dependency.")
# No coord values, but NIRCam bar/wedge mask in place
elif (coord_vals is None) and (self.name=='NIRCam') and (self.image_mask[-1]=='B'):
# Determine desired location along bar
bar_offset = self.get_bar_offset()
if (bar_offset != 0) and (cf_fit is None):
_log.warning("You must run `gen_wfemask_coeff` to obtain PSFs offset along bar mask.")
_log.info("`calc_psf_from_coeff` will continue assuming bar_offset=0.")
else:
nfield = 1
# Get coords in arcsec
xidl = bar_offset
yidl = 0
# # Get sci coords in pixels
# bar_offset_pix = bar_offset / siaf_ap.XSciScale
# xsci = siaf_ap.XSciRef + bar_offset_pix
# ysci = siaf_ap.YSciRef
elif (coord_vals is not None):
si_mask_apname = self._psf_coeff_mod.get('si_mask_apname')
siaf_ap_mask = self.siaf[si_mask_apname]
# Assume cframe corresponds to siaf_ap input
siaf_ap = siaf_ap_mask if siaf_ap is None else siaf_ap
cframe = coord_frame.lower()
# Convert to common 'tel' coordinates
if (siaf_ap.AperName != siaf_ap_mask.AperName):
x = np.array(coord_vals[0])
y = np.array(coord_vals[1])
xtel, ytel = siaf_ap.convert(x,y, cframe, 'tel')
xidl, yidl = siaf_ap_mask.convert(xtel, ytel, 'tel', 'idl')
elif cframe=='idl':
xidl, yidl = coord_vals
elif cframe in ['det', 'tel', 'sci']:
x = np.array(coord_vals[0])
y = np.array(coord_vals[1])
xidl, yidl = siaf_ap.convert(x,y, cframe, 'idl')
else:
_log.warning("coord_frame setting '{}' not recognized.".format(coord_frame))
_log.warning("`calc_psf_from_coeff` will continue with default PSF.")
# PSF Modifications assuming we successfully found (xsci,ysci)
# print(xidl, yidl)
if (xidl is not None):
_log.info("Generating mask-dependent modifications...")
# print(v2,v3)
nfield = np.size(xidl)
field_rot = 0 if self._rotation is None else self._rotation
# Convert to mask shifts (arcsec)
xoff_asec, yoff_asec = (xidl, yidl)
xoff_cf, yoff_cf = xy_rot(-1*xoff_asec, -1*yoff_asec, field_rot)
if (self.name=='NIRCam') and (np.any(np.abs(xoff_asec)>12) or np.any(np.abs(yoff_asec)>12)):
_log.warn("Some values outside mask FoV (beyond 12 asec offset)!")
# print(xoff_asec, yoff_asec)
xgrid = self._psf_coeff_mod['si_mask_xgrid'] # arcsec
ygrid = self._psf_coeff_mod['si_mask_ygrid'] # arcsec
cf_mod = field_coeff_func(xgrid, ygrid, cf_fit, xoff_cf, yoff_cf)
# Pad cf_mod array with 0s if undersized
psf_cf_dim = len(psf_coeff.shape)
if not np.allclose(psf_coeff.shape, cf_mod.shape[-psf_cf_dim:]):
new_shape = psf_coeff.shape[1:]
cf_mod_resize = np.asarray([pad_or_cut_to_size(im, new_shape) for im in cf_mod])
cf_mod = cf_mod_resize
return cf_mod, nfield
[docs]def coron_grid(self, npsf_per_axis, xoff_vals=None, yoff_vals=None):
"""Get grid points based on coronagraphic obseervation
Returns sci pixels values around mask center.
"""
def log_grid(nvals, vmax=10):
"""Log spacing in arcsec relative to mask center"""
# vals_p = np.logspace(-2,np.log10(vmax),int((nvals-1)/2))
vals_p = np.geomspace(0.01, vmax, int((nvals-1)/2))
vals_m = np.sort(-1*vals_p)
return np.sort(np.concatenate([vals_m, [0], vals_p]))
def lin_grid(nvals, vmin=-10, vmax=10):
"""Linear spacing in arcsec relative to mask center"""
return np.linspace(vmin, vmax, nvals)
# Observation aperture
siaf_ap = self.siaf[self.aperturename]
if self.name.lower()=='nircam':
nx_pix = 300 if self.channel.lower()=='long' else 600
ny_pix = nx_pix
else:
xvert, yvert = siaf_ap.corners('sci', rederive=False)
xsci_min, xsci_max = int(np.min(xvert)), int(np.max(xvert))
ysci_min, ysci_max = int(np.min(yvert)), int(np.max(yvert))
nx_pix = int(xsci_max - xsci_min)
ny_pix = int(ysci_max - ysci_min)
xoff_min, xoff_max = self.pixelscale * np.array([-1,1]) * nx_pix / 2
yoff_min, yoff_max = self.pixelscale * np.array([-1,1]) * ny_pix / 2
if np.size(npsf_per_axis)==1:
xpsf = ypsf = npsf_per_axis
else:
xpsf, ypsf = npsf_per_axis
field_rot = 0 if self._rotation is None else self._rotation
xlog_spacing = False if self.image_mask[-2:]=='WB' else True
ylog_spacing = True
if xoff_vals is None:
xmax = np.abs([xoff_min,xoff_max]).max()
xoff = log_grid(xpsf, xmax) if xlog_spacing else lin_grid(xpsf, -xmax, xmax)
else:
xoff = xoff_vals
if yoff_vals is None:
ymax = np.abs([yoff_min,yoff_max]).max()
yoff = log_grid(ypsf, ymax) if ylog_spacing else lin_grid(ypsf, -ymax, ymax)
else:
yoff = yoff_vals
# Mask Offset grid positions in arcsec
xgrid_off, ygrid_off = np.meshgrid(xoff, yoff)
xgrid_off, ygrid_off = xgrid_off.flatten(), ygrid_off.flatten()
# Offsets relative to center of mask
xoff_asec, yoff_asec = xy_rot(-1*xgrid_off, -1*ygrid_off, -1*field_rot)
xtel, ytel = siaf_ap.convert(xoff_asec, yoff_asec, 'idl', 'tel')
# Convert from aperture used to create mask into sci pixels for observe aperture
xsci, ysci = self.siaf_ap.convert(xtel, ytel, 'tel', 'sci')
return xsci, ysci
def _calc_psfs_grid(self, sp=None, wfe_drift=0, osamp=1, npsf_per_full_fov=15,
xsci_vals=None, ysci_vals=None, return_coords=None,
use_coeff=True, **kwargs):
"""Create a grid of PSFs across an instrumnet FoV
Create a grid of PSFs across instrument aperture FoV. By default,
imaging observations will be for full detector FoV with regularly
spaced grid. Coronagraphic observations will cover nominal
coronagraphic mask region (usually 10s of arcsec) and will have
logarithmically spaced values.
Keyword Args
============
sp : :mod:`pysynphot.spectrum`
If not specified, the default is flat in phot lam (equal number of photons
per wavelength bin). The default is normalized to produce 1 count/sec within
that bandpass, assuming the telescope collecting area and instrument bandpass.
Coronagraphic PSFs will further decrease this due to the smaller pupil
size and suppression of coronagraphic mask.
If set, then the resulting PSF image will be scaled to generate the total
observed number of photons from the spectrum (ie., not scaled by unit response).
wfe_drift : float
Desired WFE drift value relative to default OPD.
osamp : int
Sampling of output PSF relative to detector sampling.
npsf_per_full_fov : int
Number of PSFs across one dimension of the instrument's field of
view. If a coronagraphic observation, then this is for the nominal
coronagrahic field of view.
xsci_vals: None or ndarray
Option to pass a custom grid values along x-axis in 'sci' coords.
If coronagraph, this instead corresponds to coronagraphic mask axis in arcsec,
which has a slight rotation relative to detector axis in MIRI.
ysci_vals: None or ndarray
Option to pass a custom grid values along y-axis in 'sci' coords.
If coronagraph, this instead corresponds to coronagraphic mask axis in arcsec,
which has a slight rotation relative to detector axis in MIRI.
return_coords : None or str
Option to also return coordinate values in desired frame
('det', 'sci', 'tel', 'idl').
Output is then xvals, yvals, hdul_psfs.
use_coeff : bool
If True, uses `calc_psf_from_coeff`, other WebbPSF's built-in `calc_psf`.
"""
# Observation aperture
siaf_ap_obs = self.siaf_ap
# Produce grid of PSF locations across the field of view
if self.is_coron:
xsci_psf, ysci_psf = coron_grid(self, npsf_per_full_fov,
xoff_vals=xsci_vals, yoff_vals=ysci_vals)
else:
# No need to go beyond detector pixels
# Number of sci pixels in FoV
# Generate grid borders
xvert, yvert = siaf_ap_obs.closed_polygon_points('sci', rederive=False)
xsci_min, xsci_max = int(np.min(xvert)), int(np.max(xvert))
ysci_min, ysci_max = int(np.min(yvert)), int(np.max(yvert))
nx_pix = int(xsci_max - xsci_min)
ny_pix = int(ysci_max - ysci_min)
# Ensure at least 5 PSFs across FoV for imaging
if np.size(npsf_per_full_fov)==1:
xpsf_full = ypsf_full = npsf_per_full_fov
else:
xpsf_full, ypsf_full = npsf_per_full_fov
xpsf = np.max([int(xpsf_full * nx_pix / siaf_ap_obs.XDetSize), 5])
ypsf = np.max([int(ypsf_full * ny_pix / siaf_ap_obs.YDetSize), 5])
# Cut in half for NIRCam SW (4 detectors per FoV)
if self.name.lower()=='nircam' and self.channel.lower()=='short':
xpsf = np.max([int(xpsf / 2), 5])
ypsf = np.max([int(ypsf / 2), 5])
# Create linear set of grid points along x and y axes
if xsci_vals is None:
xsci_vals = np.linspace(xsci_min, xsci_max, xpsf)
if ysci_vals is None:
ysci_vals = np.linspace(ysci_min, ysci_max, ypsf)
# Full set of grid points to generate PSFs
xsci_psf, ysci_psf = np.meshgrid(xsci_vals, ysci_vals)
xsci_psf = xsci_psf.flatten()
ysci_psf = ysci_psf.flatten()
# Convert everything to tel for good measure to store in header
xtel_psf, ytel_psf = siaf_ap_obs.convert(xsci_psf, ysci_psf, 'sci', 'tel')
if use_coeff:
hdul_psfs = self.calc_psf_from_coeff(sp=sp, coord_vals=(xtel_psf, ytel_psf), coord_frame='tel',
wfe_drift=wfe_drift, return_oversample=True, **kwargs)
else:
hdul_psfs = fits.HDUList()
npos = len(xtel_psf)
for xoff, yoff in tqdm(zip(xtel_psf, ytel_psf), total=npos):
res = self.calc_psf(sp=sp, coord_vals=(xoff,yoff), coord_frame='tel',
return_oversample=True)
# If add_distortion take index 2, otherwise index 0
hdu = res[2] if len(res)==4 else res[0]
hdul_psfs.append(hdu)
# Resample if necessary
scale = osamp / self.oversample #hdu.header['OSAMP']
if scale != 1:
for hdu in hdul_psfs:
hdu.data = frebin(hdu.data, scale=scale)
hdu.header['PIXELSCL'] = hdu.header['PIXELSCL'] / scale
hdu.header['OSAMP'] = osamp
if return_coords is None:
return hdul_psfs
elif return_coords=='sci':
xvals, yvals = xsci_psf, ysci_psf
elif return_coords=='tel':
xvals, yvals = xtel_psf, ytel_psf
else:
xvals, yvals = siaf_ap_obs.convert(xsci_psf, ysci_psf, 'sci', return_coords)
return xvals, yvals, hdul_psfs
def _calc_psfs_sgd(self, xoff_asec, yoff_asec, use_coeff=True, return_oversample=True, **kwargs):
"""Calculate small grid dithers PSFs"""
if self.is_coron==False:
_log.warn("`calc_sgd` only valid for coronagraphic observations (set `image_mask` attribute).")
return
if use_coeff:
result = self.calc_psf_from_coeff(coord_frame='idl', coord_vals=(xoff_asec,yoff_asec),
return_oversample=return_oversample, siaf_ap=self.siaf_ap, **kwargs)
else:
log_prev = conf.logging_level
setup_logging('WARN', verbose=False)
npos = len(xoff_asec)
# Return HDUList or array of images?
if kwargs.get('return_hdul',True):
result = fits.HDUList()
for xoff, yoff in tqdm(zip(xoff_asec, yoff_asec), total=npos):
res = self.calc_psf(coord_frame='idl', coord_vals=(xoff,yoff),
return_oversample=return_oversample, **kwargs)
if len(res)==4:
hdu = res[2] if return_oversample else res[3]
else:
hdu = res[0] if return_oversample else res[1]
result.append(hdu)
else:
result = []
for xoff, yoff in tqdm(zip(xoff_asec, yoff_asec), total=npos):
res = self.calc_psf(coord_frame='idl', coord_vals=(xoff,yoff),
return_oversample=return_oversample, **kwargs)
result.append(res)
result = np.asarray(result)
setup_logging(log_prev, verbose=False)
return result
[docs]def nrc_mask_trans(image_mask, x, y):
""" Compute the amplitude transmission appropriate for a BLC for some given pixel spacing
corresponding to the supplied Wavefront.
Based on the Krist et al. SPIE paper on NIRCam coronagraph design
*Note* : To get the actual transmission, these values should be squared.
"""
import scipy
if not isinstance(x, np.ndarray):
x = np.asarray([x]).flatten()
y = np.asarray([y]).flatten()
if image_mask[-1]=='R':
r = poppy.accel_math._r(x, y)
if image_mask == 'MASK210R':
sigma = 5.253
elif image_mask == 'MASK335R':
sigma = 3.2927866
elif image_mask == 'MASK430R':
sigma = 2.58832
sigmar = sigma * r
# clip sigma: The minimum is to avoid divide by zero
# the maximum truncates after the first sidelobe to match the hardware
bessel_j1_zero2 = scipy.special.jn_zeros(1, 2)[1]
sigmar.clip(np.finfo(sigmar.dtype).tiny, bessel_j1_zero2, out=sigmar) # avoid divide by zero -> NaNs
transmission = (1 - (2 * scipy.special.j1(sigmar) / sigmar) ** 2)
transmission[r == 0] = 0 # special case center point (value based on L'Hopital's rule)
if image_mask[-1]=='B':
# This is hard-coded to the wedge-plus-flat-regions shape for NIRCAM
# the scale fact should depend on X coord in arcsec, scaling across a 20 arcsec FOV.
# map flat regions to 2.5 arcsec each
# map -7.5 to 2, +7.5 to 6. slope is 4/15, offset is +9.5
wedgesign = 1 if image_mask == 'MASKSWB' else -1 # wide ends opposite for SW and LW
scalefact = (2 + (x * wedgesign + 7.5) * 4 / 15).clip(2, 6)
# Working out the sigma parameter vs. wavelength to get that wedge pattern is non trivial
# This is NOT a linear relationship. See calc_blc_wedge helper fn below.
if image_mask == 'MASKSWB':
polyfitcoeffs = np.array([2.01210737e-04, -7.18758337e-03, 1.12381516e-01,
-1.00877701e+00, 5.72538509e+00, -2.12943497e+01,
5.18745152e+01, -7.97815606e+01, 7.02728734e+01])
elif image_mask == 'MASKLWB':
polyfitcoeffs = np.array([9.16195583e-05, -3.27354831e-03, 5.11960734e-02,
-4.59674047e-01, 2.60963397e+00, -9.70881273e+00,
2.36585911e+01, -3.63978587e+01, 3.20703511e+01])
else:
raise NotImplementedError(f"{image_mask} not a valid name for NIRCam wedge occulter")
sigmas = scipy.poly1d(polyfitcoeffs)(scalefact)
sigmar = sigmas * np.abs(y)
# clip sigma: The minimum is to avoid divide by zero
# the maximum truncates after the first sidelobe to match the hardware
sigmar.clip(min=np.finfo(sigmar.dtype).tiny, max=2 * np.pi, out=sigmar)
transmission = (1 - (np.sin(sigmar) / sigmar) ** 2)
transmission[y == 0] = 0
transmission[np.abs(x) > 10] = 1.0
# Amplitude transmission (square to get intensity transmission; ie., photon throughput)
return transmission
def _transmission_map(self, coord_vals, coord_frame, siaf_ap=None):
if not self.is_coron:
return None
apname_mask = self._psf_coeff_mod['si_mask_apname']
if apname_mask is None:
apname_mask = self.aperturename
siaf_ap_mask = self.siaf[apname_mask]
# Assume cframe corresponds to siaf_ap input
siaf_ap = siaf_ap_mask if siaf_ap is None else siaf_ap
coord_frame = coord_frame.lower()
# Convert to common 'tel' coordinates
cx, cy = np.asarray(coord_vals)
if (siaf_ap.AperName != siaf_ap_mask.AperName):
cx_tel, cy_tel = siaf_ap.convert(cx, cy, coord_frame, 'tel')
cx_idl, cy_idl = siaf_ap_mask.convert(cx_tel, cy_tel, 'tel', 'idl')
elif coord_frame=='idl':
cx_idl, cy_idl = (cx, cy)
elif coord_frame in ['det', 'tel', 'sci']:
cx_idl, cy_idl = siaf_ap_mask.convert(cx, cy, coord_frame, 'idl')
# Get mask transmission
trans = nrc_mask_trans(self.image_mask, cx_idl, cy_idl)
return trans, cx_idl, cy_idl
def _nrc_coron_psf_sums(self, coord_vals, coord_frame, siaf_ap=None, return_max=False, trans=None):
"""
Function to analytically determine the sum and max value
of a NIRCam off-axis coronagraphic PSF while partially
occulted by the coronagrpahic mask.
Keyword Args
============
return_max : bool
By default, this function returns the PSF sums. Set this keyword
to True in order to return the PSF max values instead.
trans : float, ndarray, or None
Transmission is usually sampled from mask transmission function
corresponding to input coordinates. Instead, supply the transmission
value directly.
"""
# Ensure correct scaling for off-axis PSFs
if not self.is_coron:
return None
# Get mask transmission
t_temp, cx_idl, cy_idl = _transmission_map(self, coord_vals, coord_frame, siaf_ap=siaf_ap)
if trans is None:
trans = t_temp**2
# Linear combination of min/max to determine PSF sum
# Get a and b values for each position
avals = trans
bvals = 1 - avals
# print(avals, bvals)
# Store PSF sums for later retrieval
try:
psf_sums_dict = self._psf_sums
except AttributeError:
psf_sums_dict = {}
self._psf_sums = psf_sums_dict
# Offset PSF sum
psf_off_sum = psf_sums_dict.get('psf_off', None)
psf_off_max = psf_sums_dict.get('psf_off_max', None)
if (psf_off_sum is None) or (psf_off_max is None):
psf = _calc_psf_from_coeff(self, return_oversample=False, return_hdul=False,
coord_vals=(10,10), coord_frame='idl')
psf_off_sum = psf.sum()
psf_off_max = np.max(pad_or_cut_to_size(psf,10))
psf_sums_dict['psf_off'] = psf_off_sum
psf_sums_dict['psf_off_max'] = psf_off_max
# Central PSF sum(s)
if self.image_mask[-1] == 'R':
psf_cen_sum = psf_sums_dict.get('psf_cen', None)
psf_cen_max = psf_sums_dict.get('psf_cen_max', None)
if (psf_cen_sum is None) or (psf_cen_max is None):
psf = _calc_psf_from_coeff(self, return_oversample=False, return_hdul=False)
psf_cen_sum = psf.sum()
psf_sums_dict['psf_cen'] = psf_cen_sum
psf_sums_dict['psf_cen_max'] = np.max(pad_or_cut_to_size(psf,10))
elif self.image_mask[-1] == 'B':
# Build a list of bar offsets to interpolate scale factors
xvals = psf_sums_dict.get('psf_cen_xvals', None)
psf_cen_sum_arr = psf_sums_dict.get('psf_cen_sum_arr', None)
psf_cen_max_arr = psf_sums_dict.get('psf_cen_max_arr', None)
if (psf_cen_sum_arr is None) or (psf_cen_max_arr is None):
xvals = np.linspace(-8,8,9)
psf_sums_dict['psf_cen_xvals'] = xvals
psf_cen_sum_arr = []
psf_cen_max_arr = []
for xv in xvals:
psf= _calc_psf_from_coeff(self, return_oversample=False, return_hdul=False,
coord_vals=(xv,0), coord_frame='idl')
psf_cen_sum_arr.append(psf.sum())
psf_cen_max_arr.append(np.max(pad_or_cut_to_size(psf,10)))
psf_cen_sum_arr = np.array(psf_cen_sum_arr)
psf_cen_max_arr = np.array(psf_cen_max_arr)
psf_sums_dict['psf_cen_sum_arr'] = psf_cen_sum_arr
psf_sums_dict['psf_cen_max_arr'] = psf_cen_max_arr
# Interpolation function
finterp = interp1d(xvals, psf_cen_sum_arr, kind='linear', fill_value='extrapolate')
psf_cen_sum = finterp(cx_idl)
finterp = interp1d(xvals, psf_cen_max_arr, kind='linear', fill_value='extrapolate')
psf_cen_max = finterp(cx_idl)
else:
_log.warn(f"Image mask not recognized: {self.image_mask}")
return None
if return_max:
psf_max = avals * psf_off_max + bvals * psf_cen_max
return psf_max
else:
psf_sum = avals * psf_off_sum + bvals * psf_cen_sum
return psf_sum
def _nrc_coron_rescale(self, res, coord_vals, coord_frame, siaf_ap=None, sp=None):
"""
Function for better scaling of NIRCam coronagraphic output for sources
that overlap the image masks.
"""
if coord_vals is None:
return res
nfield = np.size(coord_vals[0])
psf_sum = _nrc_coron_psf_sums(self, coord_vals, coord_frame, siaf_ap=siaf_ap)
if psf_sum is None:
return res
# Scale by countrate of observed spectrum
if (sp is not None) and (not isinstance(sp, list)):
nspec = 1
obs = S.Observation(sp, self.bandpass, binset=self.bandpass.wave)
sp_counts = obs.countrate()
elif (sp is not None) and (isinstance(sp, list)):
nspec = len(sp)
if nspec==1:
obs = S.Observation(sp[0], self.bandpass, binset=self.bandpass.wave)
sp_counts = obs.countrate()
else:
sp_counts = []
for i, sp_norm in enumerate(sp):
obs = S.Observation(sp_norm, self.bandpass, binset=self.bandpass.wave)
sp_counts.append(obs.countrate())
sp_counts = np.array(sp_counts)
else:
nspec = 0
sp_counts = 1
if nspec>1 and nspec!=nfield:
_log.warn("Number of spectra should be 1 or equal number of field points")
# Scale by count rate
psf_sum *= sp_counts
# Re-scale PSF by total sums
if isinstance(res, fits.HDUList):
for i, hdu in enumerate(res):
hdu.data *= (psf_sum[i] / hdu.data.sum())
elif nfield==1:
res *= (psf_sum[0] / res.sum())
else:
for i, data in enumerate(res):
data *= (psf_sum[i] / data.sum())
return res