import numpy as np
import logging
_log = logging.getLogger('pynrc')
# Import libraries
import numpy as np
import pynrc
from pynrc.maths import robust
from pynrc import DetectorOps, setup_logging, conf
from scipy.signal import savgol_filter
[docs]class NRC_refs(object):
"""Reference pixel correction object
Object class for reference pixel correction of NIRCam data (single integration).
Specify the data cube, header, and whether or not the header is in DMS format.
General usage of functions:
1. Create instance: ``ref = NRC_refs(data, header)``
2. Determine reference offset values: ``ref.calc_avg_amps()``.
Stored at ``ref.refs_amps_avg``.
3. Fix amplifier offsets: ``ref.correct_amp_refs()``.
Removes offsets that are stored at ``ref.refs_amps_avg``.
4. Determine average of column references tracking 1/f noise: ``ref.calc_avg_cols()``.
Reference values offset for a mean value of 0.
Averages are stored at ``ref.refs_side_avg``.
5. Optimal smoothing of side reference values: ``ref.calc_col_smooth()``.
Stores smoothed version at ``ref.refs_side_smth``.
6. Remove approximation of 1/f noise: ``ref.correct_col_refs()``.
Parameters
----------
data : ndarray
Input datacube. Can be two or three dimensions (nz,ny,nx).
header : obj
NIRCam Header associated with data.
DMS : bool
Is the header in DMS format?
altcol : bool
Calculate separate reference values for even/odd columns?
Default=True.
do_all : bool
Perform the default pixel correction procedures.
"""
[docs] def __init__(self, data, header, DMS=False, altcol=True, do_all=False, **kwargs):
# Convert to float if necessary
if 'float' not in data.dtype.name:
type_in = data.dtype.name
data = data.astype(np.float, copy=False)
type_out = data.dtype.name
#print('Converting data from {} to {}'.format(type_in, type_out))
# Check the number of dimensions are valid.
ndim = len(data.shape)
if ndim==2:
ny,nx = data.shape
data = data.reshape((1,ny,nx))
print('Reshaping data to 3 dimensions (nz,ny,nx)')
elif ndim==3:
pass
else:
raise ValueError('Input data can only have 2 or 3 dimensions. \
Found {} dimensions.'.format(ndim))
self.data = data
self.header = header
self.DMS = DMS
self.altcol = altcol
# Create a detector class
self._create_detops(**kwargs)
# Reference info from header
ref_all = self.detector.ref_info
self.nref_t = self.header.get('TREFROW', ref_all[1])
self.nref_b = self.header.get('BREFROW', ref_all[0])
self.nref_l = self.header.get('LREFCOL', ref_all[2])
self.nref_r = self.header.get('RREFCOL', ref_all[3])
# Check that reference pixels match up correctly between header and det class
assert self.nref_t == ref_all[1], 'Number of top reference rows do not match.'
assert self.nref_b == ref_all[0], 'Number of bottom reference rows do not match.'
assert self.nref_l == ref_all[2], 'Number of left reference columns do not match.'
assert self.nref_r == ref_all[3], 'Number of right reference columns do not match.'
# Set amplifier offset values to None initially
self.refs_amps_avg = None
# Set column reference values to None initially
self.refs_side_avg = None
# Perform all the usual ref pixel corrections with defaults
if do_all:
self.calc_avg_amps()
self.correct_amp_refs()
self.calc_avg_cols()
self.calc_col_smooth()
self.correct_col_refs()
def _create_detops(self, read_mode=None, nint=None, ngroup=None, detector=None,
wind_mode=None, xpix=None, ypix=None, x0=None, y0=None, nff=None, **kwargs):
"""
Create a detector class based on header settings.
"""
from ..detops import create_detops
header = self.header
DMS = self.DMS
det = create_detops(header, DMS=DMS, read_mode=read_mode, nint=nint, ngroup=ngroup,
detector=detector, wind_mode=wind_mode, xpix=xpix, ypix=ypix, x0=x0, y0=y0,
nff=nff, **kwargs)
self.detector = det
# # Detector ID
# if detector is None:
# detector = header.get('SCA_ID')
# if detector is None:
# detector = header.get('DETECTOR')
# # Detector size
# xpix = header['SUBSIZE1'] if DMS else header['NAXIS1'] if xpix is None else xpix
# ypix = header['SUBSIZE2'] if DMS else header['NAXIS2'] if ypix is None else ypix
# # Subarray position
# # Headers are 1-indexed, while detector class is 0-indexed
# if x0 is None:
# x1 = header['SUBSTRT1'] if DMS else header['COLCORNR']
# x0 = x1 - 1
# if y0 is None:
# y1 = header['SUBSTRT2'] if DMS else header['ROWCORNR']
# y0 = y1 - 1
# # Subarray setting: Full, Stripe, or Window
# if wind_mode is None:
# if xpix==ypix==2048:
# wind_mode = 'FULL'
# else:
# # Turn off log warnings
# log_prev = conf.logging_level
# setup_logging('ERROR', verbose=False)
# # Test if STRIPE or WINDOW
# det_stripe = DetectorOps(detector, 'STRIPE', xpix, ypix, x0, y0)
# det_window = DetectorOps(detector, 'WINDOW', xpix, ypix, x0, y0)
# dt_stripe = np.abs(header['TFRAME'] - det_stripe.time_frame)
# dt_window = np.abs(header['TFRAME'] - det_window.time_frame)
# wind_mode = 'STRIPE' if dt_stripe<dt_window else 'WINDOW'
# # Restore previous log levels
# setup_logging(log_prev, verbose=False)
# # Add MultiAccum info
# hnames = ['READPATT', 'NINTS', 'NGROUPS'] if DMS else ['READOUT', 'NINT', 'NGROUP']
# read_mode = header[hnames[0]] if read_mode is None else read_mode
# nint = header[hnames[1]] if nint is None else nint
# ngroup = header[hnames[2]] if ngroup is None else ngroup
# ma_args = {'read_mode':read_mode, 'nint':nint, 'ngroup':ngroup}
# # Create detector class
# self.detector = DetectorOps(detector, wind_mode, xpix, ypix, x0, y0, **ma_args)
@property
def multiaccum(self):
"""A :class:`~pynrc.multiaccum` object"""
return self.detector.multiaccum
@property
def multiaccum_times(self):
"""Exposure timings in dictionary"""
return self.detector.times_to_dict()
@property
def refs_bot(self):
"""Return raw bottom reference values"""
if self.nref_l>0:
return self.data[:,:self.nref_b,:]
else:
return None
@property
def refs_top(self):
"""Return raw top reference values"""
if self.nref_l>0:
return self.data[:,-self.nref_t:,:]
else:
return None
@property
def refs_right(self):
"""Return raw right reference values"""
if self.nref_l>0:
return self.data[:,:,-self.nref_r:]
else:
return None
@property
def refs_left(self):
"""Return raw left reference values"""
if self.nref_l>0:
return self.data[:,:,:self.nref_l]
else:
return None
[docs] def calc_avg_amps(self, top_ref=True, bot_ref=True):
"""Calculate amplifier averages
Save the average reference value for each amplifier in each frame.
Each array has a size of (namp, ngroup). Average values are
saved at ``self.refs_amps_avg``.
Parameters
----------
top_ref : bool
Include top reference rows when correcting channel offsets.
bot_ref : bool
Include bottom reference rows when correcting channel offsets.
"""
nchans = self.detector.nout
#chsize = self.detector.chsize
data_shape = self.data.shape
if self.nref_t==0: top_ref = False
if self.nref_b==0: bot_ref = False
if top_ref and bot_ref:
refs_all = np.hstack((self.refs_bot, self.refs_top))
elif bot_ref and (not top_ref):
refs_all = self.refs_bot
elif top_ref and (not bot_ref):
refs_all = self.refs_top
else:
print("No top or bottom reference pixels to calculate offset values.")
#self.refs_amps_avg = None
return
self.refs_amps_avg = calc_avg_amps(refs_all, data_shape, nchans, self.altcol)
self.supermean = robust.mean(refs_all)
[docs] def correct_amp_refs(self, supermean=False):
"""Correct amplifier offsets
Use values in ``self.refs_amps_avg`` to correct amplifier offsets.
Parameters
----------
supermean : bool
Add back the overall mean of the reference pixels.
"""
# Check to make sure refs_amps_avg is valid
if (self.refs_amps_avg is None):
_log.warning('self.refs_amps_avg is set to None. No offsets applied.')
return
#raise ValueError('self.refs_amps_avg is set to None')
# Supermean
# the average of the average is the DC level of the output channel
smean = self.supermean if supermean else 0.0
nchans = self.detector.nout
chsize = self.detector.chsize
nz, ny, nx = self.data.shape
for ch in range(nchans):
# Channel indices
ich1 = int(ch*chsize)
ich2 = int(ich1 + chsize)
# In-place subtraction of channel averages
if self.altcol:
for i in range(nz):
self.data[i,:,ich1:ich2-1:2] -= self.refs_amps_avg[0][ch,i]
self.data[i,:,ich1+1:ich2:2] -= self.refs_amps_avg[1][ch,i]
else:
for i in range(nz):
self.data[i,:,ich1:ich2] -= self.refs_amps_avg[ch,i]
# Add back supermean
if supermean: self.data += smean
[docs] def calc_avg_cols(self, left_ref=True, right_ref=True, avg_type='frame', **kwargs):
"""Calculate average of column references
Create a copy of the left and right reference pixels, removing the
average value of the reference pixels on an int, frame, or pixel basis.
Do this after correcting the amplifier offsets with ``correct_amp_refs()``.
Averages are stored in ``self.refs_side_avg``.
Parameters
----------
left_ref : bool
Include left reference cols when correcting 1/f noise.
right_ref : bool
Include right reference cols when correcting 1/f noise.
avg_type : str
Type of ref col averaging to perform. Allowed values are
'pixel', 'frame', or 'int'.
mean_func : func
Function to use to calculate averages of reference columns
"""
if self.nref_l==0: left_ref = False
if self.nref_r==0: right_ref = False
if (not left_ref) and (not right_ref):
print("No left or right reference pixels to calculate 1/f noise.")
self.refs_side_avg = None
return
rl = self.refs_left if left_ref else None
rr = self.refs_right if right_ref else None
self.refs_side_avg = calc_avg_cols(rl, rr, avg_type, **kwargs)
[docs] def calc_col_smooth(self, perint=False, edge_wrap=False, savgol=False, **kwargs):
"""Optimal smoothing of side reference pixels
Geneated smoothed version of column reference values.
Uses :func:`calc_avg_cols` to determine approx 1/f noise in data
and store in ``self.refs_side_smth``.
Parameters
----------
perint : bool
Smooth side reference pixel per int, otherwise per frame.
edge_wrap : bool
Add a partial frames to the beginning and end of each averaged
time series pixels in order to get rid of edge effects.
"""
refvals = self.refs_side_avg
# Check to make sure refs_amps_avg1 and refs_amps_avg2 are valid
if refvals is None:
_log.warning('self.refs_side_avg is set to None. No smoothing applied.')
return
#raise ValueError('self.refs_side_avg set to None')
# Time to go through an entire row.
# The delta time does't seem to make any difference in the final data product
# Just for vizualization purposes...
xticks = self.detector.chsize + self.detector._line_overhead
delt = xticks / self.detector._pixel_rate
# Save smoothed values
self.refs_side_smth = calc_col_smooth(refvals, self.data.shape, \
perint=perint, edge_wrap=edge_wrap,
delt=delt, savgol=savgol, **kwargs)
[docs] def correct_col_refs(self):
"""Remove 1/f noise from data
Correct 1/f noise using the approximation stored in
``self.refs_side_smth``.
"""
# Final correction
#for i,im in enumerate(cube): im -= refvals_smoothed[i].reshape([ny,1])
nz, ny, nx = self.data.shape
self.data -= self.refs_side_smth.reshape([nz,ny,1])
[docs]def reffix_hxrg(cube, nchans=4, in_place=True, fixcol=False, **kwargs):
"""Reference pixel correction function
This function performs a reference pixel correction
on HAWAII-[1,2,4]RG detector data read out using N outputs.
Top and bottom reference pixels are used first to remove
channel offsets.
Parameters
----------
cube : ndarray
Input datacube. Can be two or three dimensions (nz,ny,nx).
in_place : bool
Perform calculations in place. Input array is overwritten.
nchans : int
Number of output amplifier channels in the detector. Default=4.
fixcol : bool
Perform reference column corrections?
Keyword Args
------------
altcol : bool
Calculate separate reference values for even/odd columns.
supermean : bool
Add back the overall mean of the reference pixels.
top_ref : bool
Include top reference rows when correcting channel offsets.
bot_ref : bool
Include bottom reference rows when correcting channel offsets.
ntop : int
Specify the number of top reference rows.
nbot : int
Specify the number of bottom reference rows.
left_ref : bool
Include left reference cols when correcting 1/f noise.
right_ref : bool
Include right reference cols when correcting 1/f noise.
nleft : int
Specify the number of left reference columns.
nright : int
Specify the number of right reference columns.
perint : bool
Smooth side reference pixel per integration, otherwise do frame-by-frame.
avg_type :str
Type of side column averaging to perform to determine ref pixel drift.
Allowed values are 'pixel', 'frame', or 'int':
* 'int' : Subtract the avg value of all side ref pixels in ramp.
* 'frame' : For each frame, get avg of side ref pixels and subtract framewise.
* 'pixel' : For each ref pixel, subtract its avg value from all frames.
savgol : bool
Using Savitsky-Golay filter method rather than FFT.
winsize : int
Size of the window filter.
order : int
Order of the polynomial used to fit the samples.
"""
# Check the number of dimensions are valid.
ndim = len(cube.shape)
if not (ndim==2 or ndim==3):
raise ValueError('Input data can only have 2 or 3 dimensions. \
Found {} dimensions.'.format(ndim))
# Convert to float
if 'float' not in cube.dtype.name:
copy = (not in_place)
cube = cube.astype(np.float, copy=copy)
if not in_place:
cube = np.copy(cube)
# Remove channel offsets
cube = reffix_amps(cube, nchans=nchans, in_place=True, **kwargs)
# Fix 1/f noise using vertical reference pixels
if fixcol:
cube = ref_filter(cube, nchans=nchans, in_place=True, **kwargs)
return cube
[docs]def reffix_amps(cube, nchans=4, in_place=True, altcol=True, supermean=False,
top_ref=True, bot_ref=True, ntop=4, nbot=4, **kwargs):
"""Correct amplifier offsets
Matches all amplifier outputs of the detector to a common level.
This routine subtracts the average of the top and bottom reference rows
for each amplifier and frame individually.
By default, reference pixel corrections are performed in place since it's
faster and consumes less memory.
Parameters
----------
cube : ndarray
Input datacube. Can be two or three dimensions (nz,ny,nx).
nchans : int
Number of output amplifier channels in the detector. Default=4.
altcol : bool
Calculate separate reference values for even/odd columns.
supermean : bool
Add back the overall mean of the reference pixels.
in_place : bool
Perform calculations in place. Input array is overwritten.
top_ref : bool
Include top reference rows when correcting channel offsets.
bot_ref : bool
Include bottom reference rows when correcting channel offsets.
ntop : int
Specify the number of top reference rows.
nbot : int
Specify the number of bottom reference rows.
"""
if not in_place:
cube = np.copy(cube)
# Check the number of dimensions are valid.
ndim = len(cube.shape)
if ndim==2:
ny,nx = cube.shape
nz = 1
cube = cube.reshape((nz,ny,nx))
elif ndim==3:
nz, ny, nx = cube.shape
else:
raise ValueError('Input data can only have 2 or 3 dimensions. \
Found {} dimensions.'.format(ndim))
chsize = int(nx / nchans)
# Number of reference rows to use
# Set nt or nb equal to 0 if we don't want to use either
nt = ntop if top_ref else 0
nb = nbot if bot_ref else 0
if (nt+nb)==0:
print("No reference pixels available for use. Returning...")
return
# Slice out reference pixels
refs_bot = cube[:,:nb,:]
refs_top = cube[:,-nt:,:]
if nt==0:
refs_all = refs_bot
elif nb==0:
refs_all = refs_top
else:
refs_all = np.hstack((refs_bot, refs_top))
assert refs_all.shape[1] == (nb+nt)
# Supermean
# the average of the average is the DC level of the output channel
smean = robust.mean(refs_all) if supermean else 0.0
# Calculate avg reference values for each frame and channel
refs_amps_avg = calc_avg_amps(refs_all, cube.shape, nchans=nchans, altcol=altcol)
for ch in range(nchans):
# Channel indices
ich1 = ch*chsize
ich2 = ich1 + chsize
# In-place subtraction of channel medians
if altcol:
for i in range(nz):
cube[i,:,ich1:ich2-1:2] -= refs_amps_avg[0][ch,i]
cube[i,:,ich1+1:ich2:2] -= refs_amps_avg[1][ch,i]
else:
for i in range(nz):
cube[i,:,ich1:ich2] -= refs_amps_avg[ch,i]
# Add back supermean
if supermean: cube += smean
cube = cube.squeeze()
return cube
[docs]def ref_filter(cube, nchans=4, in_place=True, avg_type='frame', perint=False,
edge_wrap=False, left_ref=True, right_ref=True, nleft=4, nright=4, **kwargs):
"""Optimal Smoothing
Performs an optimal filtering of the vertical reference pixel to
reduce 1/f noise (horizontal stripes).
Adapted from M. Robberto IDL code:
http://www.stsci.edu/~robberto/Main/Software/IDL4pipeline/
Parameters
----------
cube : ndarray
Input datacube. Can be two or three dimensions (nz,ny,nx).
nchans : int
Number of output amplifier channels in the detector. Default=4.
in_place : bool
Perform calculations in place. Input array is overwritten.
perint : bool
Smooth side reference pixel per integration,
otherwise do frame-by-frame.
avg_type : str
Type of ref col averaging to perform. Allowed values are
'pixel', 'frame', or 'int'.
left_ref : bool
Include left reference cols when correcting 1/f noise.
right_ref : bool
Include right reference cols when correcting 1/f noise.
nleft : int
Specify the number of left reference columns.
nright : int
Specify the number of right reference columns.
Keyword Arguments
=================
savgol : bool
Using Savitsky-Golay filter method rather than FFT.
winsize : int
Size of the window filter.
order : int
Order of the polynomial used to fit the samples.
mean_func : func
Function to use to calculate averages of reference columns.
"""
if not in_place:
cube = np.copy(cube)
# Check the number of dimensions are valid.
ndim = len(cube.shape)
if ndim==2:
ny,nx = cube.shape
nz = 1
cube = cube.reshape((nz,ny,nx))
elif ndim==3:
nz, ny, nx = cube.shape
else:
raise ValueError('Input data can only have 2 or 3 dimensions. \
Found {} dimensions.'.format(ndim))
# Number of reference rows to use
# Set nt or nb equal to 0 if we don't want to use either
nl = nleft if left_ref else 0
nr = nright if right_ref else 0
assert nl>=0, 'Number of left reference pixels must not be negative.'
assert nr>=0, 'Number of right reference pixels must not be negative.'
if (nl+nr)==0:
print("No reference pixels available for use. Returning...")
return
# Slice out reference pixel columns
refs_left = cube[:,:,:nl] if nl>0 else None
refs_right = cube[:,:,-nr:] if nr>0 else None
refvals = calc_avg_cols(refs_left, refs_right, avg_type, **kwargs)
# The delta time does't seem to make any difference in the final data product
# Just for vizualization purposes...
delt = 10E-6 * (nx/nchans + 12.)
refvals_smoothed = calc_col_smooth(refvals, cube.shape, perint=perint,
edge_wrap=edge_wrap, delt=delt, **kwargs)
# Final correction
#for i,im in enumerate(cube): im -= refvals_smoothed[i].reshape([ny,1])
cube -= refvals_smoothed.reshape([nz,ny,1])
cube = cube.squeeze()
return cube
[docs]def calc_avg_amps(refs_all, data_shape, nchans=4, altcol=True):
"""Calculate amplifier averages
Save the average reference value for each amplifier in each frame.
Assume by default that alternating columns are offset from each other,
so we save two arrays: self.refs_amps_avg1 and self.refs_amps_avg2.
Each array has a size of (namp, ngroup).
Parameters
----------
refs_all : ndarray
The top and/or bottom references pixels order
in a shape (nz, nref_rows, nx)
data_shape : tuple
Shape of the data array: (nz, ny, nx).
nchans : int
Number of amplifier output channels.
altcol : bool
Calculate separate reference values for even/odd columns?
Default=True.
"""
nz, ny, nx = data_shape
chsize = int(nx / nchans)
if altcol:
refs_amps_avg1 = []
refs_amps_avg2 = []
for ch in range(nchans):
# Channel indices
ich1 = ch*chsize
ich2 = ich1 + chsize
# Slice out alternating columns
refs_ch1 = refs_all[:,:,ich1:ich2-1:2].reshape((nz,-1))
refs_ch2 = refs_all[:,:,ich1+1:ich2:2].reshape((nz,-1))
# Take the resistant mean
chavg1 = robust.mean(refs_ch1,axis=1)
chavg2 = robust.mean(refs_ch2,axis=1)
refs_amps_avg1.append(chavg1)
refs_amps_avg2.append(chavg2)
return (np.array(refs_amps_avg1), np.array(refs_amps_avg2))
else:
refs_amps_avg = []
for ch in range(nchans):
# Channel indices
ich1 = ch*chsize
ich2 = ich1 + chsize
# Slice out alternating columns
refs_ch = refs_all[:,:,ich1:ich2].reshape((nz,-1))
# Take the resistant mean and reshape for broadcasting
chavg = robust.mean(refs_ch,axis=1).reshape(-1,1,1)
refs_amps_avg.append(chavg)
return np.array(refs_amps_avg)
[docs]def calc_avg_cols(refs_left=None, refs_right=None, avg_type='frame',
mean_func=np.median, **kwargs):
"""Calculate average of column references
Determine the average values for the column references, which
is subsequently used to estimate the 1/f noise contribution.
Parameters
----------
refs_left : ndarray
Left reference columns.
refs_right : ndarray
Right reference columns.
avg_type : str
Type of ref column averaging to perform to determine ref pixel variation.
Allowed values are 'pixel', 'frame', or 'int'.
'pixel' : For each ref pixel, subtract its avg value from all frames.
'frame' : For each frame, get avg ref pixel values and subtract framewise.
'int' : Calculate avg of all ref pixels within the ramp and subtract.
mean_func : func
Function to use to calculate averages of reference columns
"""
# Which function to use for calculating averages?
# mean_func = robust.mean
# mean_func = np.median
# In this context, nl and nr are either 0 (False) or 1 (True)
nl = 0 if refs_left is None else 1
nr = 0 if refs_right is None else 1
# Left and right reference pixels
# Make a copy so as to not modify the original data?
if nl>0: refs_left = np.copy(refs_left)
if nr>0: refs_right = np.copy(refs_right)
# Set the average of left and right reference pixels to zero
# By default, pixel averaging is best for large groups
if avg_type is None:
avg_type = 'frame'
if refs_left is not None:
nz, ny, nchan = refs_left.shape
else:
nz, ny, nchan = refs_right.shape
# If there is only 1 frame, then we have to do "per frame" averaging.
# Set to "per int", which produces the same result as "per frame" for nz=1.
if nz==1:
avg_type = 'int'
# Remove average ref pixel values
# Average over entire integration
if 'int' in avg_type:
if nl>0: refs_left -= mean_func(refs_left)
if nr>0: refs_right -= mean_func(refs_right)
# Average over each frame
elif 'frame' in avg_type:
if nl>0: refs_left_mean = mean_func(refs_left.reshape((nz,-1)), axis=1)
if nr>0: refs_right_mean = mean_func(refs_right.reshape((nz,-1)), axis=1)
# Subtract estimate of each ref pixel "intrinsic" value
for i in range(nz):
if nl>0: refs_left[i] -= refs_left_mean[i]
if nr>0: refs_right[i] -= refs_right_mean[i]
# Take the average of each reference pixel
elif 'pix' in avg_type:
if nl>0: refs_left_mean = mean_func(refs_left, axis=0)
if nr>0: refs_right_mean = mean_func(refs_right, axis=0)
# Subtract estimate of each ref pixel "intrinsic" value
for i in range(nz):
if nl>0: refs_left[i] -= refs_left_mean
if nr>0: refs_right[i] -= refs_right_mean
if nl==0:
refs_side_avg = refs_right.mean(axis=2)
elif nr==0:
refs_side_avg = refs_left.mean(axis=2)
else:
refs_side_avg = (refs_right.mean(axis=2) + refs_left.mean(axis=2)) / 2
return refs_side_avg
[docs]def calc_col_smooth(refvals, data_shape, perint=False, edge_wrap=False,
delt=5.24E-4, savgol=False, winsize=31, order=3, **kwargs):
"""Perform optimal smoothing of side ref pix
Generates smoothed version of column reference values.
Smooths values from calc_avg_cols() via FFT.
Parameters
----------
refvals : ndarray
Averaged column reference pixels
data_shape : tuple
Shape of original data (nz,ny,nx)
Keyword Arguments
=================
perint : bool
Smooth side reference pixel per int, otherwise per frame.
edge_wrap : bool
Add a partial frames to the beginning and end of each averaged
time series pixels in order to get rid of edge effects.
delt : float
Time between reference pixel samples.
savgol : bool
Using Savitsky-Golay filter method rather than FFT.
winsize : int
Size of the window filter.
order : int
Order of the polynomial used to fit the samples.
"""
nz,ny,nx = data_shape
# May want to revisit the do-all-at-once or break-up decision
# This may depend on preamp reset per frame or per integration
# For now, we'll do it frame-by-frame by default (perint=False)
if perint: # per integration
if edge_wrap: # Wrap around to avoid edge effects
refvals2 = np.vstack((refvals[0][::-1], refvals, refvals[-1][::-1]))
if savgol: # SavGol filter
refvals_smoothed2 = savgol_filter(refvals2.ravel(), winsize, order, delta=1)
else: # Or "optimal" smoothing algorithm
refvals_smoothed2 = smooth_fft(refvals2, delt)
refvals_smoothed = refvals_smoothed2[ny:-ny].reshape(refvals.shape)
else:
if savgol:
refvals_smoothed = savgol_filter(refvals.ravel(), winsize, order, delta=1)
else:
refvals_smoothed = smooth_fft(refvals, delt)
refvals_smoothed = refvals_smoothed.reshape(refvals.shape)
else:
refvals_smoothed = []
if edge_wrap: # Wrap around to avoid edge effects
for ref in refvals:
ref2 = np.concatenate((ref[:ny//2][::-1], ref, ref[ny//2:][::-1]))
if savgol:
ref_smth = savgol_filter(ref2, winsize, order, delta=1)
else:
ref_smth = smooth_fft(ref2, delt)
refvals_smoothed.append(ref_smth[ny//2:ny//2+ny])
refvals_smoothed = np.array(refvals_smoothed)
else:
for ref in refvals:
if savgol:
ref_smth = savgol_filter(ref, winsize, order, delta=1)
else:
ref_smth = smooth_fft(ref, delt)
refvals_smoothed.append(ref_smth)
refvals_smoothed = np.array(refvals_smoothed)
return refvals_smoothed
[docs]def smooth_fft(data, delt, first_deriv=False, second_deriv=False):
"""Optimal smoothing algorithm
Smoothing algorithm to perform optimal filtering of the
vertical reference pixel to reduce 1/f noise (horizontal stripes),
based on the Kosarev & Pantos algorithm. This assumes that the
data to be filtered/smoothed has been sampled evenly.
If first_deriv is set, then returns two results
if second_deriv is set, then returns three results.
Adapted from M. Robberto IDL code:
http://www.stsci.edu/~robberto/Main/Software/IDL4pipeline/
Parameters
----------
data : ndarray
Signal to be filtered.
delt : float
Delta time between samples.
first_deriv : bool
Return the first derivative.
second_deriv : bool
Return the second derivative (along with first).
"""
Dat = data.flatten()
N = Dat.size
Pi2 = 2*np.pi
OMEGA = Pi2 / (N*delt)
X = np.arange(N) * delt
##------------------------------------------------
## Center and Baselinefit of the data
##------------------------------------------------
Dat_m = Dat - np.mean(Dat)
SLOPE = (Dat_m[-1] - Dat_m[0]) / (N-2)
Dat_b = Dat_m - Dat_m[0] - SLOPE * X / delt
##------------------------------------------------
## Compute fft- / power- spectrum
##------------------------------------------------
Dat_F = np.fft.rfft(Dat_b) #/ N
Dat_P = np.abs(Dat_F)**2
# Frequency for abscissa axis
# F = [0.0, 1.0/(N*delt), ... , 1.0/(2.0*delt)]:
#F = np.arange(N/2+1) / (N*delt)
#F = np.fft.fftfreq(Dat_F.size, delt)
##------------------------------------------------
## Noise spectrum from 'half' to 'full'
## Mind: half means N/4, full means N/2
##------------------------------------------------
i1 = int((N-1) / 4)
i2 = int((N-1) / 2) + 1
Sigma = np.sum(Dat_P[i1:i2])
Noise = Sigma / ((N-1)/2 - (N-1)/4)
##------------------------------------------------
## Get Filtercoeff. according to Kosarev/Pantos
## Find the J0, start search at i=1 (i=0 is the mean)
##------------------------------------------------
J0 = 2
for i in np.arange(1, int(N/4)+1):
sig0, sig1, sig2, sig3 = Dat_P[i:i+4]
if (sig0<Noise) and ((sig1<Noise) or (sig2<Noise) or (sig3<Noise)):
J0 = i
break
##------------------------------------------------
## Compute straight line extrapolation to log(Dat_P)
##------------------------------------------------
ii = np.arange(1,J0+1)
logvals = np.log(Dat_P[1:J0+1])
XY = np.sum(ii * logvals)
XX = np.sum(ii**2)
S = np.sum(logvals)
# Find parameters A1, B1
XM = (2. + J0) / 2
YM = S / J0
A1 = (XY - J0*XM*YM) / (XX - J0*XM*XM)
B1 = YM - A1 * XM
# Compute J1, the frequency for which straight
# line extrapolation drops 20dB below noise
J1 = int(np.ceil((np.log(0.01*Noise) - B1) / A1 ))
if J1<J0:
J1 = J0+1
##------------------------------------------------
## Compute the Kosarev-Pantos filter windows
## Frequency-ranges: 0 -- J0 | J0+1 -- J1 | J1+1 -- N2
##------------------------------------------------
nvals = int((N-1)/2 + 1)
LOPT = np.zeros_like(Dat_P)
LOPT[0:J0+1] = Dat_P[0:J0+1] / (Dat_P[0:J0+1] + Noise)
i_arr = np.arange(J1-J0) + J0+1
LOPT[J0+1:J1+1] = np.exp(A1*i_arr+B1) / (np.exp(A1*i_arr+B1) + Noise)
##--------------------------------------------------------------------
## De-noise the Spectrum with the filter
## Calculate the first and second derivative (i.e. multiply by iW)
##--------------------------------------------------------------------
# first loop gives smoothed data
# second loop produces first derivative
# third loop produces second derivative
if second_deriv:
ndiff = 3
elif first_deriv:
ndiff = 2
else:
ndiff = 1
for diff in range(ndiff):
#Fltr_Spectrum = np.zeros(N,dtype=np.complex)
Fltr_Spectrum = np.zeros_like(Dat_P,dtype=np.complex)
# make the filter complex
i1 = 1; n2 = int((N-1)/2)+1; i2 = i1+n2
FltrCoef = LOPT[i1:].astype(np.complex)
# differentitation in frequency domain
iW = ((np.arange(n2)+i1)*OMEGA*1j)**diff
# multiply spectrum with filter coefficient
Fltr_Spectrum[i1:] = Dat_F[i1:] * FltrCoef * iW
# Fltr_Spectrum[0] values
# The derivatives of Fltr_Spectrum[0] are 0
# Mean if diff = 0
Fltr_Spectrum[0] = 0 if diff>0 else Dat_F[0]
# Inverse fourier transform back in time domain
Dat_T = np.fft.irfft(Fltr_Spectrum)
#Dat_T[-1] = np.real(Dat_T[0]) + 1j*np.imag(Dat_T[-1])
# This ist the smoothed time series (baseline added)
if diff==0:
Smoothed_Data = np.real(Dat_T) + Dat[0] + SLOPE * X / delt
elif diff==1:
First_Diff = np.real(Dat_T) + SLOPE / delt
elif diff==2:
Secnd_Diff = np.real(Dat_T)
if second_deriv:
return Smoothed_Data, First_Diff, Secnd_Diff
elif first_deriv:
return Smoothed_Data, First_Diff
else:
return Smoothed_Data
[docs]def chrem_med(imarr, nchans=4, yind=None, bpmask=None, in_place=True,
mean_func=np.median):
""" Subtract Amplifier Channel Offsets
Sometimes amplifiers have offsets relative to each other
due to imperfect tracking of reference pixels. This function
determines the average offset from zero of each channel
and subtracts the mean/median from the entire channel.
Parameters
----------
imarr : ndarray
Array of image (or single image).
nchans : int
Number of amplifier readout channels.
yind : array-like
Two element array to select a y-range for calculating
the channel offset.
bpmask : bool array
Bad pixel mask (1 for bad, 0 for good). Can either
be a single image or image cube of same size as `imarr`.
in_place : bool
Correct in-place? If False, returns a copy of the array
with channels offset.
mean_func : func
Function to use for performing the mean calculation.
"""
sh_orig = imarr.shape
if len(sh_orig)==2:
nz = 1
ny, nx = sh_orig
imarr = imarr.reshape([nz,ny,nx])
else:
nz, ny, nx = sh_orig
chsize = int(nx / nchans)
# Make copy of array?
arr_out = imarr if in_place else imarr.copy()
# Define y index start/stop locations
yind = np.array([0,ny]) if yind is None else yind
bpmask = np.zeros([ny,nx]) if bpmask is None else bpmask
bpmask = bpmask.squeeze()
# Cropped array
for ch in np.arange(nchans):
# Get channel x-indices
x1 = int(ch*chsize)
x2 = int(x1 + chsize)
# Select the channel and y-range
imch = arr_out[:, yind[0]:yind[1], x1:x2]
# imch_ind = imch.reshape([imch.shape[0],-1])
# Take median of all pixels in channel for each image
if len(bpmask.shape)==2:
bpmask_ch = bpmask[yind[0]:yind[1], x1:x2]
igood = (bpmask_ch == 0)
chmed = mean_func(imch[:,igood], axis=1)
# Subtract median channel from each image
arr_out[:,:,x1:x2] -= chmed.reshape([-1,1,1])
else:
for jj, im in enumerate(imch):
bpmask_ch = bpmask[jj, yind[0]:yind[1], x1:x2]
igood = (bpmask_ch == 0)
arr_out[jj,:,x1:x2] -= mean_func(im[igood])
return arr_out.reshape(sh_orig)
[docs]def channel_averaging(im, nchans=4, same_scan_direction=False, off_chans=True,
mn_func=np.nanmedian, **kwargs):
"""Estimate common 1/f noise in image
For a given image, average the channels together to find
the common pattern noise present within the channels.
Returns an array the same size as the input image.
Parameters
==========
im : ndarray
Input image
Keyword Args
============
nchans : int
Number of output channels
same_scan_direction : bool
Are all the output channels read in the same direction?
By default fast-scan readout direction is ``[-->,<--,-->,<--]``
If ``same_scan_direction``, then all ``-->``
off_chans : bool
Calculate indepenent values for each channel using the off channels.
mn_func : function
What function should we use to calculate the average.
Default `np.nanmedian`
"""
ny, nx = im.shape
chsize = int(nx / nchans)
# Reshape to [ny,chsize,nchans]
im = im.reshape(ny,nchans,chsize).transpose([0,2,1])
# Flip channels if they're reversed
if same_scan_direction==False:
# Make sure we don't modify the input array
im = im.copy()
for ch in range(nchans):
if np.mod(ch,2)==1:
im[:,:,ch] = im[:,::-1,ch]
if off_chans == False:
im = im.reshape([-1,nchans])
ch_mn = mn_func(im, axis=1).reshape([ny,chsize])
im = im.reshape([ny,chsize,nchans])
arr_list = []
ind_chans = np.arange(nchans)
for ch in range(nchans):
# Take median of other channels
if off_chans:
ind_off = np.where(ind_chans != ch)[0]
im_off_chans = im[:,:,ind_off].reshape([-1,nchans-1])
ch_mn = mn_func(im_off_chans, axis=1).reshape([ny,chsize])
# Consecutive outputs reversed?
if (np.mod(ch,2) == 0) or (same_scan_direction == True):
arr_list.append(ch_mn)
else:
arr_list.append(ch_mn[:,::-1])
# im = im.reshape([ny,chsize,nchans]).transpose([0,2,1]).reshape([ny,nx])
# Flip channels back to original position
if same_scan_direction==False:
for ch in range(nchans):
if np.mod(ch,2)==1:
im[:,:,ch] = im[:,::-1,ch]
return np.concatenate(arr_list, axis=1)
[docs]def channel_smooth_fft(im_arr, winsize=64):
"""Channel smoothing using smooth_fft
Function for generating a map of the 1/f noise within a series of input images.
The input images should show some clear noise structure for this to be useful.
Uses M. Robberto smoothing algorithm.
One might prefer the `channel_smooth_savgol` or `channel_smooth_butter`
functions due to their quickness.
Parameters
==========
im_arr : ndarray
Input array of images
winsize : int
Window size chunks to break up
"""
sh = im_arr.shape
if len(sh)==2:
nz = 1
ny, chsize = sh
else:
nz, ny, chsize = sh
# Check that winsize is even
winsize = winsize+1 if winsize % 2 == 1 else winsize
# Reshape in case of nz=1
im_arr = im_arr.reshape([nz, -1])
res_arr = []
excess = winsize #int(winsize / 2)
for im in im_arr:
nwin = int(im.size / winsize) + 1
# Add some extra values to beginning and end to remove edge effects
im2 = np.concatenate((im[:excess][::-1], im, im[-excess:][::-1]))
res = []
for i in range(nwin):
i1 = 0 if i==0 else winsize*i
i2 = i1 + winsize + 2*excess
vals = im2[i1:i2]
# If smooth_fft fails, then just take median
# Failing generally means the distribution is consistent with white noise
try:
vals_smooth = smooth_fft(vals, 10e-6)
# Trim edges
vals_smooth = vals_smooth[excess:excess+winsize]
except:
vals_smooth = np.zeros(winsize) + np.nanmedian(vals)
res.append(vals_smooth)
res = np.array(res).ravel()[0:im.size]
res_arr.append(res.reshape([ny,-1]))
return np.array(res_arr).squeeze()
[docs]def mask_helper():
"""Helper to handle indices and logical indices of a mask
Output: index, a function, with signature indices = index(logical_indices),
to convert logical indices of a mask to 'equivalent' indices
Example:
>>> # linear interpolation of NaNs
>>> mask = np.isnan(y)
>>> x = mask_helper(y)
>>> y[mask]= np.interp(x(mask), x(~mask), y[~mask])
"""
return lambda z: np.nonzero(z)[0]
[docs]def channel_smooth_savgol(im_arr, winsize=31, order=3, per_line=False,
mask=None, **kwargs):
"""Channel smoothing using savgol filter
Parameters
==========
im_arr : ndarray
Input array of images (intended to be a cube of output channels).
Shape should either be (ny, chsize) to smooth a single channel or
(nchan, ny, chsize) for multiple channels.
Each image is operated on separately. If only two dimensions,
then only a single input image is assumed. NaN's will be
interpolated over.
Keyword Args
============
winsize : int
Size of the window filter. Should be an odd number.
order : int
Order of the polynomial used to fit the samples.
per_line : bool
Smooth each channel line separately with the hopes of avoiding
edge discontinuities.
mask : bool image or None
An image mask of pixels to ignore. Should be same size as im_arr.
This can be used to mask pixels that the filter should ignore,
such as stellar sources or pixel outliers. A value of True indicates
that pixel should be ignored.
mode : str
Must be 'mirror', 'constant', 'nearest', 'wrap' or 'interp'. This
determines the type of extension to use for the padded signal to
which the filter is applied. When `mode` is 'constant', the padding
value is given by `cval`.
When the 'interp' mode is selected (the default), no extension
is used. Instead, a degree `polyorder` polynomial is fit to the
last `window_length` values of the edges, and this polynomial is
used to evaluate the last `window_length // 2` output values.
cval : float
Value to fill past the edges of the input if `mode` is 'constant'.
Default is 0.0.
"""
sh = im_arr.shape
if len(sh)==2:
nz = 1
ny, chsize = sh
else:
nz, ny, chsize = sh
# Check that winsize is odd
winsize = winsize-1 if winsize % 2 == 0 else winsize
# Reshape in case of nz=1
im_arr = im_arr.reshape([nz, -1])
if mask is not None:
mask = mask.reshape([nz, -1])
res_arr = []
for i, im in enumerate(im_arr):
# im should be a 1D array
# Interpolate over masked data and NaN's
nans = np.isnan(im)
im_mask = nans if mask is None else nans | mask[i].flatten()
if im_mask.any():
# Create a copy so as to not change the original data
im = np.copy(im)
# Use a savgol filter to smooth out any outliers
res = im.copy()
res[~im_mask] = savgol_filter(im[~im_mask], 31, 3, mode='interp')
# Replace masked pixels with linear interpolation
x = mask_helper() # Returns the nonzero (True) indices of a mask
im[im_mask]= np.interp(x(im_mask), x(~im_mask), res[~im_mask])
if per_line:
im = im.reshape([ny,-1])
res = savgol_filter(im, winsize, order, axis=1, delta=1, **kwargs)
res_arr.append(res)
else:
res = savgol_filter(im, winsize, order, delta=1, **kwargs)
res_arr.append(res.reshape([ny,-1]))
return np.array(res_arr).squeeze()
[docs]def channel_smooth_butter(im_arr, order=3, freq=0.1, per_line=False, mask=None):
"""Channel smoothing using Butterworth filter
Parameters
==========
im_arr : ndarray
Input array of images (intended to be a cube of output channels).
Each image is operated on separately. If only two dimensions,
then only a single input image is assumed.
Keyword Args
============
order : int
Order of the filter (high order have sharper frequency cut-off)
freq : float
Normalized frequency cut-off (between 0 and 1). 1 is Nyquist.
per_line : bool
Smooth each channel line separately with the hopes of avoiding
edge discontinuities.
mask : bool image or None
An image mask of pixels to ignore. Should be same size as im_arr.
This can be used to mask pixels that the filter should ignore,
such as stellar sources or pixel outliers.
"""
from scipy.signal import butter, filtfilt
sh = im_arr.shape
if len(sh)==2:
nz = 1
ny, chsize = sh
else:
nz, ny, chsize = sh
# Reshape in case of nz=1
im_arr = im_arr.reshape([nz, -1])
if mask is not None:
mask = mask.reshape([nz, -1])
res_arr = []
b, a = butter(order, freq, btype='lowpass', analog=False)
for i, im in enumerate(im_arr):
# im should be a 1D array
# Interpolate over masked data and NaN's
# Replace masked pixels with linear interpolation
nans = np.isnan(im)
im_mask = nans if mask is None else nans | mask[i].flatten()
if im_mask.any():
# Create a copy so as to not change the original data
im = np.copy(im)
# Use a savgol filter to smooth out any outliers
res = im.copy()
res[~im_mask] = savgol_filter(im[~im_mask], 31, 3, mode='interp')
# Replace masked pixels with linear interpolation
x = mask_helper() # Returns the nonzero (True) indices of a mask
im[im_mask]= np.interp(x(im_mask), x(~im_mask), res[~im_mask])
# Do filter line-by-line
if per_line:
im = im.reshape([ny,-1])
res_lines = []
for line in im:
res = filtfilt(b, a, line)
res_lines.append(res)
res_lines = np.array(res_lines)
res_arr.append(res_lines.reshape([ny,-1]))
else:
res = filtfilt(b, a, im)
res_arr.append(res.reshape([ny,-1]))
return np.array(res_arr).squeeze()