from __future__ import print_function, division
import numpy as np
from astropy.io import fits
from astropy.time import Time
import datetime, time
import logging
_log = logging.getLogger('pynrc')
from . import conf
from .logging_utils import setup_logging
from webbpsf_ext.webbpsf_ext_core import _check_list
from .nrc_utils import pix_noise
[docs]class multiaccum(object):
"""
A class for defining MULTIACCUM ramp settings.
See `NIRCam MULTIACCUM documentation
<https://jwst-docs.stsci.edu/display/JTI/NIRCam+Detector+Readout+Patterns>`_
for more details.
Parameters
----------------
read_mode : str
NIRCam Ramp Readout mode such as 'RAPID', 'BRIGHT1', 'DEEP8', etc., or 'CUSTOM'
nint : int
Number of integrations (ramps).
ngroup : int
Number of groups in a integration.
nf : int
Number of frames per group.
nd1 : int
Number of drop frame after reset (before first group read). Default=0.
nd2 : int
Number of drop frames within a group (ie., groupgap).
nd3 : int
Number of drop frames after final read frame in ramp. Default=1.
nr1 : int
Number of reset frames within first ramp. Default=0.
nr2 : int
Number of reset frames for subsequent ramps. Default=1.
wind_mode : str
Set to determine maximum number of allowed groups.
Notes
-----
**NIRCam-specific readout modes**
======== === ===
Pattern NF ND2
======== === ===
RAPID 1 0
BRIGHT1 1 1
BRIGHT2 2 0
SHALLOW2 2 3
SHALLOW4 4 1
MEDIUM2 2 8
MEDIUM8 8 2
DEEP2 2 18
DEEP8 8 12
======== === ===
"""
[docs] def __init__(self, read_mode='RAPID', nint=1, ngroup=1, nf=1, nd1=0, nd2=0, nd3=0,
nr1=1, nr2=1, wind_mode='FULL', **kwargs):
self.nint = nint
self._ngroup_max = 10000
self.ngroup = ngroup
# Modify these directly rather via the @property
self._nr1 = self._check_int('nr1',nr1,0)
self._nr2 = self._check_int('nr2',nr2,0)
self._nf = self._check_int('nf',nf,1)
self._nd1 = self._check_int('nd1',nd1,0)
self._nd2 = self._check_int('nd2',nd2,0)
self._nd3 = self._check_int('nd3',nd3,0)
# Now set read mode to specified mode, which may modify nf, nd1, nd2, and nd3
self.read_mode = read_mode
@property
def nint(self):
"""Number of ramps (integrations) in an exposure."""
return self._nint
@nint.setter
def nint(self, value):
self._nint = self._check_int('nint',value,1)
@property
def ngroup(self):
"""Number of groups in a ramp (integration)."""
return self._ngroup
@ngroup.setter
def ngroup(self, value):
value = self._check_int('ngroup',value,1)
if value > self._ngroup_max:
_log.warning('Specified ngroup ({}) greater than allowed value ({})'\
.format(value, self._ngroup_max))
_log.warning('Setting ngroup = {}'.format(self._ngroup_max))
value = self._ngroup_max
self._ngroup = value
@property
def nf(self):
"""Number of frames per group."""
return self._nf
@nf.setter
def nf(self, value):
value = self._check_int('nf',value,1)
self._nf = self._check_custom(value, self._nf)
@property
def nd1(self):
"""Number of drop frame after reset (before first group read)."""
return self._nd1
@nd1.setter
def nd1(self, value):
value = self._check_int('nd1',value,0)
self._nd1 = self._check_custom(value, self._nd1)
@property
def nd2(self):
"""Number of drop frames within a group (aka, groupgap)."""
return self._nd2
@nd2.setter
def nd2(self, value):
value = self._check_int('nd2',value,0)
self._nd2 = self._check_custom(value, self._nd2)
@property
def nd3(self):
"""Number of drop frames after final read frame in ramp."""
return self._nd3
@nd3.setter
def nd3(self, value):
value = self._check_int('nd3',value,0)
self._nd3 = self._check_custom(value, self._nd3)
@property
def nr1(self):
"""Number of reset frames before first integration."""
return self._nr1
@nr1.setter
def nr1(self, value):
self._nr1 = self._check_int('nr1', value, minval=0)
@property
def nr2(self):
"""Number of reset frames for subsequent integrations."""
return self._nr2
@nr2.setter
def nr2(self, value):
self._nr2 = self._check_int('nr2', value, minval=0)
@property
def nread_tot(self):
"""Total number of read frames in a ramp, including drops"""
nf = self.nf; nd1 = self.nd1; nd2 = self.nd2; nd3 = self.nd3
ngroup = self.ngroup
nframes = nd1 + ngroup*nf + (ngroup-1)*nd2 + nd3
return nframes
@property
def read_mode(self):
"""Selected Read Mode in the `patterns_list` attribute."""
return self._read_mode
@read_mode.setter
def read_mode(self, value):
"""Set MULTIACCUM Readout. Automatically updates other relevant attributes."""
if value is None:
_log.info('Readout pattern has None value. Setting to CUSTOM.')
value = 'CUSTOM'
value = value.upper()
_check_list(value, self.patterns_list, var_name='read_mode')
self._read_mode = value
self._validate_readout()
@property
def patterns_list(self):
"""Allowed NIRCam MULTIACCUM patterns"""
plist = sorted(list(self._pattern_settings.keys()))
return ['CUSTOM'] + plist
@property
def _pattern_settings(self):
patterns = ['RAPID', 'BRIGHT1', 'BRIGHT2', 'SHALLOW2', 'SHALLOW4', 'MEDIUM2', 'MEDIUM8', 'DEEP2', 'DEEP8']
nf_arr = [1,1,2,2,4,2,8, 2, 8]
nd2_arr = [0,1,0,3,1,8,2,18,12]
ng_max = [10,10,10,10,10,10,10,20,20]
return dict(zip(patterns, zip(nf_arr, nd2_arr, ng_max)))
[docs] def to_dict(self, verbose=False):
"""Export ramp settings to a dictionary."""
p = [('read_mode',self.read_mode), ('nint',self.nint), ('ngroup',self.ngroup), \
('nf',self.nf), ('nd1',self.nd1), ('nd2',self.nd2), ('nd3',self.nd3)]
return tuples_to_dict(p, verbose)
def _validate_readout(self):
"""
Validation to make sure the defined ngroups, nf, etc. are consistent with
the selected MULTIACCUM readout pattern.
"""
if self.read_mode not in self.patterns_list:
_log.warning('Readout {} not a valid NIRCam readout mode. Setting to CUSTOM.'\
.format(self.read_mode))
self._read_mode = 'CUSTOM'
_log.warning('Using explicit settings: ngroup={}, nf={}, nd1={}, nd2={}, nd3={}'\
.format(self.ngroup, self.nf, self.nd1, self.nd2, self.nd3))
elif self.read_mode == 'CUSTOM':
_log.info('{} readout mode selected.'.format(self.read_mode))
_log.info('Using explicit settings: ngroup={}, nf={}, nd1={}, nd2={}, nd3={}'\
.format(self.ngroup, self.nf, self.nd1, self.nd2, self.nd3))
else:
_log.info('{} readout mode selected.'.format(self.read_mode))
nf, nd2, _ = self._pattern_settings.get(self.read_mode)
self._nf = nf
self._nd1 = 0
self._nd2 = nd2
self._nd3 = 0
_log.info('Setting ngroup={}, nf={}, nd1={}, nd2={}, nd3={}.'\
.format(self.ngroup, self.nf, self.nd1, self.nd2, self.nd3))
def _check_custom(self, val_new, val_orig):
"""Check if read_mode='CUSTOM' before changing variable."""
if self.read_mode == 'CUSTOM':
return val_new
else:
_log.warning("Can only modify parameter if read_mode='CUSTOM'.")
return val_orig
def _check_int(self, pstr, val, minval=1):
"""Check if a value is a positive integer, otherwise throw exception."""
val = float(val)
if (val.is_integer()) and (val>=minval):
return int(val)
else:
raise ValueError("{}={} must be an integer >={}.".format(pstr,val,minval))
[docs]class det_timing(object):
"""
Class to hold detector operations information. Includes SCA attributes such as
detector names and IDs as well as :class:`multiaccum` class for ramp settings.
Parameters
----------------
wind_mode : str
Window mode type 'FULL', 'STRIPE', 'WINDOW'.
xpix : int
Size of window in x-pixels for frame time calculation.
ypix : int
Size of window in y-pixels for frame time calculation.
x0 : int
Lower-left x-coord position of detector window.
y0 : int
Lower-left y-coord position of detector window.
Keyword Args
------------
read_mode : str
NIRCam Ramp Readout mode such as 'RAPID', 'BRIGHT1', etc.
nint : int
Number of integrations (ramps).
ngroup : int
Number of groups in a integration.
nf : int
Number of frames per group.
nd1 : int
Number of drop frame after reset (before first group read).
nd2 : int
Number of drop frames within a group (ie., groupgap).
nd3 : int
Number of drop frames after final read frame in ramp.
Examples
--------
Use kwargs functionality to pass keywords to the multiaccum class.
Send via a dictionary of keywords and values:
>>> kwargs = {'read_mode':'RAPID', 'nint':5, 'ngroup':10}
>>> d = det_timing(**kwargs)
Set the keywords directly:
>>> d = det_timing(read_mode='RAPID', nint=5, ngroup=10)
"""
[docs] def __init__(self, wind_mode='FULL', xpix=2048, ypix=2048, x0=0, y0=0,
mode='SLOW', nff=None, **kwargs):
self._opmode = mode.upper()
if 'JWST' in self._opmode:
pixrate = 1e5
loh = 12
nchans = 4
reset_type = 'pixel'
elif 'SLOW' in self._opmode:
pixrate = 1e5
loh = 8
nchans = 32
reset_type = 'pixel'
if nff is None: nff = 0
elif 'MED' in self._opmode:
pixrate = 3e5
loh = 8
nchans = 32
reset_type = 'pixel'
if nff is None: nff = 0
elif 'FAST' in self._opmode:
pixrate = 5e6
loh = 3
nchans = 32
reset_type = 'line'
if nff is None: nff = 0
self._detector_pixels = 2048
self._nchans = nchans
self._nff = nff
self.multiaccum = multiaccum(wind_mode=wind_mode, **kwargs)
self.wind_mode = wind_mode.upper()
self._xpix = xpix; self._x0 = x0
self._ypix = ypix; self._y0 = y0
self._validate_pixel_settings()
# By default fast-scan readout direction is [-->,<--,-->,<--]
# If same_scan_direction, then all -->
# If reverse_scan_direction, then [<--,-->,<--,-->] or all <--
self.same_scan_direction = False
self.reverse_scan_direction = False
# Pixel Rate in Hz
self._pixel_rate = pixrate
# Number of extra clock ticks per line
self._line_overhead = loh
# Pixel or line resets
self._reset_type = reset_type
@property
def wind_mode(self):
"""Window mode attribute"""
return self._wind_mode
@wind_mode.setter
def wind_mode(self, value):
"""Set Window mode attribute"""
self._wind_mode = value
@property
def y0(self):
return int(self._y0)
@y0.setter
def y0(self, val):
self._y0 = val
@property
def x0(self):
return int(self._x0)
@x0.setter
def x0(self, val):
self._x0 = val
@property
def ypix(self):
return int(self._ypix)
@ypix.setter
def ypix(self, val):
self._ypix = val
@property
def xpix(self):
return int(self._xpix)
@xpix.setter
def xpix(self, val):
self._xpix = val
@property
def nout(self):
"""Number of simultaneous detector output channels stripes"""
return 1 if self.wind_mode == 'WINDOW' else self._nchans
@property
def chsize(self):
"""Size of Amplifier Channel"""
return int(self.xpix // self.nout)
@property
def ref_info(self):
"""Array of reference pixel borders [lower, upper, left, right]."""
det_size = self._detector_pixels
x1 = self.x0; x2 = x1 + self.xpix
y1 = self.y0; y2 = y1 + self.ypix
w = 4 # Width of ref pixel border
lower = int(w-y1)
upper = int(w-(det_size-y2))
left = int(w-x1)
right = int(w-(det_size-x2))
ref_all = np.array([lower,upper,left,right], dtype='int')
ref_all[ref_all<0] = 0
return ref_all
@property
def mask_act(self):
"""Active pixel mask for det coordinates"""
# mask_act = np.zeros([self.ypix,self.xpix]).astype('bool')
# rb, rt, rl, rr = self.ref_info
# mask_act[rb:-rt,rl:-rr] = True # This doesn't work if rr or rt are 0!!
return ~self.mask_ref
@property
def mask_ref(self):
"""Reference pixel mask for det coordinates"""
# [bottom, upper, left, right]
rb, rt, rl, rr = self.ref_info
ref_mask = np.zeros([self.ypix,self.xpix], dtype=bool)
if rb>0: ref_mask[0:rb,:] = True
if rt>0: ref_mask[-rt:,:] = True
if rl>0: ref_mask[:,0:rl] = True
if rr>0: ref_mask[:,-rr:] = True
return ref_mask
@property
def mask_channels(self):
"""Channel masks for det coordinates"""
ch_mask = np.zeros([self.ypix,self.xpix])
for ch in np.arange(self.nout):
ch_mask[:,ch*self.chsize:(ch+1)*self.chsize] = ch
return ch_mask
@property
def nff(self):
"""Number of fast row resets that occur before Reset Frame"""
if self._nff is None:
if self.wind_mode=='WINDOW':
if self.ypix>256: nff = 2048
elif self.ypix>64: nff = 512
elif self.ypix>16: nff = 256
elif self.ypix>8: nff = 64
else: nff = 16
elif self.wind_mode=='STRIPE':
if self.ypix==2048: nff = 0
elif self.ypix>=256: nff = 2048
else: nff = 512
elif self.wind_mode=='FULL':
nff = 0
else:
nff = self._nff
return nff
@nff.setter
def nff(self, val):
self._nff = val
def _validate_pixel_settings(self):
"""
Validation to make sure the defined pixel sizes are consistent with
detector readout mode (FULL, STRIPE, or WINDOW)
"""
wind_mode = self.wind_mode
modes = ['FULL', 'STRIPE', 'WINDOW']
if wind_mode not in modes:
wstr = str.join(', ', modes)
raise ValueError("{} not a valid readout mode. Acceptable values: {}".format(wind_mode, wstr))
detpix = self._detector_pixels
xpix = self.xpix; x0 = self.x0
ypix = self.ypix; y0 = self.y0
# Check some consistencies with frame sizes
if wind_mode == 'FULL':
if ypix != detpix:
_log.warn(f'In {wind_mode} mode, but ypix not {detpix}. Setting ypix={detpix}.')
ypix = detpix
if y0 != 0:
_log.warn(f'In {wind_mode} mode, but x0 not 0. Setting y0=0.')
y0 = 0
if (wind_mode == 'STRIPE') or (wind_mode == 'FULL'):
if xpix != detpix:
_log.warn(f'In {wind_mode} mode, but xpix not {detpix}. Setting xpix={detpix}.')
xpix = detpix
if x0 != 0:
_log.warn(f'In {wind_mode} mode, but x0 not 0. Setting x0=0.')
x0 = 0
if (x0+xpix) > detpix:
raise ValueError("x0+xpix ({}+{}) is larger than detector size ({})!"\
.format(x0,xpix,detpix))
if (y0+ypix) > detpix:
raise ValueError("y0+ypix ({}+{}) is larger than detector size ({})!"\
.format(y0,ypix,detpix))
# Update values if no errors were thrown
self._xpix = xpix; self._x0 = x0
self._ypix = ypix; self._y0 = y0
def _fix_precision(self, input):
"""
Many timing calculations result from minor precision issues with very
small numbers (1e-16) added to the real result. This function attempts
to truncate these small innaccuracies by dividing by the clock sample
time to get the total integer number of clock cycles.
"""
return int(input * self._pixel_rate + 0.5) / self._pixel_rate
@property
def _extra_lines(self):
"""Number of extra lines per frame.
Determine how many extra lines/rows are added to a to a given frame.
Based on ASIC microcode 10 and NIRCam arrays. Other instruments might
operate differently.
"""
if self.nout == 1:
xtra_lines = 2 if self.xpix>10 else 3
else:
xtra_lines = 1
return xtra_lines
@property
def _exp_delay(self):
"""Transition to idle delay (sec)
Additional overhead time at the end of an exposure.
This does not add any more photon flux to a pixel.
Due to transition to idle.
"""
# Window Mode
if self.nout == 1:
if self.xpix>150: xtra_lines = 0
elif self.xpix>64: xtra_lines = 1
elif self.xpix>16: xtra_lines = 2
elif self.xpix>8: xtra_lines = 4
else: xtra_lines = 5
# Full and Stripe
else:
xtra_lines = 1
# Clock ticks per line
xticks = self.chsize + self._line_overhead
# Return number of seconds
return xticks * xtra_lines / self._pixel_rate
@property
def _frame_overhead_pix(self):
"""
Full and Stripe mode frames have an additional pixel at the end.
"""
pix_offset = 0 if self.nout==1 else 1
return pix_offset
@property
def time_row_reset(self):
"""NFF Row Resets time per integration"""
nff = self.nff
xtra_lines = int(nff / (self.chsize))
# Clock ticks per line
xticks = self.chsize + self._line_overhead
return xticks * xtra_lines / self._pixel_rate
@property
def time_frame(self):
"""Determine frame time (sec) based on xpix, ypix, and wind_mode."""
chsize = self.chsize # Number of x-pixels within a channel
xticks = chsize + self._line_overhead # Clock ticks per line
flines = self.ypix + self._extra_lines # Lines per frame
# Add a single pix offset for full frame and stripe.
pix_offset = self._frame_overhead_pix
#end_delay = 0 # Used for syncing each frame w/ FPE bg activity. Not currently used.
# Total number of clock ticks per frame (reset, read, and drops)
fticks = xticks*flines + pix_offset
# Return frame time
return fticks / self._pixel_rate
@property
def time_group(self):
"""Time per group based on time_frame, nf, and nd2."""
return self.time_frame * (self.multiaccum.nf + self.multiaccum.nd2)
@property
def time_ramp(self):
"""Photon collection time for a single ramp."""
# How many total frames (incl. dropped and all) per ramp?
# Exclude nd3 (drops that add nothing)
ma = self.multiaccum
nf = ma.nf; nd1 = ma.nd1; nd2 = ma.nd2
ngroup = ma.ngroup
tint = (nd1 + ngroup*nf + (ngroup-1)*nd2) * self.time_frame
return self._fix_precision(tint)
@property
def time_int(self):
"""Same as time_ramp, except that 'int' follows the JWST nomenclature"""
return self.time_ramp
@property
def time_ramp_eff(self):
"""Effective ramp time for slope fit tf*(ng-1)"""
ma = self.multiaccum
if ma.ngroup<=1:
res = self.time_frame * (ma.nd1 + (ma.nf + 1) / 2)
else:
res = self.time_group * (ma.ngroup - 1)
return self._fix_precision(res)
@property
def time_int_eff(self):
"""Same as time_ramp_eff, except that 'int' follows the JWST nomenclature"""
return self.time_ramp_eff
@property
def time_exp(self):
"""Total photon collection time for all ramps."""
res = self.multiaccum.nint * self.time_ramp
return self._fix_precision(res)
# @property
# def time_total_int(self):
# """Total time for all frames in a ramp.
# Includes resets and excess drops, as well as NFF Rows Reset.
# """
# ma = self.multiaccum
# nf = ma.nf; nd1 = ma.nd1; nd2 = ma.nd2; nd3 = ma.nd3
# ngroup = ma.ngroup
# nr = 1
# nframes = nr + nd1 + ngroup*nf + (ngroup-1)*nd2 + nd3
# return nframes * self.time_frame + self.time_row_reset
@property
def time_total_int1(self):
"""Total time for all frames in first ramp of exposure.
Includes resets and excess drops, as well as NFF Rows Reset.
"""
ma = self.multiaccum
nf = ma.nf; nd1 = ma.nd1; nd2 = ma.nd2; nd3 = ma.nd3
ngroup = ma.ngroup
nr = ma.nr1
nframes = nr + nd1 + ngroup*nf + (ngroup-1)*nd2 + nd3
res = nframes * self.time_frame + self.time_row_reset
return self._fix_precision(res)
@property
def time_total_int2(self):
"""Total time for all frames in a subsequent ramp.
Includes resets and excess drops, as well as NFF Rows Reset.
Only differs from time_total_int1 in case nr1 != nr2
"""
ma = self.multiaccum
nf = ma.nf; nd1 = ma.nd1; nd2 = ma.nd2; nd3 = ma.nd3
ngroup = ma.ngroup
nr = ma.nr2
# Set to 0 if only a single integration
if ma.nint <= 1:
return 0.
else:
nframes = nr + nd1 + ngroup*nf + (ngroup-1)*nd2 + nd3
res = nframes * self.time_frame + self.time_row_reset
return self._fix_precision(res)
@property
def time_total(self):
"""Total exposure acquisition time"""
# exp1 = 0 if self.multiaccum.nint == 0 else self.time_total_int1
# exp2 = 0 if self.multiaccum.nint <= 1 else self.time_total_int2 * (self.multiaccum.nint-1)
exp1 = self.time_total_int1
exp2 = self.time_total_int2 * (self.multiaccum.nint-1)
res = exp1 + exp2 + self._exp_delay
return self._fix_precision(res)
@property
def times_group_avg(self):
"""Times at each averaged group since reset"""
ma = self.multiaccum
nf_avg = np.arange(ma.nf+1).sum() / ma.nf
return np.arange(ma.ngroup) * self.time_group + (ma.nd1 + nf_avg) * self.time_frame
[docs] def to_dict(self, verbose=False):
"""Export detector settings to a dictionary."""
p = [('wind_mode',self.wind_mode), ('nout',self.nout),
('xpix',self.xpix), ('ypix',self.ypix), ('x0',self.x0), ('y0',self.y0)]
return tuples_to_dict(p, verbose)
[docs] def times_to_dict(self, verbose=False):
"""Export ramp times as dictionary with option to print output to terminal."""
times = [('t_frame', self.time_frame), ('t_group', self.time_group),
('t_int', self.time_int), ('t_exp', self.time_exp),
('t_acq', self.time_total),
('t_int_tot1', self.time_total_int1),
('t_int_tot2', self.time_total_int2)]
return tuples_to_dict(times, verbose)
[docs] def int_times_table(self, date_start, time_start, offset_seconds=None):
"""Create and populate the INT_TIMES table, which is saved as a
separate extension in the output data file.
Parameters
----------
date_start : str
Date string of observation ('2020-02-28')
time_start : str
Time string of observation ('12:24:56')
offset_seconds : None or float
Time from beginning of observation until start of integration.
Returns
-------
int_times_tab : astropy.table.Table
Table of starting, mid, and end times for each integration
"""
from astropy.table import Table
from astropy.time import Time, TimeDelta
from astropy import units as u
if offset_seconds is None:
offset_seconds = 0
integration_numbers = np.arange(self.multiaccum.nint)
start_time_string = date_start + 'T' + time_start
start_time = Time(start_time_string) + offset_seconds * u.second
integration_time = self.time_total_int2
integ_time_delta = TimeDelta(integration_time * u.second)
start_times = start_time + (integ_time_delta * integration_numbers)
reset_time = self.multiaccum.nr2 * self.time_frame
integration_time_exclude_reset = TimeDelta((integration_time - reset_time) * u.second)
end_times = start_times + integration_time_exclude_reset
mid_times = start_times + integration_time_exclude_reset / 2.
# For now, let's keep the BJD (Barycentric?) times identical
# to the MJD times.
start_times_bjd = start_times
mid_times_bjd = mid_times
end_times_bjd = end_times
# Create table
nrows = len(integration_numbers)
data_list = [(integration_numbers[i] + 1,
start_times.mjd[i], mid_times.mjd[i], end_times.mjd[i],
start_times_bjd.mjd[i], mid_times_bjd.mjd[i], end_times_bjd.mjd[i])
for i in range(nrows)]
int_times_tab = np.array(data_list,
dtype=[('integration_number','<i2'),
('int_start_MJD_UTC','<f8'),
('int_mid_MJD_UTC', '<f8'),
('int_end_MJD_UTC','<f8'),
('int_start_BJD_TDB','<f8'),
('int_mid_BJD_TDB','<f8'),
('int_end_BJD_TDB','<f8')])
return int_times_tab
[docs] def pixel_noise(self, ng=None, nf=None, verbose=False, **kwargs):
"""Noise values per pixel.
Return theoretical noise calculation for the specified MULTIACCUM exposure
in terms of e-/sec. This uses the pre-defined detector-specific noise
properties. Can specify flux of a source as well as background and
zodiacal light (in e-/sec/pix). After getting the noise per pixel per
ramp (integration), value(s) are divided by the sqrt(NINT) to return
the final noise
Parameters
----------
ng : None or int or image
Option to explicitly state number of groups. This is specifically
used to enable the ability of only calculating pixel noise for
unsaturated groups for each pixel. If a numpy array, then it should
be the same shape as `fsrc` image. By default will use `self.multiaccum.ngroup`.
nf : int
Option to explicitly states number of frames in each group.
By default will use `self.multiaccum.nf`.
verbose : bool
Print out results at the end.
Keyword Arguments
-----------------
rn : float
Read Noise per pixel (e-).
ktc : float
kTC noise (in e-). Only valid for single frame (n=1)
p_excess : array-like
An array or list of two elements that holds the parameters
describing the excess variance observed in effective noise plots.
By default these are both 0. For NIRCam detectors, recommended
values are [1.0,5.0] for SW and [1.5,10.0] for LW.
idark : float
Dark current in e-/sec/pix.
fsrc : float
Flux of source in e-/sec/pix.
fzodi : float
Zodiacal light emission in e-/sec/pix.
fbg : float
Any additional background (telescope emission or scattered light?)
ideal_Poisson : bool
If set to True, use total signal for noise estimate,
otherwise MULTIACCUM equation is used.
Notes
-----
fsrc, fzodi, and fbg are functionally the same as they are immediately summed.
They can also be single values or multiple elements (list, array, tuple, etc.).
If multiple inputs are arrays, make sure their array sizes match.
"""
ma = self.multiaccum
if ng is None:
ng = ma.ngroup
if nf is None:
nf = ma.nf
# Pixel noise per ramp (e-/sec/pix)
pn = pix_noise(ngroup=ng, nf=nf, nd2=ma.nd2, tf=self.time_frame, **kwargs)
# Divide by sqrt(Total Integrations)
final = pn / np.sqrt(ma.nint)
if verbose:
print('Noise (e-/sec/pix): {}'.format(final))
print('Total Noise (e-/pix): {}'.format(final*self.time_exp))
return final
[docs] def pix_timing_map(self, same_scan_direction=None, reverse_scan_direction=None,
avg_groups=False, reset_zero=False, return_flat=False):
"""Create array of pixel times for a single ramp.
Each pixel value corresponds to the precise time at which
that pixel was read out during the ramp acquisition. The first
pixel(s) have t=0.
Parameters
----------
same_scan_direction : bool
Are all the output channels read in the same direction?
By default fast-scan readout direction is ``[-->,<--,-->,<--]``
If ``same_scan_direction``, then all ``-->``
reverse_scan_direction : bool
If ``reverse_scan_direction``, then ``[<--,-->,<--,-->]`` or all ``<--``
avg_groups : bool
For groups where nf>1, the telescope data gets averaged via a
bit-shifter. Setting ``avg_groups=True`` also averages the
pixel times in a similar manner. Default is True.
return_flat : bool
Keyword Args
------------
reset_zero : bool
Return timing relative to when reset to get photon-collection time of each pixel.
Otherwise, t=0 corresponds to very first pixel(s) read in the ramp.
Returns
-------
ndarray
If ``return_flat=True`` then the data is a flattened array for a
single channel output. Otherwise, the output is a data cube of the
same size and shape of the raw data with these detector settings.
Example
-------
Assume you have a cube of raw full frame data (RAPID, ngroup=5).
Create a det_timing instance and get channel:
>>> d = det_timing(ngroup=5)
>>> tarr = d.pixel_timing_map(return_flat=True, avg_groups=True)
>>> nx, ny = (d.xpix, d.ypix)
>>> nout = d.nout # Number of amplifier output channels
>>> chsize = d.chsize # Channel size (x-direction)
>>> # Reshape into (nz, ny, nout, chsize)
>>> data = data.reshape([-1,ny,nout,chsize])
>>> # Reverse odd channels in x-direction to match even chans
>>> for ch in range(nout):
>>> if np.mod(ch,2)==1:
>>> data[:,:,ch,:] = data[:,:,ch,::-1]
>>> # Final data reshaped into 4 flattened output channels
>>> data = data.transpose([0,1,3,2]).reshape([-1,nout])
>>> # Can plot this like plt.plot(tarr, data) to make nout line plots
"""
xpix = self.xpix
ypix = self.ypix
nout = self.nout
chsize = self.chsize # Number of x-pixels within a channel
xticks = chsize + self._line_overhead # Clock ticks per line
flines = ypix + self._extra_lines # Lines per frame
if same_scan_direction is None:
same_scan_direction = self.same_scan_direction
if reverse_scan_direction is None:
reverse_scan_direction = self.reverse_scan_direction
# Pixel-by-pixel or line-by-line reset?
line_reset = True if 'line' in self._reset_type.lower() else False
# Add a single pix offset for full frame and stripe.
pix_offset = self._frame_overhead_pix
# Total number of clock ticks per frame (reset, read, and drops)
fticks = xticks*flines + pix_offset
# Total ticks in a ramp (exclude nd3 drop frames)
ma = self.multiaccum
nf = ma.nf; nd1 = ma.nd1; nd2 = ma.nd2
ngroup = ma.ngroup
nframes = nd1 + ngroup*nf + (ngroup-1)*nd2
nticks = fticks * nframes
# Make large array of pixel timing
arr = np.arange(nticks, dtype=np.float).reshape([nframes,-1])
arr_reset = np.arange(fticks, dtype=np.float)
if reset_zero:
arr += arr_reset.max() + 1
# Chop off single pix overhead
if pix_offset>0:
arr = arr[:,:-pix_offset]
arr_reset = arr_reset[:-pix_offset]
# Reshape to 3D array
arr = arr.reshape([nframes, flines, xticks])
arr_reset = arr_reset.reshape([flines, xticks])
# If performing line-by-line reset, all pixels in a line
# were reset at the same time.
if line_reset:
for i in range(flines):
arr_reset[i,:] = np.max(arr_reset[i,:])
# Chop off x pixel & y line overheads
arr = arr[:,:ypix,:chsize]
arr_reset = arr_reset[:ypix,:chsize]
# By default fast-scan readout direction is [-->,<--,-->,<--]
# If same_scan_direction, then all -->
# If reverse_scan_direction, then [<--,-->,<--,-->] or all <--
if reverse_scan_direction:
arr = arr[::-1]
arr_reset = arr_reset[:,::-1]
arr_list = []
arr2_list = []
if nout>1:
# Consecutive outputs reversed?
for ch in range(nout):
if (np.mod(ch,2) == 0) or (same_scan_direction == True):
arr_list.append(arr)
arr2_list.append(arr_reset)
else:
arr_list.append(arr[:,:,::-1])
arr2_list.append(arr_reset[:,::-1])
data = np.concatenate(arr_list, axis=2)
data_reset = np.concatenate(arr2_list, axis=1)
else:
data = arr
data_reset = arr_reset
del arr, arr_list, arr2_list, arr_reset
# Timing for averaged (bit-shifted) frames
# Remove drops and average grouped data
if (avg_groups and nf>1) or (nd2>0):
# Trailing drop frames already excluded
# so need to pull off last group of frames
# in order to properly reshape things.
if avg_groups and (nf>1):
data_end = data[-nf:,:,:].mean(axis=0)
else:
data_end = data[-nf:,:,:]
data_end = data_end.reshape([-1,ypix,xpix])
# Only care about first (n-1) groups
# Last group is handled separately
# Cut off last set of nf frames
data = data[:-nf,:,:]
# Reshape for easy group manipulation
data = data.reshape([-1,nf+nd2,ypix,xpix])
# Trim off the dropped frames (nd2)
if nd2>0: data = data[:,:nf,:,:]
# Average the frames within groups
# In reality, the 16-bit data is bit-shifted
if avg_groups and (nf>1):
data = data.mean(axis=1)
else:
data = data.reshape([-1,ypix,xpix])
# Add back the last group (already averaged)
data = np.append(data,data_end,axis=0)
if reset_zero:
for im in data:
im -= data_reset
# Put into time
# print(data.dtype)
data /= self._pixel_rate
# Return timing info
if return_flat: # Flatten array
return data.ravel()
else: # Get rid of dimensions of length 1
return data.squeeze()
[docs]def tuples_to_dict(pairs, verbose=False):
"""
Take a list of paired tuples and convert to a dictionary
where the first element of each tuple is the key and the
second element is the value.
"""
d={}
for (k, v) in pairs:
d[k] = v
if verbose:
if isinstance(v,float): print("{:<10} {:>10.4f}".format(k, v))
else: print("{:<10} {:>10}".format(k, v))
return d
[docs]def config2(input, intype='int'):
"""NIRCam CONFIG2 (0x4011) Register
Return a dictionary of configuration parameters depending on the
value of CONFIG2 register (4011).
Parameters
----------
input : int, str
Value of CONFIG2, nominally as an int. Binary and Hex values
can also be passed as strings.
intype: str
Input type (int, hex, or bin) for integer, hex, string,
or binary string.
"""
if 'hex' in intype:
if '0x' in input:
input = int(input, 0)
else:
input = int(input, 16)
if 'bin' in intype:
if '0b' in input:
input = int(input, 0)
else:
input = int(input, 2)
# Convert to 16-bit binary string
input = "{0:016b}".format(input)
# Config2 Bits (Right to Left)
# ----------------------------
# 0 : Vertical Enable
# 1 : Horizontal Enable
# 2 : Global reset per integration
# 3 : Enable Fast row-by-row reset (only in window/stripe)
# 6-4 : Number of fast row resets per int
# 7 : Window mode in Idle when window enabled?
# 8 : 0 = Preamp reset per frame; 1 = reset per row
# 9 : Permanent Reset
# 10 : Single step mode
# 11 : Test pattern
# 12 : FGS window mode
# 13 : Power down preamp, adc, and ap during Idle
# 14 : Power down preamp, adc, and ap during Drop
# 15 : 0 = Preamp reset per frame; 1 = reset per integration
# NFF Rows Reset
# --------------
# 000 = 1
# 001 = 4
# 010 = 16
# 011 = 64
# 100 = 256
# 101 = 512
# 110 = 1024
# 111 = 2048
nff_dict = {'000': 1, '001': 4, '010': 16, '011': 64,
'100': 256, '101': 512, '110':1024, '111':2048}
# Reverse for easier indexing of single values
input2 = input[::-1]
d = {}
d['00_window_vert'] = True if bool(int(input2[0])) else False
d['01_window_horz'] = True if bool(int(input2[1])) else False
d['02_global_reset'] = True if bool(int(input2[2])) else False
d['03_rows_reset'] = True if bool(int(input2[3])) else False
d['04_rows_nff'] = nff_dict.get(input2[4:7][::-1])
d['07_idle_window'] = True if bool(int(input2[7])) else False
d['08_pa_reset'] = 'row' if bool(int(input2[8])) else 'frame'
d['09_perm_reset'] = True if bool(int(input2[9])) else False
d['10_single_step'] = True if bool(int(input2[10])) else False
d['11_test_patt'] = True if bool(int(input2[11])) else False
d['12_fgs_wind'] = True if bool(int(input2[12])) else False
d['13_power_idl'] = True if bool(int(input2[13])) else False
d['14_power_drop'] = True if bool(int(input2[14])) else False
d['15_pa_reset'] = 'int' if bool(int(input2[15])) else 'frame'
return d
[docs]def create_detops(header, DMS=False, read_mode=None, nint=None, ngroup=None,
detector=None, wind_mode=None, xpix=None, ypix=None, x0=None, y0=None,
nff=None):
"""NIRCam Detector class from header
Create a NIRCam detector class based on header settings.
Can override settings with a variety of keyword arguments.
Parameters
----------
header : obj
Header from NIRCam FITS file
DMS : bool
Is header format from Data Management Systems? Otherwises, ISIM-like.
Keyword Args
------------
read_mode : str
NIRCam Ramp Readout mode such as 'RAPID', 'BRIGHT1', etc.
nint : int
Number of integrations (ramps).
ngroup : int
Number of groups in a integration.
detector : int, str
NIRCam detector ID (481-490) or SCA ID (A1-B5).
wind_mode : str
Window mode type 'FULL', 'STRIPE', 'WINDOW'.
xpix : int
Size of window in x-pixels for frame time calculation.
ypix : int
Size of window in y-pixels for frame time calculation.
x0 : int
Lower-left x-coord position of detector window.
y0 : int
Lower-left y-coord position of detector window.
nff : int
Number of fast row resets.
"""
# Create detector class
from pynrc.pynrc_core import DetectorOps
# Detector ID
if detector is None:
detector = header.get('SCA_ID')
if detector is None:
detector = header.get('DETECTOR')
# Detector size
xpix = header['SUBSIZE1'] if DMS else header['NAXIS1'] if xpix is None else xpix
ypix = header['SUBSIZE2'] if DMS else header['NAXIS2'] if ypix is None else ypix
# Subarray position
# Headers are 1-indexed, while detector class is 0-indexed
if x0 is None:
x1 = header['SUBSTRT1'] if DMS else header['COLCORNR']
x0 = x1 - 1
if y0 is None:
y1 = header['SUBSTRT2'] if DMS else header['ROWCORNR']
y0 = y1 - 1
# Subarray setting: Full, Stripe, or Window
if wind_mode is None:
if xpix==ypix==2048:
wind_mode = 'FULL'
else:
log_prev = conf.logging_level
setup_logging('ERROR', verbose=False)
# Test if STRIPE or WINDOW
det_stripe = DetectorOps(detector, 'STRIPE', xpix, ypix, x0, y0)
det_window = DetectorOps(detector, 'WINDOW', xpix, ypix, x0, y0)
dt_stripe = np.abs(header['TFRAME'] - det_stripe.time_frame)
dt_window = np.abs(header['TFRAME'] - det_window.time_frame)
wind_mode = 'STRIPE' if dt_stripe<dt_window else 'WINDOW'
setup_logging(log_prev, verbose=False)
# Add MultiAccum info
hnames = ['READPATT', 'NINTS', 'NGROUPS'] if DMS else ['READOUT', 'NINT', 'NGROUP']
read_mode = header[hnames[0]] if read_mode is None else read_mode
nint = header[hnames[1]] if nint is None else nint
ngroup = header[hnames[2]] if ngroup is None else ngroup
ma_args = {'read_mode':read_mode, 'nint':nint, 'ngroup':ngroup}
return DetectorOps(detector, wind_mode, xpix, ypix, x0, y0, nff, **ma_args)