Source code for pynrc.reduce.calib

import numpy as np
import matplotlib
import matplotlib.pyplot as plt


# Import libraries
import numpy as np
import os, gzip, json
from copy import deepcopy
from scipy import ndimage

from astropy.io import fits
from astropy.modeling import models, fitting

# Multiprocessing
import multiprocessing as mp

# Program bar
from tqdm.auto import trange, tqdm

from webbpsf_ext import robust
from webbpsf_ext.image_manip import fshift, pad_or_cut_to_size
from webbpsf_ext.maths import hist_indices, jl_poly_fit, jl_poly

# import pynrc
from ..nrc_utils import var_ex_model
from ..reduce.ref_pixels import reffix_hxrg, channel_smooth_savgol, channel_averaging

from .. import conf, DetectorOps
from ..detops import create_detops

from ..logging_utils import setup_logging
import logging

from pynrc import logging_utils
_log = logging.getLogger('pynrc')

[docs]class nircam_dark(object):
[docs] def __init__(self, scaid, datadir, outdir, lindir=None, DMS=False, same_scan_direction=False, reverse_scan_direction=False): self.DMS = DMS self.scaid = scaid # Directory information self._create_dir_structure(datadir, outdir, lindir=lindir) # Get header information and create a NIRCam detector timing instance hdr = self._grab_single_header() self.det = create_detops(hdr, DMS=DMS) self.det.same_scan_direction = same_scan_direction self.det.reverse_scan_direction = reverse_scan_direction # Get temperature information self._grab_temperature_data() self._init_attributes()
def _init_attributes(self): # Create masks for ref pixels, active pixels, and channels self._create_pixel_masks() # Initialize superbias and superdark attributes self._super_bias = None self._super_bias_sig = None self._super_dark = None self._super_dark_sig = None self._super_dark_ramp = None self._super_dark_deconv = None self._super_bias_deconv = None self._dark_ramp_dict = None self._pixel_masks = None # IPC info self._kernel_ipc = None self._kernel_ppc = None self._kernel_ipc_sig = None self._kernel_ppc_sig = None # Noise info self._ktc_noise = None self._cds_act_dict = None self._cds_ref_dict = None self._eff_noise_dict = None self._pow_spec_dict = None # Reference pixel properties self._ref_pixel_dict = None # Column variations self._column_variations = None self._column_prob_bad = None # Non-linearity coefficients self.linear_dict = None self.nonlinear_dict = None # Flat field info self.lflats = None # Low frequency spatial variations self.pflats = None # High frequency variations (cross hatch) # Directory and files @property def datadir(self): return self.paths_dict['datadir'] @property def lindir(self): return self.paths_dict['lindir'] @property def outdir(self): return self.paths_dict['outdir'] @property def allfiles(self): return self.paths_dict['allfiles'] @property def linfiles(self): return self.paths_dict['linfiles'] # Temperature information @property def temperature_dict(self): return self._temperature_dict @property def time_arr(self): return self.det.times_group_avg # Ramp shapes and sizes @property def dark_shape(self): """Shape of dark ramps""" nx = self.det.xpix ny = self.det.ypix nz = self.det.multiaccum.ngroup return (nz,ny,nx) @property def nchan(self): """Number of output channels""" return self.det.nout @property def nchans(self): """Number of output channels""" return self.det.nout @property def chsize(self): """Width of output channel""" return self.det.chsize # Array masks @property def mask_ref(self): return self._mask_ref @property def mask_act(self): if self.mask_ref is None: return None else: return ~self.mask_ref @property def mask_channels(self): return self._mask_channels # Bias and dark slope information @property def super_bias(self): return self._super_bias @property def super_bias_deconv(self): return self._super_bias_deconv @property def super_dark(self): return self._super_dark @property def super_dark_deconv(self): return self._super_dark_deconv @property def super_dark_ramp(self): return self._super_dark_ramp @property def dark_ramp_dict(self): return self._dark_ramp_dict # Column variations @property def ref_pixel_dict(self): return self._ref_pixel_dict # Column variations @property def column_variations(self): return self._column_variations @property def column_prob_bad(self): return self._column_prob_bad # IPC/PPC Kernel info @property def kernel_ipc(self): return self._kernel_ipc @property def ipc_alpha_frac(self): """Fractional IPC value (alpha)""" if self.kernel_ipc is None: return None else: return self.kernel_ipc[1,2] @property def kernel_ppc(self): return self._kernel_ppc @property def ppc_frac(self): """Fractional PPC value""" if self.kernel_ppc is None: return None else: return self.kernel_ppc[1,2] @property def ktc_noise(self): return self._ktc_noise @property def cds_act_dict(self): return self._cds_act_dict @property def cds_ref_dict(self): return self._cds_ref_dict @property def eff_noise_dict(self): return self._eff_noise_dict @property def pow_spec_dict(self): return self._pow_spec_dict def _create_dir_structure(self, datadir, outdir, lindir=None): """ Directories and files""" scaid = self.scaid # Add SCA ID to output directory path outbase = outdir if str(scaid) in outbase: outdir = outbase else: outdir = os.path.join(outbase, str(scaid)) + '/' # Directory information if datadir is None: allfiles = None else: indir = os.path.join(datadir, str(scaid)) + '/' # Get file names within directory allfits = [file for file in os.listdir(indir) if file.endswith('.fits')] allfits = np.sort(allfits) # Add directory allfiles = [indir + f for f in allfits] if lindir is None: linfiles = None else: # Directory information indir = os.path.join(lindir, str(scaid)) + '/' # Get file names within directory linfits = [file for file in os.listdir(indir) if file.endswith('.fits')] linfits = np.sort(linfits) # Add directory linfiles = [indir + f for f in linfits] # Directory to save figures for analysis figdir = os.path.join(outdir, 'FIGURES') + '/' # figdir = os.path.join(outdir, str(scaid)) + '/' # Directories to save super bias and super dark info super_bias_dir = os.path.join(outdir, 'SUPER_BIAS') + '/' super_dark_dir = os.path.join(outdir, 'SUPER_DARK') + '/' noise_dir = os.path.join(outdir, 'NOISE') + '/' power_spec_dir = os.path.join(outdir, 'POWER_SPEC') + '/' linearity_dir = os.path.join(outdir, 'LINEARITY') + '/' # Make sure directories exist for writing for path in [outbase, outdir, figdir, super_bias_dir, super_dark_dir, noise_dir, linearity_dir]: if not os.path.exists(path): os.mkdir(path) self.paths_dict = { 'datadir ' : datadir, 'allfiles' : allfiles, 'linfiles' : linfiles, 'outdir' : outdir, 'figdir' : figdir, 'header_file' : outdir + f'HEADER_{scaid}.TXT', 'temperatures_file' : outdir + f'TEMPERATURES_{scaid}.JSON', 'super_bias_dir' : super_bias_dir, 'super_dark_dir' : super_dark_dir, 'super_bias_init' : super_bias_dir + f'SUPER_BIAS_INIT_{scaid}.FITS', 'super_bias' : super_bias_dir + f'SUPER_BIAS_{scaid}.FITS', 'super_dark_ramp' : super_dark_dir + f'SUPER_DARK_RAMP_{scaid}.FITS', 'super_dark' : super_dark_dir + f'SUPER_DARK_{scaid}.FITS', 'super_dark_ramp_avgs': super_dark_dir + f'SUPER_DARK_RAMP_AVGS_{scaid}.npz', 'kernel_ipc' : super_dark_dir + f'KERNEL_IPC_{scaid}.FITS', 'kernel_ppc' : super_dark_dir + f'KERNEL_PPC_{scaid}.FITS', 'pixel_masks' : super_dark_dir + f'PIXEL_MASKS_{scaid}.FITS.gz', 'column_variations' : super_dark_dir + f'SUPER_DARK_COLVAR_{scaid}.FITS', 'ref_pix_variations' : super_bias_dir + f'BIAS_BEHAVIOR_{scaid}.JSON', 'cds_act_dict' : noise_dir + f'CDS_NOISE_ACTIVE_{scaid}.JSON', 'cds_ref_dict' : noise_dir + f'CDS_NOISE_REF_{scaid}.JSON', 'eff_noise_dict' : noise_dir + f'EFF_NOISE_{scaid}.JSON', 'power_spec_cds' : noise_dir + f'POWER_SPEC_CDS_{scaid}.npy', 'power_spec_full' : noise_dir + f'POWER_SPEC_FULL_{scaid}.npy', 'power_spec_cds_oh' : noise_dir + f'POWER_SPEC_CDS_OH_{scaid}.npy', 'power_spec_full_oh' : noise_dir + f'POWER_SPEC_FULL_OH_{scaid}.npy', 'power_spec_cds_pix' : noise_dir + f'POWER_SPEC_CDS_PIX_{scaid}.npy', 'power_spec_full_pix' : noise_dir + f'POWER_SPEC_FULL_PIX_{scaid}.npy', 'linear_coeffs' : linearity_dir + f'LINEAR_COEFFS_{scaid}.npz', 'nonlinear_coeffs' : linearity_dir + f'NONLINEAR_COEFFS_{scaid}.npz', 'super_flats' : linearity_dir + f'SUPER_FLATS_{scaid}.FITS', } def _create_pixel_masks(self): # Array masks # self.mask_act is just ~self.mask_ref self._mask_ref = self.det.mask_ref self._mask_channels = self.det.mask_channels def _dict_to_json(self, in_dict, savename): # Save reference pixel dictionary dtemp = deepcopy(in_dict) # Convert any ndarrays to lists for k in dtemp.keys(): if isinstance(dtemp[k], (np.ndarray)): dtemp[k] = dtemp[k].tolist() with open(savename, 'w') as fp: json.dump(dtemp, fp, sort_keys=False, indent=4) def _json_to_dict(self, savename): # Load from JSON files with open(savename, 'r') as fp: d = json.load(fp) # Convert any lists to np.array for k in d.keys(): if isinstance(d[k], (list)): d[k] = np.array(d[k]) return d def _grab_single_header(self): """Read/save or Open header of first FITS file""" from astropy.io.fits import Header savename = self.paths_dict['header_file'] file_exists = os.path.isfile(savename) if file_exists: hdr = Header.fromtextfile(savename) else: hdr = fits.getheader(self.allfiles[0]) hdr.totextfile(savename, overwrite=True) return hdr def _grab_temperature_data(self): """ Grab temperature data from headers Creates a dictionary that houses the temperature info stored in the headers of each FITS file. """ # TODO: Add DMS support for temperature if self.DMS: self._temperature_dict = None _log.error("DMS data not yet supported obtaining FPA temperatures") return savename = self.paths_dict['temperatures_file'] file_exists = os.path.isfile(savename) if file_exists: # Load from JSON files temperature_dict = self._json_to_dict(savename) else: # Get initial temperature keys hdr = self._grab_single_header() tkeys = [k for k in list(hdr.keys()) if k[0:2]=='T_'] + ['ASICTEMP'] # Initialize lists for each temperature key temperature_dict = {} for k in tkeys: temperature_dict[k] = [] for f in self.allfiles: hdul = fits.open(f) hdr = hdul[0].header for k in tkeys: temperature_dict[k].append(float(hdr[k])) hdul.close() # Save temperature dictionary self._dict_to_json(temperature_dict, savename) self._temperature_dict = temperature_dict def get_super_bias_init(self, deg=1, nsplit=2, force=False, **kwargs): _log.info("Generating initial super bias") allfiles = self.allfiles savename = self.paths_dict['super_bias_init'] file_exists = os.path.isfile(savename) if file_exists and (not force): super_bias, super_bias_sig = get_fits_data(savename) else: # Default ref pixel correction kw args kwargs_def = { 'nchans': self.nchan, 'altcol': True, 'in_place': True, 'fixcol': True, 'avg_type': 'pixel', 'savgol': True, 'perint': False } for k in kwargs_def.keys(): if k not in kwargs: kwargs[k] = kwargs_def[k] res = gen_super_bias(allfiles, deg=deg, nsplit=nsplit, DMS=self.DMS, return_std=True, **kwargs) super_bias, super_bias_sig = res # Save superbias frame to directory hdu = fits.PrimaryHDU(np.array([super_bias, super_bias_sig])) hdu.writeto(savename, overwrite=True) self._super_bias = super_bias self._super_bias_sig = super_bias_sig def get_super_bias_update(self, force=False, **kwargs): # Make sure initial super bias exists if (self._super_bias is None) or (self._super_bias_sig is None): self.get_super_bias_init(**kwargs) # File names fname = self.paths_dict['super_bias'] file_exists = os.path.isfile(fname) if file_exists and (not force): # Grab updated Super Bias _log.info("Opening updated super bias") self._super_bias = get_fits_data(fname) else: # Generate Super Bias along with dark ramp and pixel masks self.get_super_dark_ramp(force=force, **kwargs)
[docs] def get_super_dark_ramp(self, force=False, **kwargs): """Create or read super dark ramp and update super bias""" # Make sure initial super bias exists if (self._super_bias is None) or (self._super_bias_sig is None): self.get_super_bias_init(**kwargs) _log.info("Creating super dark ramp cube, updated super bias, and pixel mask info") # File names fname_super_dark_ramp = self.paths_dict['super_dark_ramp'] fname_super_bias = self.paths_dict['super_bias'] fname_pixel_mask = self.paths_dict['pixel_masks'] file_exists = os.path.isfile(fname_super_dark_ramp) if file_exists and (not force): # Grab Super Dark Ramp super_dark_ramp = get_fits_data(fname_super_dark_ramp) # Grab updated Super Bias super_bias = get_fits_data(fname_super_bias) # Generate pixel masks dictionary masks_dict = {} hdul = fits.open(fname_pixel_mask) for hdu in hdul: key = hdu.name.lower() masks_dict[key] = hdu.data.astype('bool') hdul.close() else: allfiles = self.allfiles # Default kwargs to run kwargs_def = { 'nchans': self.nchan, 'altcol': True, 'in_place': True, 'fixcol': True, 'avg_type': 'pixel', 'savgol': True, 'perint': False } for k in kwargs_def.keys(): if k not in kwargs: kwargs[k] = kwargs_def[k] res = gen_super_dark(allfiles, super_bias=self.super_bias, DMS=self.DMS, **kwargs) super_dark_ramp, bias_off, masks_dict = res # Add residual bias offset super_bias = self.super_bias + bias_off # Save updated superbias frame to directory hdu = fits.PrimaryHDU(super_bias) hdu.writeto(fname_super_bias, overwrite=True) # Save super dark ramp hdu = fits.PrimaryHDU(super_dark_ramp.astype(np.float32)) # hdu = fits.PrimaryHDU(super_dark_ramp) hdu.writeto(fname_super_dark_ramp, overwrite=True) # Save mask dictionary to a compressed FITS file hdul = fits.HDUList() for k in masks_dict.keys(): data = masks_dict[k].astype('uint8') hdu = fits.ImageHDU(data, name=k) hdul.append(hdu) output = gzip.open(fname_pixel_mask, 'wb') hdul.writeto(output, overwrite=True) output.close() # Save as class attributes self._super_dark_ramp = super_dark_ramp self._super_bias = super_bias self._pixel_masks = masks_dict
[docs] def get_dark_slope_image(self, deg=1, force=False): """ Calculate dark slope image""" _log.info('Calculating dark slope image...') fname = self.paths_dict['super_dark'] file_exists = os.path.isfile(fname) if file_exists and (not force): # Grab Super Dark super_dark = get_fits_data(fname) else: if self._super_dark_ramp is None: self.get_super_dark_ramp() # Get dark slope image cf = jl_poly_fit(self.time_arr, self.super_dark_ramp, deg=deg) super_dark = cf[1] # Save super dark frame to directory hdu = fits.PrimaryHDU(super_dark) hdu.writeto(fname, overwrite=True) self._super_dark = super_dark
[docs] def get_pixel_slope_averages(self, force=False): """Get average pixel ramp""" _log.info('Calculating average pixel ramps...') fname = self.paths_dict['super_dark_ramp_avgs'] file_exists = os.path.isfile(fname) if file_exists and (not force): out = np.load(fname) ramp_avg_ch = out.get('ramp_avg_ch') ramp_avg_all = out.get('ramp_avg_all') else: if self._super_dark_ramp is None: _log.error("`super_dark_ramp` is not defined. Please run self.get_super_dark_ramp().") return nz = self.dark_shape[0] nchan = self.nchan chsize = self.chsize # Average slope in each channel ramp_avg_ch = [] for ch in range(nchan): ramp_ch = self.super_dark_ramp[:,:,ch*chsize:(ch+1)*chsize] avg = np.median(ramp_ch.reshape([nz,-1]), axis=1) ramp_avg_ch.append(avg) ramp_avg_ch = np.array(ramp_avg_ch) # Average ramp for all pixels ramp_avg_all = np.mean(ramp_avg_ch, axis=0) np.savez(fname, ramp_avg_ch=ramp_avg_ch, ramp_avg_all=ramp_avg_all) self._dark_ramp_dict = { 'ramp_avg_ch' : ramp_avg_ch, 'ramp_avg_all' : ramp_avg_all }
[docs] def get_ipc(self, calc_ppc=False): """Calculate IPC (and PPC) kernels""" if calc_ppc: _log.info("Calculating IPC and PPC kernels...") else: _log.info("Calculating IPC kernels...") fname_ipc = self.paths_dict['kernel_ipc'] fname_ppc = self.paths_dict['kernel_ppc'] gen_vals = False if os.path.isfile(fname_ipc): k_ipc, k_ipc_sig = get_fits_data(fname_ipc) else: gen_vals = True if calc_ppc: if os.path.isfile(fname_ppc): k_ppc, k_ppc_sig = get_fits_data(fname_ppc) else: gen_vals = True # Do we need to generate IPC/PPC values? if gen_vals: if self.super_dark_ramp is None: _log.error("`super_dark_ramp` is not defined. Please run get_super_dark_ramp().") return dark_ramp = self.super_dark_ramp[1:] - self.super_dark_ramp[0] # Subtract away averaged spatial background from each frame dark_med = ndimage.median_filter(self.super_dark, 7) tarr = self.time_arr[1:] - self.time_arr[0] for i, im in enumerate(dark_ramp): im -= dark_med*tarr[i] nchan = self.nchan chsize = self.chsize ssd = self.det.same_scan_direction rsd = self.det.reverse_scan_direction # Set the average of each channel in each image to 0 for ch in np.arange(nchan): x1 = int(ch*chsize) x2 = int(x1 + chsize) dark_ramp_ch = dark_ramp[:,:,x1:x2] dark_ramp_ch = dark_ramp_ch.reshape([dark_ramp.shape[0],-1]) chmed_arr = np.median(dark_ramp_ch, axis=1) dark_ramp[:,:,x1:x2] -= chmed_arr.reshape([-1,1,1]) k_ipc_arr = [] k_ppc_arr = [] for im in dark_ramp[::4]: diff = dark_ramp[-1] - im res = get_ipc_kernel(diff, bg_remove=False, boxsize=5, calc_ppc=calc_ppc, same_scan_direction=ssd, reverse_scan_direction=rsd, suppress_error_msg=True) if res is not None: if calc_ppc: k_ipc, k_ppc = res k_ppc_arr.append(k_ppc) else: k_ipc = res k_ipc_arr.append(k_ipc) # Average IPC values k_ipc_arr = np.array(k_ipc_arr) k_ipc = robust.mean(k_ipc_arr, axis=0) k_ipc_sig = robust.std(k_ipc_arr, axis=0) # Ensure kernels are normalized to 1 ipc_norm = k_ipc.sum() k_ipc /= ipc_norm k_ipc_sig /= ipc_norm # Save IPC kernel to file hdu = fits.PrimaryHDU(np.array([k_ipc, k_ipc_sig])) hdu.writeto(fname_ipc, overwrite=True) # PPC values if calc_ppc: k_ppc_arr = np.array(k_ppc_arr) k_ppc = robust.mean(k_ppc_arr, axis=0) k_ppc_sig = np.std(k_ppc_arr, axis=0) ppc_norm = k_ppc.sum() k_ppc /= ppc_norm k_ppc_sig /= ppc_norm # Save IPC kernel to file hdu = fits.PrimaryHDU(np.array([k_ppc, k_ppc_sig])) hdu.writeto(fname_ppc, overwrite=True) # Store kernel information self._kernel_ipc = k_ipc self._kernel_ipc_sig = k_ipc_sig alpha = k_ipc[1,2] alpha_sig = k_ipc_sig[1,2] _log.info(' IPC = {:.3f}% +/- {:.3f}%'.format(alpha*100, alpha_sig*100)) # PPC values if calc_ppc: self._kernel_ppc = k_ppc self._kernel_ppc_sig = k_ppc_sig ppc = k_ppc[1,2] ppc_sig = k_ppc_sig[1,2] _log.info(' PPC = {:.3f}% +/- {:.3f}%'.format(ppc*100, ppc_sig*100))
[docs] def get_ktc_noise(self, **kwargs): """Calculate and store kTC (Reset) Noise Keyword Args ------------ bias_sigma_arr : ndarray Image of the pixel uncertainties. binsize : float Size of the histogram bins. return_std : bool Also return the standard deviation of the distribution? """ if self._super_bias_sig is None: # Make sure super bias sigma exists _log.info('Obtaining sigma image for super bias...') self.get_super_bias_init() _log.info("Calculating kTC Noise for active and reference pixels...") # kTC Noise (DN) im = self._super_bias_sig[self.mask_act] self._ktc_noise = calc_ktc(im, **kwargs) # kTC Noise of reference pixels im = self._super_bias_sig[self.mask_ref] self._ktc_noise_ref= calc_ktc(im, binsize=1)
[docs] def get_cds_dict(self, force=False): """Calculate CDS noise for all files Creates a dictionary of CDS noise components, including total noise, amplifier 1/f noise, correlated 1/f noise, white noise, and reference pixel ratios. Two different methods are used to calculate CDS per pixels: temporal and spatial. Creates dictionary attributes `self.cds_act_dict` and `self.cds_ref_dict`. """ _log.info("Building CDS Noise dictionaries...") ssd = self.det.same_scan_direction outname1 = self.paths_dict['cds_act_dict'] outname2 = self.paths_dict['cds_ref_dict'] both_exist = os.path.exists(outname1) and os.path.exists(outname2) if (not both_exist) or force: # Create CDS dictionaries cds_act_dict, cds_ref_dict = gen_cds_dict( self.allfiles, superbias=self.super_bias, mask_good_arr=self._pixel_masks['mask_poly'], same_scan_direction=ssd, DMS=self.DMS) # Save active pixel dictionary self._dict_to_json(cds_act_dict, outname1) # Save reference pixel dictionary self._dict_to_json(cds_ref_dict, outname2) # Load dictionaries self._cds_act_dict = self._json_to_dict(outname1) self._cds_ref_dict = self._json_to_dict(outname2)
[docs] def get_effective_noise(self, ideal_Poisson=False, force=False): "Calculate effective noise curves for each readout pattern" outname = self.paths_dict['eff_noise_dict'] allfiles = self.allfiles superbias = self.super_bias det = self.det nchan = det.nout gain = det.gain patterns = list(det.multiaccum._pattern_settings.keys()) if os.path.exists(outname) and (not force): # Load from JSON files with open(outname, 'r') as fp: dtemp = json.load(fp) # Convert to arrays for k in dtemp.keys(): d2 = dtemp[k] out_list = [np.array(d2[patt]) for patt in patterns] dtemp[k] = out_list ng_all_list = dtemp['ng_all_list'] en_spat_list = dtemp['en_spat_list'] else: ng_all_list = [] en_spat_list = [] #en_temp_list = [] for patt in tqdm(patterns, leave=False, desc='Patterns'): res = calc_eff_noise(allfiles, superbias=superbias, read_pattern=patt, temporal=False) # ng_all, eff_noise_temp, eff_noise_spa = res ng_all, eff_noise_spat = res # List of ngroups arrays ng_all_list.append(ng_all) en_spat_list.append(eff_noise_spat) #en_temp_list.append(eff_noise_temp) # Place variables into dictionary for saving to disk dtemp = {'ng_all_list' : ng_all_list, 'en_spat_list' : en_spat_list} # Make sure everything are in list format for k in dtemp.keys(): arr = dtemp[k] d2 = {} for i, patt in enumerate(patterns): d2[patt] = arr[i].tolist() dtemp[k] = d2 # Save to a JSON file with open(outname, 'w') as fp: json.dump(dtemp, fp, sort_keys=False, indent=4) # Suppress info logs log_prev = conf.logging_level setup_logging('WARN', verbose=False) tarr_all = [] for i, patt in enumerate(patterns): det_new = deepcopy(det) ma_new = det_new.multiaccum ma_new.read_mode = patt # ma_new.ngroup = int((det.multiaccum.ngroup - ma_new.nd1 + ma_new.nd2) / (ma_new.nf + ma_new.nd2)) # tvals_all.append(det_new.times_group_avg) # Times associated with each calcualted group ng_all = ng_all_list[i] tarr_all.append((ng_all-1)*det_new.time_group) # Determine excess variance parameters from scipy.optimize import least_squares#, leastsq en_dn_list = [] for i in range(len(patterns)): # Average spatial and temporal values # var_avg_ch = (en_spat_list[i]**2 + en_temp_list[i]**2) / 2 # var_avg_ch = en_temp_list[i]**2 var_avg_ch = en_spat_list[i]**2 en_dn_list.append(np.sqrt(var_avg_ch[0:nchan].mean(axis=0))) # Average dark current (e-/sec) if self.dark_ramp_dict is None: idark_avg = det.dark_current else: idark = [] tarr = self.time_arr for ch in np.arange(nchan): y = self.dark_ramp_dict['ramp_avg_ch'][ch] cf = jl_poly_fit(tarr, y, deg=1) idark.append(cf[1]) idark = np.array(idark) * gain idark_avg = np.mean(idark) # Average read noise per frame (e-) cds_var = (en_dn_list[0][0] * det.time_group * gain)**2 - (idark_avg * det.time_group) read_noise = np.sqrt(cds_var / 2) p0 = [1.5,10] # Initial guess args=(det, patterns, ng_all_list, en_dn_list) kwargs = {'idark':idark_avg, 'read_noise':read_noise, 'ideal_Poisson':ideal_Poisson} res_lsq = least_squares(fit_func_var_ex, p0, args=args, kwargs=kwargs) p_excess = res_lsq.x setup_logging(log_prev, verbose=False) _log.info(" Best fit excess variance model parameters: {}".format(p_excess)) self._eff_noise_dict = { 'patterns' : patterns, # Readout patterns 'ng_all_list' : ng_all_list, # List of groups fit 'tarr_all_list' : tarr_all, # Associated time values 'en_spat_list' : en_spat_list, # Effective noise per channel (spatial) 'p_excess' : p_excess # Excess variance model parameters (best fit) }
[docs] def calc_cds_noise(self, cds_type='spatial', temperature=None, temp_key='T_FPA1'): """ Return CDS Noise components for each channel Parameters ---------- cds_type : str Return 'spatial', 'temporal', or 'average' noise values? temperature : float or None Option to supply temperature at which to interpolate. If None is provided, then returns the median of all noise values. temp_key : str Temperature key from `self.temperature_dict` to interpolate over. Generally, either 'T_FPA1' or 'T_FPA2' as those most closely represent the detector operating temperatures. """ def cds_fit(tval, temps, cds_per_ch): """Fit """ cds_arr = [] for ch in np.arange(self.nchan): cf = jl_poly_fit(temp_arr, cds_per_ch[:,ch]) cds_arr.append(jl_poly(temperature, cf)) return np.array(cds_arr).squeeze() if (self.cds_act_dict is None) or (self.cds_ref_dict is None): _log.error('Dictionaries of CDS noise need generating: See `get_cds_dict()`') return # Temperature array temp_arr = np.array(self.temperature_dict[temp_key]) if temperature is not None: if (temperature<temp_arr.min()) or (temperature>temp_arr.max()): tbounds = 'T=[{:.2f}, {:.2f}]K'.format(temp_arr.min(), temp_arr.max()) _log.warn('Requested temperature is outside of bounds: {}.'.format(tbounds)) _log.warn('Extrapolation may be inaccurate.') # CDS dictionary arrays d_act = self.cds_act_dict d_ref = self.cds_ref_dict if 'spat' in cds_type: cds_type_list = ['spat'] elif 'temp' in cds_type: cds_type_list = ['temp'] else: cds_type_list = ['spat', 'temp'] cds_tot = cds_white = 0 cds_pink_uncorr = cds_pink_corr = 0 ref_ratio_all = 0 for ct in cds_type_list: # Total noise per channel cds_key = f'{ct}_tot' if temperature is None: cds_tot += np.median(d_act[cds_key], axis=0) else: cds_tot += cds_fit(temperature, temp_arr, d_act[cds_key]) # White noise per channel cds_key = f'{ct}_det' if temperature is None: cds_white += np.median(d_act[cds_key], axis=0) else: cds_white += cds_fit(temperature, temp_arr, d_act[cds_key]) # 1/f noise per channel cds_key = f'{ct}_pink_uncorr' cds_pink_uncorr += np.median(d_act[cds_key], axis=0) # Correlated noise cds_key = f'{ct}_pink_corr' cds_pink_corr += np.median(d_act[cds_key]) # Reference pixel noise ratio cds_key = f'{ct}_det' # or f'{cds_type}_tot'? ref_ratio_all += (d_ref[cds_key] / d_act[cds_key]) ref_ratio = np.mean(ref_ratio_all) # Scale by number of modes included ntype = len(cds_type_list) cds_dict = { 'tot' : cds_tot / ntype, 'white' : cds_white / ntype, 'pink_uncorr' : cds_pink_uncorr / ntype, 'pink_corr' : cds_pink_corr / ntype, 'ref_ratio' : ref_ratio / ntype } return cds_dict
[docs] def get_column_variations(self, force=False, **kwargs): """ Get column offset variations Create a series of column offset models. These are likely FETS in the ASIC preamp or ADC causing entire columns within a ramp to jump around. """ _log.info("Determining column variations (RTN)") allfiles = self.allfiles outname = self.paths_dict['column_variations'] file_exists = os.path.isfile(outname) if file_exists and (not force): ramp_column_varations, header = get_fits_data(outname, return_header=True) prob_bad = header['PROB_VAR'] else: kwargs_def = { 'nchans': self.nchan, 'altcol': True, 'in_place': True, 'fixcol': True, 'avg_type': 'pixel', 'savgol': True, 'perint': False } for k in kwargs_def.keys(): if k not in kwargs: kwargs[k] = kwargs_def[k] # Generate a compilation of column variations res = gen_col_variations(allfiles, DMS=self.DMS, super_bias=self.super_bias, super_dark_ramp=self.super_dark_ramp, **kwargs) ramp_column_varations, prob_bad = res # Save column ramp variations hdu = fits.PrimaryHDU(ramp_column_varations) hdu.header['PROB_VAR'] = prob_bad hdu.writeto(outname, overwrite=True) self._column_variations = ramp_column_varations self._column_prob_bad = prob_bad
[docs] def get_ref_pixel_noise(self, force=False, **kwargs): """ Generate Dictionary of Reference Pixel behavior info""" _log.info("Determining reference pixel behavior") allfiles = self.allfiles outname = self.paths_dict['ref_pix_variations'] file_exists = os.path.isfile(outname) if (not file_exists) or force: kwargs_def = { 'nchans': self.nchan, 'altcol': True, 'in_place': True, 'fixcol': True, 'avg_type': 'pixel', 'savgol': True, 'perint': False } for k in kwargs_def.keys(): if k not in kwargs: kwargs[k] = kwargs_def[k] ref_dict = gen_ref_dict(allfiles, self.super_bias, DMS=self.DMS, **kwargs) # Save to JSON file self._dict_to_json(ref_dict, outname) # Load from JSON file self._ref_pixel_dict = self._json_to_dict(outname)
[docs] def get_power_spectrum(self, include_oh=False, calc_cds=True, per_pixel=False, return_corr=False, return_ucorr=False, mn_func=np.mean, force=False, save=True): """ Keyword Args ============ include_oh : bool Zero-pad the data to insert line and frame overhead pixels? calc_cds : bool Power spectrum of CDS pairs or individual frames? return_corr : bool Return power spectrum of channel correlated 1/f noise? return_ucorr : bool Return power spectra of channel-dependent (uncorrelated) 1/f noise? per_pixel : bool Calculate average power spectrum of each pixel along ramp (frame timescales)? If False, samples pixels within a frame (pixel read timescales). """ _log.info("Building noise power spectrum dictionary...") # Get file name to save results if per_pixel: outname = self.paths_dict['power_spec_cds_pix'] if calc_cds else self.paths_dict['power_spec_full_pix'] else: if include_oh: outname = self.paths_dict['power_spec_cds_oh'] if calc_cds else self.paths_dict['power_spec_full_oh'] else: outname = self.paths_dict['power_spec_cds'] if calc_cds else self.paths_dict['power_spec_full'] file_exists = os.path.isfile(outname) if file_exists and (not force): with open(outname, 'rb') as f: ps_all = np.load(f) ps_corr = np.load(f) ps_ucorr = np.load(f) else: super_bias = self.super_bias if super_bias is None: raise AttributeError('Super bias (`self.super_bias = None`) file has not been loaded.') ssd = self.det.same_scan_direction rsd = self.det.reverse_scan_direction res = get_power_spec_all(self.allfiles, super_bias=super_bias, det=self.det, DMS=self.DMS, include_oh=include_oh, calc_cds=calc_cds, return_corr=return_corr, return_ucorr=return_ucorr, mn_func=mn_func, per_pixel=per_pixel, same_scan_direction=ssd, reverse_scan_direction=rsd) ps_all, ps_corr, ps_ucorr = res # Set as an arrays of 0s if not calculated for saving purposes ps_corr = np.zeros_like(ps_all[0]).astype('bool') if ps_corr is None else ps_corr ps_ucorr = np.zeros_like(ps_all).astype('bool') if ps_ucorr is None else ps_ucorr # Save arrays to disk if save: with open(outname, 'wb') as f: np.save(f, ps_all) np.save(f, ps_corr) np.save(f, ps_ucorr) # If corr or ucorr were saved as 0s, set to None ps_corr = None if np.allclose(ps_corr, 0) else ps_corr ps_ucorr = None if np.allclose(ps_ucorr, 0) else ps_ucorr # Get corrsponding frequency arrays freq = get_freq_array(ps_all, dt=1/self.det._pixel_rate) self._pow_spec_dict = { 'freq' : freq, 'ps_all' : ps_all, 'ps_corr' : ps_corr, 'ps_ucorr' : ps_ucorr, } # TODO: Check if something similar for per_pixel if not per_pixel: # Estimate 1/f scale factors for broken correlated power spectrum freq = self.pow_spec_dict['freq'] ps_all = self.pow_spec_dict['ps_all'] # Noise values cds_dict = self.cds_act_dict keys = ['spat_det', 'temp_det', 'spat_pink_uncorr', 'temp_pink_uncorr'] cds_vals = np.array([np.sqrt(np.mean(cds_dict[k]**2, axis=0)) for k in keys]) rd_noise = np.sqrt(np.mean(cds_vals[:2]**2)) u_pink = np.sqrt(np.mean(cds_vals[2:]**2)) # White Noise yf = freq**(0) variance = np.mean(rd_noise**2) yf1 = len(yf) * variance * yf / yf.sum() # Uncorrelated Pink Noise yf = freq**(-1); yf[0]=0 variance = np.mean(u_pink**2) / np.sqrt(2) yf2 = len(yf) * variance * yf / yf.sum() # Get residual, to calculate scale factors for correlated noise model yresid = ps_all.mean(axis=0) - yf2 - yf1 scales = fit_corr_powspec(freq, yresid) self._pow_spec_dict['ps_corr_scale'] = scales
[docs] def get_super_flats(self, split_low_high=True, smth_sig=10, force=False, **kwargs): """Get flat field information Splits flat field into to lflats and pflats (low and high frequency). """ savename = self.paths_dict['super_flats'] file_exists = os.path.isfile(savename) if file_exists and (not force): _log.info("Loading flat field information...") # Grab Super Dark Ramp super_flats = get_fits_data(savename) else: # Default ref pixel correction kw args kwargs_def = { 'nchans': self.nchan, 'in_place': True, 'altcol': True, 'fixcol': True, 'avg_type': 'pixel', 'savgol': True, 'perint': False, } for k in kwargs_def.keys(): if k not in kwargs: kwargs[k] = kwargs_def[k] # Get nominal non-linear coefficients _log.info("Calculating flat field information...") allfiles = self.linfiles data, _ = gen_super_ramp(allfiles, super_bias=self.super_bias, **kwargs) if self.linear_dict is None: self.get_linear_coeffs() # IPC and PPC kernels kppc = self.kernel_ppc kipc = self.kernel_ipc # PPC corrections if (kppc is not None) and kppc[1,2]>0: data = ppc_deconvolve(data, kppc) # IPC correction if kipc is not None: data = ipc_deconvolve(data, kipc) # Linearity correction hdr = fits.getheader(allfiles[0]) det = create_detops(hdr, DMS=self.DMS) data = apply_linearity(data, det, self.linear_dict) # Perform fit to data in DN/sec tarr = np.arange(1,len(data)+1) cf_arr = cube_fit(tarr, data, deg=1, sat_vals=det.well_level, sat_frac=0.8, fit_zero=False) im_slope = cf_arr[1] del data super_flats = get_flat_fields(im_slope, split_low_high=split_low_high, smth_sig=smth_sig, ref_info=det.ref_info) super_flats = np.asarray(super_flats) # Save superbias frame to directory hdu = fits.PrimaryHDU(super_flats) hdu.header['SMTH_SIG'] = smth_sig hdu.writeto(savename, overwrite=True) sh = super_flats.shape if len(sh)==3: nz, ny, nx = sh if nz==2: lflats, pflats = super_flats else: lflats = None pflats = super_flats else: lflats = None pflats = super_flats self.lflats = lflats self.pflats = pflats
def _get_linear_coeffs(self, deg=8, use_legendre=True, lxmap=[0,1e5], counts_cut=None, sat_calc=0.98, nonlin=False, force=False, DMS=None, super_bias=None, **kwargs): """ Determine non-linear coefficents These coefficients allow us to go from an ideal linear ramp to some observed (simulated) non-linear ramp. Parameters ========== force : bool Force calculation of coefficients. DMS : None or bool Option to specifiy if linearity files are DMS format. If set to None, then uses self.DMS. super_bias: None or ndarray Option to specify an input super bias image. If not specified, then defaults to self.super_bias. counts_cut : None or float Option to fit two sets of polynomial coefficients to lower and uppper values. 'counts_cut' specifies the division in values of electrons. Useful for pixels with different non-linear behavior at low flux levels. Recommended values of 15000 e-. deg : int Degree of polynomial to fit. Default=8. use_legendre : bool Fit with Legendre polynomial, an orthonormal basis set. Default=True. lxmap : ndarray or None Legendre polynomials are normaly mapped to xvals of [-1,+1]. `lxmap` gives the option to supply the values for xval that should get mapped to [-1,+1]. If set to None, then assumes [xvals.min(),xvals.max()]. """ if nonlin: savename = self.paths_dict['nonlinear_coeffs'] else: savename = self.paths_dict['linear_coeffs'] file_exists = os.path.isfile(savename) if file_exists and (not force): if nonlin: _log.info("Loading non-linearity coefficents") else: _log.info("Loading linearity coefficents") out = np.load(savename) cf_nonlin = out.get('cf_nonlin') counts_cut = out.get('counts_cut').tolist() if counts_cut == 0: counts_cut = None cf_nonlin_low = None else: cf_nonlin_low = out.get('cf_nonlin_low') use_legendre = out.get('use_legendre').tolist() lxmap = out.get('lxmap').tolist() deg = out.get('deg').tolist() if nonlin: cflin0_mean = out.get('cflin0_mean') cflin0_std = out.get('cflin0_std') corr_slope = out.get('corr_slope') corr_intercept = out.get('corr_intercept') sat_vals = out.get('sat_vals') else: if nonlin: _log.info("Generating non-linearity coefficents") else: _log.info("Generating linearity coefficents") allfiles = self.linfiles # Check if super bias exists if (self._super_bias is None) and (super_bias is None): _log.warn('Super bias not loaded or specified. Proceeding without bias correction.') elif super_bias is None: super_bias = self.super_bias if DMS is None: DMS = self.DMS # Set logging to WARNING to suppress messages log_prev = conf.logging_level setup_logging('WARN', verbose=False) f = allfiles[-1] hdr = fits.getheader(f) det = create_detops(hdr, DMS=DMS) setup_logging(log_prev, verbose=False) grp_max = find_group_sat(f, DMS=DMS, bias=super_bias, sat_calc=0.998) grp_max = grp_max + 10 if grp_max > det.multiaccum.ngroup: grp_max = det.multiaccum.ngroup # Default ref pixel correction kw args kwargs_def = { 'nchans': self.nchan, 'in_place': True, 'altcol': True, 'fixcol': True, 'avg_type': 'pixel', 'savgol': True, 'perint': False, } for k in kwargs_def.keys(): if k not in kwargs: kwargs[k] = kwargs_def[k] # Get nominal non-linear coefficients _log.info(" Calculating average coefficients...") kppc = self.kernel_ppc kipc = self.kernel_ipc res, sat_vals = get_linear_coeffs(allfiles, super_bias=super_bias, DMS=DMS, grp_max=grp_max, deg=deg, use_legendre=use_legendre, lxmap=lxmap, counts_cut=counts_cut, return_satvals=True, kppc=kppc, kipc=kipc, nonlin=nonlin, sat_calc=sat_calc, **kwargs) # Two separate fits for low and high pixel values if counts_cut is None: cf_nonlin = res cf_nonlin_low = None else: cf_nonlin, cf_nonlin_low = res # Obtain coefficient variations if nonlin: _log.info(" Calculating coefficient variations...") # Solve for coefficients for all data sets # Probes random variations cf_all = [] for file in tqdm(allfiles, desc='Variance', leave=False): res = get_linear_coeffs([file], super_bias=super_bias, DMS=DMS, counts_cut=counts_cut, deg=deg, use_legendre=use_legendre, lxmap=lxmap, grp_max=grp_max, sat_vals=sat_vals, nonlin=True, sat_calc=sat_calc, **kwargs) if counts_cut is None: cf = res else: # Ignore variations to lower fits cf, _ = res cf_all.append(cf) cf_all = np.array(cf_all) # Coefficients are related to each # Save the linear correlation for each pixel cf_all_min = np.min(cf_all, axis=0) cf_all_max = np.max(cf_all, axis=0) cf_all_mean = np.mean(cf_all, axis=0) corr_slope1 = (cf_all_max[1:] - cf_all_mean[1:]) / (cf_all_max[0] - cf_all_mean[0]) corr_slope2 = (cf_all_mean[1:] - cf_all_min[1:]) / (cf_all_mean[0] - cf_all_min[0]) corr_slope = 0.5 * (corr_slope1 + corr_slope2) corr_intercept = cf_all_mean[1:] - corr_slope*cf_all_mean[0] corr_slope[:, self.mask_ref] = 0 corr_intercept[:, self.mask_ref] = 0 cflin0_mean = cf_nonlin[0] cflin0_std = np.std(cf_all[:,0,:,:], axis=0) if counts_cut is None: counts_cut = 0 cf_nonlin_low = 0 np.savez(savename, cf_nonlin=cf_nonlin, cflin0_mean=cflin0_mean, cflin0_std=cflin0_std, corr_slope=corr_slope, corr_intercept=corr_intercept, deg=deg, use_legendre=use_legendre, lxmap=lxmap, sat_vals=sat_vals, counts_cut=counts_cut, cf_nonlin_low=cf_nonlin_low) else: if counts_cut is None: counts_cut = 0 cf_nonlin_low = 0 np.savez(savename, cf_nonlin=cf_nonlin, cf_nonlin_low=cf_nonlin_low, counts_cut=counts_cut, deg=deg, use_legendre=use_legendre, lxmap=lxmap, sat_vals=sat_vals) # Additional check on fitting of lower values if (counts_cut==0) or (counts_cut is None): counts_cut = None cf_nonlin_low = None # Store everything in dictionary if nonlin: self.nonlinear_dict = { 'cf_nonlin' : cf_nonlin, 'cflin0_mean' : cflin0_mean, 'cflin0_std' : cflin0_std, 'corr_slope' : corr_slope, 'corr_intercept' : corr_intercept, 'use_legendre' : use_legendre, 'lxmap' : lxmap, 'deg' : deg, 'counts_cut' : counts_cut, 'cf_nonlin_low' : cf_nonlin_low, 'sat_vals' : sat_vals, } else: self.linear_dict = { 'cf_nonlin' : cf_nonlin, 'cf_nonlin_low' : cf_nonlin_low, 'counts_cut' : counts_cut, 'use_legendre' : use_legendre, 'lxmap' : lxmap, 'deg' : deg, }
[docs] def get_nonlinear_coeffs(self, deg=8, use_legendre=True, lxmap=[0,1e5], counts_cut=15000, sat_calc=0.998, force=False, DMS=None, super_bias=None, **kwargs): """ Determine non-linear coefficents These coefficients allow us to go from an ideal linear ramp to some observed (simulated) non-linear ramp. Value are store in the self.nonlinear_dict dictionary. Parameters ========== force : bool Force calculation of coefficients. DMS : None or bool Option to specifiy if linearity files are DMS format. If set to None, then uses self.DMS. super_bias: None or ndarray Option to specify an input super bias image. If not specified, then defaults to self.super_bias. counts_cut : None or float Option to fit two sets of polynomial coefficients to lower and uppper values. 'counts_cut' specifies the division in values of electrons. Useful for pixels with different non-linear behavior at low flux levels. Recommended values of 15000 e-. deg : int Degree of polynomial to fit. Default=8. use_legendre : bool Fit with Legendre polynomial, an orthonormal basis set. Default=True. lxmap : ndarray or None Legendre polynomials are normaly mapped to xvals of [-1,+1]. `lxmap` gives the option to supply the values for xval that should get mapped to [-1,+1]. If set to None, then assumes [xvals.min(),xvals.max()]. """ self._get_linear_coeffs(deg=deg, use_legendre=use_legendre, lxmap=lxmap, counts_cut=counts_cut, sat_calc=sat_calc, nonlin=True, force=force, DMS=DMS, super_bias=super_bias, **kwargs)
[docs] def get_linear_coeffs(self, deg=8, use_legendre=True, lxmap=[0,1e5], counts_cut=None, sat_calc=0.98, force=False, DMS=None, super_bias=None, **kwargs): """ Determine linearity coefficents These coefficients allow us to convert from an observed ramp (DN) to an idealized linear ramp (in e-). Values are stored in the dictionary self.linear_dict. Parameters ========== force : bool Force calculation of coefficients. DMS : None or bool Option to specifiy if linearity files are DMS format. If set to None, then uses self.DMS. super_bias: None or ndarray Option to specify an input super bias image. If not specified, then defaults to self.super_bias. counts_cut : None or float Option to fit two sets of polynomial coefficients to lower and uppper values. 'counts_cut' specifies the division in values of electrons. Useful for pixels with different non-linear behavior at low flux levels. Recommended values of 15000 e-. deg : int Degree of polynomial to fit. Default=8. use_legendre : bool Fit with Legendre polynomial, an orthonormal basis set. Default=True. lxmap : ndarray or None Legendre polynomials are normaly mapped to xvals of [-1,+1]. `lxmap` gives the option to supply the values for xval that should get mapped to [-1,+1]. If set to None, then assumes [xvals.min(),xvals.max()]. """ if counts_cut is not None: counts_cut = counts_cut / self.det.gain self._get_linear_coeffs(deg=deg, use_legendre=use_legendre, lxmap=lxmap, counts_cut=counts_cut, sat_calc=sat_calc, nonlin=False, force=force, DMS=DMS, super_bias=super_bias, **kwargs)
[docs] def deconvolve_supers(self): """ Deconvolve the super dark and super bias images """ k_ppc = self.kernel_ppc k_ipc = self.kernel_ipc if (k_ppc is None) and (k_ipc is None): _log.error("Neither IPC or PPC kernels are defined") return _log.info("Deconvolving super dark and super bias images...") # PPC Deconvolution if k_ppc is not None: ssd = self.det.same_scan_direction rsd = self.det.reverse_scan_direction super_dark_deconv = ppc_deconvolve(self.super_dark, k_ppc, same_scan_direction=ssd, reverse_scan_direction=rsd) super_bias_deconv = ppc_deconvolve(self.super_bias, k_ppc, same_scan_direction=ssd, reverse_scan_direction=rsd) # IPC Deconvolution if k_ipc is not None: super_dark_deconv = ipc_deconvolve(super_dark_deconv, k_ipc) super_bias_deconv = ipc_deconvolve(super_bias_deconv, k_ipc) self._super_dark_deconv = super_dark_deconv self._super_bias_deconv = super_bias_deconv
[docs] def plot_bias_darks(self, save=False, return_figax=False, deconvolve=False): """ """ if self.super_bias is None: _log.error("Super bias image has not yet been generated.") return if self.super_dark is None: _log.error("Super dark image has not yet been generated.") return scaid = self.scaid if deconvolve: super_bias, super_dark = self.super_bias_deconv, self.super_dark_deconv else: super_bias, super_dark = self.super_bias, self.super_dark fig, axes = plt.subplots(1,2,figsize=(14,8.5), sharey=True) cbar_labels = ['Relative Offset (DN)', 'Dark Current (DN/sec)'] for i, im in enumerate([super_bias, super_dark]): mn = np.median(im) std = robust.medabsdev(im) vmin = mn - 3*std vmax = mn + 3*std ax = axes[i] image = ax.imshow(im, vmin=vmin, vmax=vmax) # Add colorbar cbar = fig.colorbar(image, ax=ax, orientation='horizontal', pad=0.05, fraction=0.1, aspect=30, shrink=1) cbar.set_label(cbar_labels[i]) # Add titles and labels titles = ['Super Bias Image', 'Super Dark Current Image'] for i, ax in enumerate(axes): ax.set_title(titles[i]) fig.suptitle(f'SCA {scaid}', fontsize=16) fig.tight_layout() fig.subplots_adjust(top=0.92, wspace=0.02, bottom=0.01) if save: fname = f'{scaid}_bias_dark_images.pdf' save_name = os.path.join(self.paths_dict['figdir'], fname) _log.info(f"Saving to {save_name}") fig.savefig(save_name) if return_figax: return fig, axes
[docs] def plot_dark_ramps(self, save=True, time_cut=None, return_figax=False): """ Plot average dark current ramps time_cut : float Some darks show distinct slopes before and after a characteristic time. Setting this keyword will fit separate slopes before and after the specified time. A time of 200 sec is used for SCA 485. """ # Make sure dictionary is not empty if self._dark_ramp_dict is None: self.get_pixel_slope_averages() scaid = self.scaid fig, axes = plt.subplots(1,2, figsize=(14,5)) axes = axes.flatten() # Plot average of all pixel ax = axes[0] ax.set_title('Average Ramp of All Pixels') tarr = self.time_arr y = self.dark_ramp_dict['ramp_avg_all'] ax.plot(tarr, y, marker='.', label='Median Pixel Values') if time_cut is None: cf = jl_poly_fit(tarr, y, deg=1) ax.plot(tarr, jl_poly(tarr,cf), label='Slope Fit = {:.4f} DN/sec'.format(cf[1])) else: for ind in [tarr<time_cut, tarr>time_cut, tarr>0]: cf = jl_poly_fit(tarr[ind], y[ind], deg=1) ax.plot(tarr, jl_poly(tarr,cf), label='Slope Fit = {:.4f} DN/sec'.format(cf[1])) # Plot each channel separately ax = axes[1] ax.set_title('Channel Ramps') for i in range(self.nchan): y = self.dark_ramp_dict['ramp_avg_ch'][i] cf = jl_poly_fit(tarr, y, deg=1) label = 'Ch{} = {:.4f} DN/sec'.format(i, cf[1]) ax.plot(tarr, y, marker='.', label=label) ylim1 = ylim2 = 0 for ax in axes: ax.set_xlabel('Time (sec)') ax.set_ylabel('Signal (DN)') ax_yl = ax.get_ylim() ylim1 = np.min([ylim1, ax_yl[0]]) ylim2 = np.max([ylim2, ax_yl[1]]) ax.legend() for ax in axes: ax.set_ylim([ylim1,ylim2]) # Plot baseline at y=0 xlim = ax.get_xlim() ax.plot(xlim, [0,0], color='k', ls='--', lw=1, alpha=0.25) ax.set_xlim(xlim) fig.suptitle(f'Dark Current (SCA {scaid})', fontsize=16) fig.tight_layout() fig.subplots_adjust(top=0.85) if save: fname = f'{scaid}_dark_ramp_avg.pdf' save_name = os.path.join(self.paths_dict['figdir'], fname) _log.info(f"Saving to {save_name}") fig.savefig(save_name) if return_figax: return fig, axes
[docs] def plot_dark_ramps_ch(self, save=True, time_cut=None, return_figax=False): """ Plot fits to each channel dark current ramp time_cut : float Some darks show distinct slopes before and after a characteristic time. Setting this keyword will fit separate slopes before and after the specified time. A time of 200 sec is used for SCA 485. """ # Make sure dictionary is not empty if self._dark_ramp_dict is None: self.get_pixel_slope_averages() scaid = self.scaid fig, axes = plt.subplots(2,2, figsize=(14,9)) axes = axes.flatten() # Plot Individual Channels tarr = self.time_arr for i in range(self.nchan): ax = axes[i] y = self.dark_ramp_dict['ramp_avg_ch'][i] ax.plot(tarr, y, marker='.', label='Pixel Averages') if time_cut is None: cf = jl_poly_fit(tarr, y, deg=1) ax.plot(tarr, jl_poly(tarr,cf), label='Slope = {:.4f} DN/sec'.format(cf[1])) else: for ind in [tarr<time_cut, tarr>time_cut, tarr>0]: cf = jl_poly_fit(tarr[ind], y[ind], deg=1) ax.plot(tarr, jl_poly(tarr,cf), label='Slope = {:.4f} DN/sec'.format(cf[1])) ax.set_title(f'Amplifier Channel {i}') ylim1 = ylim2 = 0 for ax in axes: ax.set_xlabel('Time (sec)') ax.set_ylabel('Dark Value (DN)') ax_yl = ax.get_ylim() ylim1 = np.min([ylim1, ax_yl[0]]) ylim2 = np.max([ylim2, ax_yl[1]]) ax.legend() for ax in axes: ax.set_ylim([ylim1,ylim2]) # Plot baseline at y=0 xlim = ax.get_xlim() ax.plot(xlim, [0,0], color='k', ls='--', lw=1, alpha=0.25) ax.set_xlim(xlim) fig.suptitle(f'Dark Current (SCA {scaid})', fontsize=16) fig.tight_layout() fig.subplots_adjust(top=0.9) if save: fname = f'{scaid}_dark_ramp_chans.pdf' save_name = os.path.join(self.paths_dict['figdir'], fname) _log.info(f"Saving to {save_name}") fig.savefig(save_name) if return_figax: return fig, axes
[docs] def plot_dark_distribution(self, save=False, xlim=None, return_figax=False): """Plot histogram of dark slope""" act_mask = self.mask_act ch_mask = self.mask_channels nchan = self.nchan scaid = self.scaid # Histogram of Dark Slope if self.super_dark is None: _log.error("Super dark image has not yet been generated.") return fig, axes = plt.subplots(1,2, figsize=(14,5), sharey=True) # Full image ax = axes[0] im = self.super_dark[act_mask] plot_dark_histogram(im, ax) # Individual Amplifiers ax = axes[1] carr = ['C0', 'C1', 'C2', 'C3'] for ch in np.arange(nchan): ind = (ch_mask==ch) & act_mask im = self.super_dark[ind] label = f'Ch{ch}' plot_dark_histogram(im, ax, label=label, color=carr[ch], plot_fit=False, plot_cumsum=False) ax.set_ylabel('') ax.set_title('Active Pixels per Amplifier') # Plot baseline at y=0 for ax in axes: if xlim is None: xlim = ax.get_xlim() ax.plot(xlim, [0,0], color='k', ls='--', lw=1, alpha=0.25) ax.set_xlim(xlim) fig.suptitle(f'Dark Current Distriutions (SCA {self.scaid})', fontsize=16) fig.tight_layout() fig.subplots_adjust(top=0.85, wspace=0.025) if save: fname = f'{self.scaid}_dark_histogram.pdf' save_name = os.path.join(self.paths_dict['figdir'], fname) _log.info(f"Saving to {save_name}") fig.savefig(save_name) if return_figax: return fig, axes
[docs] def plot_dark_overview(self, save=False, xlim_hist=None, return_figax=False): """Plot Overview of Dark Current Characteristics""" if self.super_dark is None: _log.error("Super dark image has not yet been generated.") return scaid = self.scaid fig, axes = plt.subplots(1,3,figsize=(14,5)) ######################################### # Dark Current slope image ax = axes[0] im = self.super_dark mn = np.median(im) std = robust.medabsdev(im) vmin = mn - 3*std vmax = mn + 3*std image = ax.imshow(im, vmin=vmin, vmax=vmax) # Add colorbar cbar = fig.colorbar(image, ax=ax, orientation='horizontal', pad=0.08, fraction=0.05, aspect=30, shrink=0.9) ax.set_title('Dark Current Image') cbar.set_label('Dark Current (DN/sec)') ######################################### # Average pixel slope over time ax = axes[1] ax.set_title('Average Ramp of All Pixels') tarr = self.time_arr y = self.dark_ramp_dict['ramp_avg_all'] ax.plot(tarr, y, marker='.', label='Median Pixel Values') cf = jl_poly_fit(tarr, y, deg=1) ax.plot(tarr, jl_poly(tarr,cf), label='Slope Fit = {:.4f} DN/sec'.format(cf[1])) # Plot baseline at y=0 xlim = ax.get_xlim() ax.plot(xlim, [0,0], color='k', ls='--', lw=1, alpha=0.25) ax.set_xlim(xlim) ax.set_xlabel('Time (sec)') ax.set_ylabel('Signal (DN)') ax.legend() ######################################### # Dark current histogram ax = axes[2] act_mask = self.mask_act scaid = self.scaid # Histogram of Dark Slope im = self.super_dark[act_mask] ax = plot_dark_histogram(im, ax, return_ax=True, plot_fit=False) ax.set_title('Slope Distribution') # Plot baseline at y=0 if xlim_hist is None: xlim_hist = ax.get_xlim() ax.plot(xlim_hist, [0,0], color='k', ls='--', lw=1, alpha=0.25) ax.set_xlim(xlim_hist) fig.suptitle(f'Dark Current Overview (SCA {scaid})', fontsize=16) fig.tight_layout() fig.subplots_adjust(bottom=0.1, top=0.85) if save: fname = f'{self.scaid}_dark_overview.pdf' save_name = os.path.join(self.paths_dict['figdir'], fname) _log.info(f"Saving to {save_name}") fig.savefig(save_name) if return_figax: return fig, axes
def plot_ipc_ppc(self, k_ipc=None, k_ppc=None, save=False, return_figax=False): k_ipc = self.kernel_ipc if k_ipc is None else k_ipc k_ppc = self.kernel_ppc if k_ppc is None else k_ppc scaid = self.scaid if k_ipc is None: _log.info("IPC Kernel does not exist.") return if k_ipc is None: # Plot only IPC kernel fig, axes = plt.subplots(1,1, figsize=(5,5)) plot_kernel(k_ipc, ax=axes) axes.set_title('IPC Kernel', fontsize=16) fig.tight_layout() else: # Plot both IPC and PPC fig, axes = plt.subplots(1,2, figsize=(10,5.5), sharey=True) ax = axes[0] plot_kernel(k_ipc, ax=ax) ax.set_title('IPC Kernel') ax = axes[1] plot_kernel(k_ppc, ax=ax) ax.set_title('PPC Kernel') fig.suptitle(f"Pixel Deconvolution Kernels (SCA {scaid})", fontsize=16) fig.tight_layout() fig.subplots_adjust(wspace=0.075, top=0.9) if save: fname = f'{self.scaid}_pixel_kernels.pdf' save_name = os.path.join(self.paths_dict['figdir'], fname) _log.info(f"Saving to {save_name}") fig.savefig(save_name) if return_figax: return fig, axes
[docs] def plot_reset_overview(self, save=False, binsize=0.25, xlim_hist=None, return_figax=False): """ Overview Plots of Bias and kTC Noise""" if self._super_bias is None: _log.error('Super bias image does not exist.') return if self._super_bias_sig is None: _log.error('Sigma image for super bias does not exist.') return scaid = self.scaid # Histogram of Bias kTC im = self._super_bias_sig[self.mask_act] binsize = binsize bins = np.arange(im.min(), im.max() + binsize, binsize) ig, vg, cv = hist_indices(im, bins=bins, return_more=True) nvals = np.array([len(i) for i in ig]) nvals_rel = nvals / nvals.max() # Peak of distribution if self.ktc_noise is None: self.get_ktc_noise(binsize=binsize) peak = self.ktc_noise fig, axes = plt.subplots(1,3,figsize=(14,5)) ##################################### # Plot super bias image ax = axes[0] im = self._super_bias mn = np.median(im) std = robust.std(im) ax.imshow(im, vmin=mn-3*std, vmax=mn+3*std) ax.set_title('Super Bias Image') ##################################### # Plot kTC noise image ax = axes[1] im = self._super_bias_sig mn = np.median(im) std = robust.std(im) ax.imshow(im, vmin=mn-3*std, vmax=mn+3*std) ax.set_title('kTC Noise = {:.1f} DN'.format(peak)) ##################################### # Plot kTC noise histogram ax = axes[2] ax.plot(cv, nvals_rel, label='Measured Noise') label = 'Peak ({:.1f} DN)'.format(peak) ax.plot(np.array([1,1])*peak, [0,1], ls='--', lw=1, label=label) ncum = np.cumsum(nvals) ax.plot(cv, ncum / ncum.max(), color='C3', lw=1, label='Cumulative Sum') ax.set_title('kTC Noise Distribution') ax.set_xlabel('Bias Noise (DN)') ax.set_ylabel('Relative Number of Pixels') ax.legend() ax.set_xlim([0,3*peak]) #ax.xaxis.get_major_locator().set_params(nbins=9, steps=[1, 2, 5, 10]) # Plot baseline at y=0 if xlim_hist is None: xlim_hist = ax.get_xlim() ax.plot(xlim_hist, [0,0], color='k', ls='--', lw=1, alpha=0.25) ax.set_xlim(xlim_hist) fig.suptitle(f'Reset Bias Overview (SCA {scaid})', fontsize=16) fig.tight_layout() fig.subplots_adjust(bottom=0.1, top=0.85) if save: fname = f'{self.scaid}_bias_overview.pdf' save_name = os.path.join(self.paths_dict['figdir'], fname) _log.info(f"Saving to {save_name}") fig.savefig(save_name) if return_figax: return fig, axes
def plot_cds_noise(self, tkey='T_FPA1', save=False, return_figax=False, xlim=[36.1,40.1]): fig, axes = plt.subplots(2,3, figsize=(14,8), sharey=True) temp_arr = np.array(self.temperature_dict[tkey]) d = self.cds_act_dict d2 = self.cds_ref_dict # 1. Total Noise k1, k2 = ('spat_tot', 'temp_tot') for k, ax in zip([k1,k2], axes[:,0]): cds_arr = d[k] for ch in np.arange(self.nchan): ax.plot(temp_arr, cds_arr[:,ch], marker='o', ls='none', label=f'Ch{ch}') type_str = "Spatial" if 'spat' in k else "Temporal" title_str = f"{type_str} Total Noise" ax.set_title(title_str) # 2. White Noise k1, k2 = ('spat_det', 'temp_det') cmap = plt.get_cmap('tab20') tplot = np.array([temp_arr.min(), temp_arr.max()]) for k, ax in zip([k1,k2], axes[:,1]): cds_arr = d[k] pix_type = ['Active', 'Ref'] for j, cds_arr in enumerate([d[k], d2[k]]): marker = 'o' if j==0 else '.' for ch in np.arange(self.nchan): label = f'Ch{ch} ({pix_type[j]})' y = cds_arr[:,ch] ax.plot(temp_arr, y, marker=marker, ls='none', label=label, color=cmap(ch*2+j)) cf = jl_poly_fit(temp_arr, y) ax.plot(tplot, jl_poly(tplot, cf), lw=1, ls='--', color=cmap(ch*2+j)) type_str = "Spatial" if 'spat' in k else "Temporal" title_str = f"{type_str} White Noise" ax.set_title(title_str) # 3. Pink Noise k1, k2 = ('spat_pink_uncorr', 'temp_pink_uncorr') for k, ax in zip([k1,k2], axes[:,2]): cds_arr = d[k] for ch in np.arange(self.nchan): ax.plot(temp_arr, cds_arr[:,ch], marker='o', ls='none', label=f'Ch{ch}') k_corr = k.split('_') k_corr[-1] = 'corr' k_corr = '_'.join(k_corr) ax.plot(temp_arr, d[k_corr], marker='o', ls='none', label='Correlated') type_str = "Spatial" if 'spat' in k else "Temporal" title_str = f"{type_str} 1/f Noise" ax.set_title(title_str) for ax in axes: ax[0].set_ylabel('CDS Noise (DN)') for ax in axes[-1,:]: ax.set_xlabel('FPA Temperature (K)') for ax in axes.flatten(): ax.set_xlim(xlim) handles, labels = ax.get_legend_handles_labels() ncol = 2 if len(handles)>5 else 1 ax.legend(ncol=ncol) fig.suptitle(f'CDS Noise Overview (SCA {self.scaid})', fontsize=16) fig.tight_layout() fig.subplots_adjust(wspace=0.01, top=0.9) if save: fname = f'{self.scaid}_cds_noise.pdf' save_name = os.path.join(self.paths_dict['figdir'], fname) _log.info(f"Saving to {save_name}") fig.savefig(save_name) if return_figax: return fig, axes
[docs] def plot_eff_noise(self, ideal_Poisson=False, save=False, return_figax=False): """Plot effective noise of slope fits""" det = self.det gain = det.gain nchan = det.nout # Average dark current (e-/sec) if self.dark_ramp_dict is None: idark = np.ones(nchan) * det.dark_current # e-/sec else: idark = [] tarr = self.time_arr for ch in np.arange(nchan): y = self.dark_ramp_dict['ramp_avg_ch'][ch] cf = jl_poly_fit(tarr, y, deg=1) idark.append(cf[1]) idark = np.array(idark) * gain # e-/sec eff_noise_dnsec = self.eff_noise_dict['en_spat_list'][0] # Average read noise per frame (e-) cds_var = (eff_noise_dnsec[0:nchan,0] * det.time_group * gain)**2 - (idark * det.time_group) read_noise = np.sqrt(cds_var / 2) # e- read_noise_ref = eff_noise_dnsec[-1,0] * det.time_group * gain / np.sqrt(2) ng_all = self.eff_noise_dict['ng_all_list'][0] tvals = self.eff_noise_dict['tarr_all_list'][0] p_excess = self.eff_noise_dict['p_excess'] colarr = ['C0', 'C1', 'C2', 'C3', 'C4'] fig, axes = plt.subplots(1,2, figsize=(14,4.5)) ax = axes[0] # Measured Values xvals = tvals yvals = eff_noise_dnsec for ch in range(nchan): axes[0].plot(xvals, yvals[ch]*tvals, marker='o', label=f'Ch{ch} - Meas', color=colarr[ch]) axes[1].semilogy(xvals, yvals[ch], marker='o', label=f'Ch{ch} - Meas', color=colarr[ch]) ch = -1 axes[0].plot(xvals, yvals[ch]*tvals, marker='o', label='Ref - Meas', color=colarr[ch]) axes[1].plot(xvals, yvals[ch], marker='o', label='Ref - Meas', color=colarr[ch]) # Theoretical Values xvals = tvals for ch in range(nchan): thr_e = det.pixel_noise(ng=ng_all, rn=read_noise[ch], idark=idark[ch], ideal_Poisson=ideal_Poisson, p_excess=p_excess) yvals2 = (thr_e * tvals) / gain axes[0].plot(xvals, yvals2, color=colarr[ch], lw=10, alpha=0.3, label=f'Ch{ch} - Theory') axes[1].plot(xvals, yvals2/tvals, color=colarr[ch], lw=10, alpha=0.3, label=f'Ch{ch} - Theory') ch = -1 thr_e = det.pixel_noise(ng=ng_all, rn=read_noise_ref, idark=0, p_excess=[0,0]) yvals2 = (thr_e * tvals) / gain axes[0].plot(xvals, yvals2, color=colarr[ch], lw=10, alpha=0.3, label=f'Ref - Theory') axes[1].plot(xvals, yvals2/tvals, color=colarr[ch], lw=10, alpha=0.3, label=f'Ref - Theory') ax = axes[0] ax.set_ylim([0,ax.get_ylim()[1]]) axes[0].set_ylabel('Effective Noise (DN)') axes[1].set_ylabel('Slope Noise (DN/sec)') for ax in axes: ax.set_xlabel('Time (sec)') #ax.set_title(f'Effective Noise (SCA {self.scaid})') axes[0].legend(ncol=2) fig.suptitle(f"Noise of Slope Fits (SCA {self.scaid})", fontsize=16) fig.tight_layout() fig.subplots_adjust(top=0.9) if save: fname = f'{self.scaid}_eff_noise.pdf' save_name = os.path.join(self.paths_dict['figdir'], fname) _log.info(f"Saving to {save_name}") fig.savefig(save_name) if return_figax: return fig, axes
[docs] def plot_eff_noise_patterns(self, ideal_Poisson=False, save=False, ylim=None, return_figax=False): """Plot effective noise of slope fits for variety of read patterns""" det = self.det gain = det.gain nchan = det.nout patterns = list(det.multiaccum._pattern_settings.keys()) en_spat_list = self.eff_noise_dict['en_spat_list'] en_dn_list = [] for i in range(len(patterns)): # Average spatial and temporal values var_avg_ch = en_spat_list[i]**2 en_dn_list.append(np.sqrt(var_avg_ch[0:nchan].mean(axis=0))) tarr_all = self.eff_noise_dict['tarr_all_list'] ng_all_list = self.eff_noise_dict['ng_all_list'] p_excess = self.eff_noise_dict['p_excess'] # Average dark current (e-/sec) if self.dark_ramp_dict is None: idark_avg = det.dark_current else: idark = [] tarr = self.time_arr for ch in np.arange(nchan): y = self.dark_ramp_dict['ramp_avg_ch'][ch] cf = jl_poly_fit(tarr, y, deg=1) idark.append(cf[1]) idark = np.array(idark) * gain idark_avg = np.mean(idark) # Average read noise per frame (e-) cds_var = (en_dn_list[0][0] * det.time_group * gain)**2 - (idark_avg * det.time_group) read_noise = np.sqrt(cds_var / 2) fig, axes = plt.subplots(3,3, figsize=(14,9), sharey=True) axes = axes.flatten() log_prev = conf.logging_level setup_logging('WARN', verbose=False) for i, ax in enumerate(axes): tvals = tarr_all[i] yvals = (en_dn_list[i] * tvals) xvals = tvals ax.plot(xvals, yvals, marker='o', label='Measured') det_new = deepcopy(det) ma_new = det_new.multiaccum ma_new.read_mode = patterns[i] ng_all = ng_all_list[i] thr_e = det_new.pixel_noise(ng=ng_all, rn=read_noise, idark=idark_avg, ideal_Poisson=ideal_Poisson, p_excess=[0,0]) yvals = (thr_e * tvals) / gain ax.plot(xvals, yvals, color='C1', label='Theory') tvals = tarr_all[i] ng_all = ng_all_list[i] thr_e = det_new.pixel_noise(ng=ng_all, rn=read_noise, idark=idark_avg, ideal_Poisson=ideal_Poisson, p_excess=p_excess) yvals = (thr_e * tvals) / gain ax.plot(xvals, yvals, marker='.', color='C1', ls='--', label='Theory + Excess') for i, ax in enumerate(axes): if i==0: xr = [ax.get_xlim()[0],1200] ymax = 5*(int(ax.get_ylim()[1] / 5) + 1) yr = [0,ymax] if ylim is None else ylim ax.set_xlim(xr) ax.set_ylim(yr) ax.set_title(patterns[i]) if i>5: ax.set_xlabel('Time (sec)') if np.mod(i,3) == 0: ax.set_ylabel('Noise (DN)') # Legend on first plot axes[0].legend() fig.suptitle(f'Noise of Slope Fits (SCA {self.scaid})', fontsize=16) fig.tight_layout() fig.subplots_adjust(top=0.9, wspace=0.03) setup_logging(log_prev, verbose=False) if save: fname = f'{self.scaid}_eff_noise_patterns.pdf' save_name = os.path.join(self.paths_dict['figdir'], fname) _log.info(f"Saving to {save_name}") fig.savefig(save_name) if return_figax: return fig, axes
def plot_power_spectrum(self, save=False, cds=True, return_figax=False): scaid = self.scaid cds_dict = self.cds_act_dict keys = ['spat_det', 'temp_pink_corr', 'temp_pink_uncorr'] cds_vals = [np.sqrt(np.mean(cds_dict[k]**2, axis=0)) for k in keys] rd_noise_cds, c_pink_cds, u_pink_cds = cds_vals nchan = self.nchan freq = self.pow_spec_dict['freq'] ps_all = self.pow_spec_dict['ps_all'] fig, axes = plt.subplots(1,2, figsize=(14,5)) ax = axes[0] # Amplifier averages x = freq y = np.mean(ps_all, axis=0) label='Amplifier Averaged' ax.loglog(x[1:], y[1:], marker='o', ms=0.25, ls='none', color='grey', label=label, rasterized=True) # White Noise yf = x**(0) cds_var = np.mean(rd_noise_cds**2) yf1 = len(yf) * cds_var * yf / yf.sum() ax.plot(x[1:], yf1[1:], ls='--', lw=1, label='White Noise') # Pink Noise per Channel yf = x**(-1); yf[0]=0 cds_var = np.mean(u_pink_cds**2) / np.sqrt(2) yf2 = len(yf) * cds_var * yf / yf.sum() ax.plot(x[1:], yf2[1:], ls='--', lw=1, label='Uncorr Pink Noise') # Correlated Pink Noise yresid = y - yf2 - yf1 scales = fit_corr_powspec(x, yresid) yf = broken_pink_powspec(x, scales) cds_var = c_pink_cds**2 / np.sqrt(2) yf3 = len(yf) * cds_var * yf / yf.sum() ax.plot(x[1:], yf3[1:], ls='--', lw=1, label='Corr Pink Noise') # Total of the three components yf_sum = (yf1 + yf2 + yf3) ax.plot(x[1:], yf_sum[1:], ls='--', lw=2, label='Sum') ax.set_ylabel('CDS Power (DN$^2$)') ax = axes[1] x = freq for ch in range(nchan): y = ps_all[ch] ax.loglog(x[1:], y[1:], marker='o', ms=0.25, ls='none', label=f'Ch{ch}', rasterized=True) for ax in axes: ax.set_xlim([5e-2, 7e4]) xloc = np.array(ax.get_xticks()) xlim = ax.get_xlim() xind = (xloc>=xlim[0]) & (xloc<=xlim[1]) ax.set_xlabel('Frequency (Hz)') ax.set_xlim(xlim) ax.set_ylim([10,1e7]) ax.legend(numpoints=3, markerscale=10) ax2 = ax.twiny() ax2.set_xlim(1/np.array(xlim)) ax2.set_xscale('log') ax2.set_xlabel('Time (sec)') # new_tick_locations = xloc[xind] # ax2.set_xticks(new_tick_locations) # ax2.set_xticklabels(tick_function(new_tick_locations)) ax.minorticks_on() fig.suptitle(f'Noise Power Spectrum (SCA {scaid})', fontsize=16) fig.tight_layout() fig.subplots_adjust(top=0.85) if save: fname = f'{scaid}_power_spectra.pdf' save_name = os.path.join(self.paths_dict['figdir'], fname) _log.info(f"Saving to {save_name}") fig.savefig(save_name, dpi=150) if return_figax: return fig, axes
[docs]class nircam_cal(nircam_dark): """ NIRCam Calibration class Assumes that all cal files exist in the calibration directory in PYNRC_PATH. """
[docs] def __init__(self, scaid, same_scan_direction=False, reverse_scan_direction=False, DMS=False, verbose=True): self.DMS = DMS # Directory information self.scaid = scaid caldir = os.path.join(conf.PYNRC_PATH, 'calib') + '/' self._create_dir_structure(None, caldir) prev_log = conf.logging_level if verbose: setup_logging('INFO', verbose=False) else: setup_logging('WARN', verbose=False) # Set up detector information self.det = DetectorOps(detector=scaid) self.det.same_scan_direction = same_scan_direction self.det.reverse_scan_direction = reverse_scan_direction hdr = self._grab_single_header() # Detector size try: nx, ny, nz = (hdr['SUBSIZE1'], hdr['SUBSIZE2'], hdr['NGROUPS']) except: nx = hdr['NAXIS1'] ny = hdr['NAXIS2'] nz = hdr['NGROUP'] self.det.multiaccum.ngroup = nz self.det.ypix = ny self.det.xpix = nx # Create masks for ref pixels, active pixels, and channels self._create_pixel_masks() self._init_attributes() # Dark ramp/slope info # Calculate dark slope image self.get_dark_slope_image() # Calculate pixel slope averages self.get_pixel_slope_averages() # Calculate CDS Noise for various component # white noise, 1/f noise (correlated and independent), temporal and spatial self.get_cds_dict() # Effective Noise self.get_effective_noise() # Get kTC reset noise, IPC, and PPC values self.get_ktc_noise() # Get the power spectrum information # Saved to pow_spec_dict['freq', 'ps_all', 'ps_corr', 'ps_ucorr'] self.get_power_spectrum(include_oh=False, calc_cds=True, mn_func=np.median, per_pixel=False) # Calculate IPC/PPC kernels self.get_ipc(calc_ppc=True) # Deconvolve the super dark and super bias images self.deconvolve_supers() # Get column variations self.get_column_variations() # Create dictionary of reference pixel behavior self.get_ref_pixel_noise() self.get_nonlinear_coeffs() try: self.get_linear_coeffs() except: _log.info('Skipping linearity coefficients. Not needed for simulations...') self.get_super_flats() setup_logging(prev_log, verbose=False)
####################################### # Open and return FITS info #######################################
[docs]def get_fits_data(fits_file, return_header=False, bias=None, reffix=False, DMS=False, int_ind=0, grp_ind=None, **kwargs): """ Read in FITS file data Parameters ========== fname : str FITS file (including path) to open. return_header : bool Return header as well as data? bias : ndarray If specified, will subtract bias image from ramp. reffix : bool Perform reference correction? DMS : bool Is the FITS file DMS format? int_ind : int If DMS format, select integration index to extract. DMS FITS files usually have all integrations within a given exposure in a single FITS extension, which can be quite large. grp_ind : 2-element array Option to index specific groups from the data. For instance `grp_ind=[0:10]` will select only the first 10 groups from the FITS cube. Keyword Args ============ altcol : bool Calculate separate reference values for even/odd columns. (default: True) supermean : bool Add back the overall mean of the reference pixels. (default: False) top_ref : bool Include top reference rows when correcting channel offsets. (default: True) bot_ref : bool Include bottom reference rows when correcting channel offsets. (default: True) ntop : int Specify the number of top reference rows. (default: 4) nbot : int Specify the number of bottom reference rows. (default: 4) mean_func : func Function used to calculate averages. (default: `robust.mean`) left_ref : bool Include left reference cols when correcting 1/f noise. (default: True) right_ref : bool Include right reference cols when correcting 1/f noise. (default: True) nleft : int Specify the number of left reference columns. (default: 4) nright : int Specify the number of right reference columns. (default: 4) perint : bool Smooth side reference pixel per integration, otherwise do frame-by-frame. (default: False) avg_type :str Type of side column averaging to perform to determine ref pixel drift. Allowed values are 'pixel', 'frame', or 'int' (default: 'frame'): * 'int' : Subtract the avg value of all side ref pixels in ramp. * 'frame' : For each frame, get avg of side ref pixels and subtract framewise. * 'pixel' : For each ref pixel, subtract its avg value from all frames. savgol : bool Use Savitsky-Golay filter method rather than FFT. (default: True) winsize : int Size of the window filter. (default: 31) order : int Order of the polynomial used to fit the samples. (default: 3) """ # Want to automatically determine if FITS files have DMS structure hdul = fits.open(fits_file) hdr = hdul[0].header if DMS: if int_ind > hdr['NINTS']-1: hdul.close() nint = hdr['NINTS'] raise ValueError(f'int_num must be less than {nint}.') data = hdul[1].data[int_ind] else: data = hdul[0].data # Select group indices if grp_ind is not None: data = data[grp_ind[0]:grp_ind[1]] # Convert to float data = data.astype(np.float) hdul.close() if bias is not None: data -= bias if reffix: data = reffix_hxrg(data, **kwargs) if return_header: return data, hdr else: return data
[docs]def ramp_resample(data, det_new, return_zero_frame=False): """ Resample a RAPID dataset into new detector format""" nz, ny, nx = data.shape # x1, y1 = (det_new.x0, det_new.y0) xpix, ypix = (det_new.xpix, det_new.ypix) # x2 = x1 + xpix # y2 = y1 + ypix # Do we need to crop out subarray? if ny==ypix: y1, y2 = (0, ny) else: # Will crop a subarray out of data y1 = det_new.y0 y2 = int(y1 + ypix) if nx==xpix: x1, x2 = (0, nx) else: # Will crop a subarray out of data x1 = det_new.x0 x2 = int(x1 + xpix) ma = det_new.multiaccum nd1 = ma.nd1 nd2 = ma.nd2 nf = ma.nf ngroup = ma.ngroup # Number of total frames up the ramp (including drops) # Keep last nd2 for reshaping nread_tot = nd1 + ngroup*nf + (ngroup-1)*nd2 assert nread_tot <= nz, f"Output ramp has more total read frames ({nread_tot}) than input ({nz})." # Crop dataset data_out = data[0:nread_tot, y1:y2, x1:x2] # Save the first frame (so-called ZERO frame) for the zero frame extension if return_zero_frame: zeroData = deepcopy(data_out[0]) # Remove drops and average grouped data if nf>1 or nd2>0: # Trailing drop frames were already excluded, so need to pull off last group of avg'ed frames data_end = data_out[-nf:,:,:].mean(axis=0) if nf>1 else data[-1:,:,:] data_end = data_end.reshape([1,ypix,xpix]) # Only care about first (n-1) groups for now # Last group is handled separately data_out = data_out[:-nf,:,:] # Reshape for easy group manipulation data_out = data_out.reshape([-1,nf+nd2,ypix,xpix]) # Trim off the dropped frames (nd2) if nd2>0: data_out = data_out[:,:nf,:,:] # Average the frames within groups # In reality, the 16-bit data is bit-shifted data_out = data_out.reshape([-1,ypix,xpix]) if nf==1 else data_out.mean(axis=1) # Add back the last group (already averaged) data_out = np.append(data_out, data_end, axis=0) if return_zero_frame: return data_out, zeroData else: return data_out
####################################### # Initial super bias function ####################################### def _wrap_super_bias_for_mp(arg): args, kwargs = arg fname = args[0] data, hdr = get_fits_data(fname, return_header=True, reffix=True, **kwargs) # Get header information and create a NIRCam detector timing instance det = create_detops(hdr, DMS=kwargs['DMS']) # Time array tarr = det.times_group_avg deg = kwargs['deg'] cf = jl_poly_fit(tarr, data, deg=deg) return cf[0]
[docs]def gen_super_bias(allfiles, DMS=False, mn_func=np.median, std_func=robust.std, return_std=False, deg=1, nsplit=3, **kwargs): """ Generate a Super Bias Image Read in a number of dark ramps, perform a polynomial fit to the data, and return the average of all bias offsets. This a very simple procedure that is useful for estimating an initial bias image. Will not work well for weird pixels. """ # Set logging to WARNING to suppress messages log_prev = conf.logging_level setup_logging('WARN', verbose=False) kw = kwargs.copy() kw['deg'] = deg kw['DMS'] = DMS if DMS: worker_args = [] for f in allfiles: hdr = fits.getheader(f) # Account for multiple ints in each file for i in range(hdr['NINTS']): kw['int_ind'] = i worker_args.append(([f],kw)) else: worker_args = [([f],kw) for f in allfiles] nfiles = len(allfiles) if nsplit>1: bias_all = [] # pool = mp.Pool(nsplit) try: with mp.Pool(nsplit) as pool: for res in tqdm(pool.imap_unordered(_wrap_super_bias_for_mp, worker_args), total=nfiles): bias_all.append(res) pool.close() # TODO: not sure if this is necessary? # bias_all = pool.map(_wrap_super_bias_for_mp, worker_args) if bias_all[0] is None: raise RuntimeError('Returned None values. Issue with multiprocess??') except Exception as e: _log.error('Caught an exception during multiprocess.') _log.error('Closing multiprocess pool.') pool.terminate() pool.close() raise e else: # Set back to previous logging level setup_logging(log_prev, verbose=False) _log.info('Closing multiprocess pool.') # pool.close() bias_all = np.array(bias_all) else: bias_all = np.array([_wrap_super_bias_for_mp(wa) for wa in tqdm(worker_args)]) # Set back to previous logging level setup_logging(log_prev, verbose=False) super_bias = mn_func(bias_all, axis=0) if return_std: _super_bias = std_func(bias_all,axis=0) return super_bias, _super_bias else: return super_bias
[docs]def chisqr_red(yvals, yfit=None, err=None, dof=None, err_func=np.std): """ Calculate reduced chi square metric If yfit is None, then yvals assumed to be residuals. In this case, `err` should be specified. Parameters ========== yvals : ndarray Sampled values. yfit : ndarray Model fit corresponding to `yvals`. dof : int Number of degrees of freedom (nvals - nparams - 1). err : ndarray or float Uncertainties associated with `yvals`. If not specified, then use yvals point-to-point differences to estimate a single value for the uncertainty. err_func : func Error function uses to estimate `err`. """ if (yfit is None) and (err is None): print("Both yfit and err cannot be set to None.") return diff = yvals if yfit is None else yvals - yfit sh_orig = diff.shape ndim = len(sh_orig) if ndim==1: if err is None: err = err_func(yvals[1:] - yvals[0:-1]) / np.sqrt(2) dev = diff / err chi_tot = np.sum(dev**2) dof = len(chi_tot) if dof is None else dof chi_red = chi_tot / dof return chi_red # Convert to 2D array if ndim==3: sh_new = [sh_orig[0], -1] diff = diff.reshape(sh_new) yvals = yvals.reshape(sh_new) # Calculate errors for each element if err is None: err_arr = np.array([yvals[i+1] - yvals[i] for i in range(sh_orig[0]-1)]) err = err_func(err_arr, axis=0) / np.sqrt(2) del err_arr else: err = err.reshape(diff.shape) # Get reduced chi sqr for each element dof = sh_orig[0] if dof is None else dof chi_red = np.sum((diff / err)**2, axis=0) / dof if ndim==3: chi_red = chi_red.reshape(sh_orig[-2:]) return chi_red
####################################### # Super dark with more advanced bias #######################################
[docs]def ramp_derivative(y, dx=None, fit0=True, deg=2, ifit=[0,10]): """ Get the frame-by-frame derivative of a ramp. Parameters ========== y : ndarray Array of values (1D, 2D or 3D) dx : float If dx is supplied, divide by value to get dy/dx. fit0 : bool In order to find slope of element 0, we have the option to fit some number of values to extrapolate this value. If not set, then dy0 = 2*dy[0] - dy[1]. ifit : 2-element array Indices to fit in order to extrapolate dy0. Don't necessarily want to fit the entire dataset. deg : int Polynomial degree to use for extrapolation fit. """ sh_orig = y.shape ndim = len(sh_orig) if ndim==1: dy = y[1:] - y[:-1] if fit0: xtemp = np.arange(len(dy))+1 lxmap = [np.min(xtemp), np.max(xtemp)] i1, i2 = ifit xfit = xtemp[i1:i2+1] dyfit = dy[i1:i2+1] # First try to fit log/log xfit_log = np.log10(xfit+1) dyfit_log = np.log10(dyfit) # if there are no NaNs, then fit to log scale if not np.isnan(dyfit_log.sum()): lxmap_log = np.log10(lxmap) cf = jl_poly_fit(xfit_log, dyfit_log, deg=deg, use_legendre=True, lxmap=lxmap_log) dy0_log = jl_poly(0, cf, use_legendre=True, lxmap=lxmap_log) dy0 = 10**dy0_log else: cf = jl_poly_fit(xtemp[i1:i2+1], dy[i1:i2+1], deg=deg, use_legendre=True, lxmap=lxmap) dy0 = jl_poly(0, cf, use_legendre=True, lxmap=lxmap) else: dy0 = 2*dy[0] - dy[1] dy = np.insert(dy, 0, dy0) if dx is not None: dy /= dx return dy # If fitting multiple pixels simultaneously # Convert to 2D array elif ndim==3: sh_new = [sh_orig[0], -1] y = y.reshape(sh_new) # Get differential dy = y[1:] - y[:-1] # Fit to slope to determine derivative of first element if fit0: xtemp = np.arange(len(dy))+1 lxmap = [np.min(xtemp), np.max(xtemp)] i1, i2 = ifit # Value on which to perform fit xfit = xtemp[i1:i2+1] dyfit = dy[i1:i2+1] # First try to fit in log/log space xfit_log = np.log10(xfit+1) dyfit_log = np.log10(dyfit) # Variable to hold first element of differential dy0 = np.zeros([dy.shape[-1]]) # Filter pixels that have valid values in logspace indnan = np.isnan(dyfit_log.sum(axis=0)) # Fit invalid values in linear space cf = jl_poly_fit(xfit, dyfit[:,indnan], deg=deg, use_legendre=True, lxmap=lxmap) dy0[indnan] = jl_poly(0, cf, use_legendre=True, lxmap=lxmap) # Fit non-NaN'ed data in logspace if len(indnan[~indnan])>0: lxmap_log = np.log10(lxmap) cf = jl_poly_fit(xfit_log, dyfit_log[:,~indnan], deg=deg, use_legendre=True, lxmap=lxmap_log) dy0_log = jl_poly(0, cf, use_legendre=True, lxmap=lxmap_log) dy0[~indnan] = 10**dy0_log else: dy0 = 2*dy[0] - dy[1] dy = np.insert(dy, 0, dy0, axis=0) if ndim==3: dy = dy.reshape(sh_orig) if dx is not None: dy /= dx return dy
[docs]def gen_super_dark(allfiles, super_bias=None, DMS=False, **kwargs): """ Average together all dark ramps to create a super dark ramp. First subtracts a bias frame. Tries to decipher t=0 intercept for odd behaving pixels. """ # Set logging to WARNING to suppress messages log_prev = conf.logging_level setup_logging('WARN', verbose=False) if super_bias is None: super_bias = 0 # Header info from first file hdr = fits.getheader(allfiles[0]) det = create_detops(hdr, DMS=DMS) # nchan = det.nout nx = det.xpix ny = det.ypix nz = det.multiaccum.ngroup # chsize = det.chsize # tarr = np.arange(1, nz+1) * det.time_group tarr = det.times_group_avg # Active and reference pixel masks mask_ref = det.mask_ref mask_act = ~mask_ref # TODO: Better algorithms to find bad pixels # See Bad_pixel_changes.pdf from Karl masks_dict = { 'mask_ref': [], 'mask_poly': [], 'mask_deviant': [], 'mask_negative': [], 'mask_others': [] } bias_off_all = [] # Create a super dark ramp ramp_sum = np.zeros([nz,ny,nx]) ramp_sum2 = np.zeros([nz,ny,nx]) nsum = np.zeros([ny,nx]) nint_tot = 0 nfiles = len(allfiles) iter_files = tqdm(allfiles, desc='Files', leave=False) if nfiles>1 else allfiles for fname in iter_files: # If DMS, then might be multiple integrations per FITS file nint = fits.getheader(fname)['NINTS'] if DMS else 1 nint_tot += nint # Accounts for multiple ints FITS iter_range = trange(nint, desc='Ramps', leave=False) if nint>1 else range(nint) for i in iter_range: data = get_fits_data(fname, return_header=False, bias=super_bias, reffix=True, DMS=DMS, int_ind=i, **kwargs) # Fit everything with linear first deg = 1 cf_all = np.zeros([3,ny,nx]) cf_all[:2] = jl_poly_fit(tarr[1:], data[1:,:,:], deg=deg) yfit = jl_poly(tarr, cf_all) dof = data.shape[0] - deg # Get reduced chi-sqr metric chired_poly = chisqr_red(data, yfit=yfit, dof=dof) # Fit polynomial to those not well fit by linear func chi_cutoff = 2 ibad = ((chired_poly > chi_cutoff) | np.isnan(chired_poly)) & mask_act deg = 2 cf_all[:,ibad] = jl_poly_fit(tarr[1:], data[1:,ibad], deg=deg) yfit[:,ibad] = jl_poly(tarr, cf_all[:,ibad]) dof = data.shape[0] - deg # Get reduced chi-sqr metric for poorly fit data chired_poly[ibad] = chisqr_red(data[:,ibad], yfit=yfit[:,ibad], dof=dof) del yfit # Find pixels poorly fit by any polynomial ibad = ((chired_poly > chi_cutoff) | np.isnan(chired_poly)) & mask_act bias_off = cf_all[0] bias_off[ibad] = 0 # Those active pixels well fit by a polynomial mask_poly = (chired_poly <= chi_cutoff) & mask_act # Pixels with large deviations (5-sigma outliers) med_diff = np.median(cf_all[1])*tarr.max() std_diff = robust.std(cf_all[1])*tarr.max() mask_deviant = (data[-1] - data[1]) > (med_diff + std_diff*5) # Pixels with negative slopes mask_negative = (data[-1] - data[1]) < -(med_diff + std_diff*5) # Others mask_others = (~mask_poly) & (~mask_ref) & (~mask_deviant) & (~mask_negative) # Save to masks lists masks_dict['mask_poly'].append(mask_poly) # masks_dict['mask_ref'].append(mask_ref) masks_dict['mask_deviant'].append(mask_deviant) masks_dict['mask_negative'].append(mask_negative) masks_dict['mask_others'].append(mask_others) # Fit slopes of weird pixels to get their y=0 (bias) offset # ifit_others = mask_deviant | mask_others | mask_negative ifit_others = (~mask_poly) & mask_act yvals_fit = data[0:15,ifit_others] dy = ramp_derivative(yvals_fit, fit0=True, deg=1, ifit=[0,10]) yfit = np.cumsum(dy, axis=0) bias_off[ifit_others] = (yvals_fit[0] - yfit[0]) bias_off_all.append(bias_off) # Subtact bias data -= bias_off igood = mask_poly | mask_ref nsum[igood] += 1 for j, im in enumerate(data): ramp_sum[j,igood] += im[igood] ramp_sum2[j] += im del data, yfit # Take averages igood = (nsum >= 0.75*nint_tot) for im in ramp_sum: im[igood] /= nsum[igood] ramp_sum2 /= nint_tot # Replace empty ramp_sum pixels with ramp_sum2 # izero = np.sum(ramp_sum, axis=0) == 0 ramp_sum[:,~igood] = ramp_sum2[:,~igood] ramp_avg = ramp_sum # del ramp_sum2 # Get average of bias offsets bias_off_all = np.array(bias_off_all) bias_off_avg = robust.mean(bias_off_all, axis=0) # Convert masks to arrays for k in masks_dict.keys(): masks_dict[k] = np.array(masks_dict[k]) # Pixels with negative values mask_neg = (ramp_avg[0] < 0) & mask_act bias_off = np.zeros_like(bias_off_avg) yvals_fit = ramp_avg[:,mask_neg] dy = ramp_derivative(yvals_fit[0:15], fit0=True, deg=1, ifit=[0,10]) yfit = np.cumsum(dy, axis=0) bias_off[mask_neg] = (yvals_fit[0] - yfit[0]) # Pixels with largish positive values (indicative of RC pixels) mask_large = (ramp_avg[0] > 1000) | (ramp_avg[-1] > 50000) yvals_fit = ramp_avg[:,mask_large] dy = ramp_derivative(yvals_fit[0:15], fit0=True, deg=2, ifit=[0,10]) yfit = np.cumsum(dy, axis=0) bias_off[mask_large] = (yvals_fit[0] - yfit[0]) # Remove from ramp_avg and add into bias_off_avg ramp_avg -= bias_off bias_off_avg += bias_off # Pixels continuing to have largish positive values (indicative of RC pixels) bias_off = np.zeros_like(bias_off_avg) mask_large = ramp_avg[0] > 10000 yvals_fit = ramp_avg[:,mask_large] dy = ramp_derivative(yvals_fit[0:15], fit0=False) yfit = np.cumsum(dy, axis=0) bias_off[mask_large] += (yvals_fit[0] - yfit[0]) # Remove from ramp_avg and add into bias_off_avg ramp_avg -= bias_off bias_off_avg += bias_off setup_logging(log_prev, verbose=False) return ramp_avg, bias_off_avg, masks_dict
[docs]def gen_super_ramp(allfiles, super_bias=None, DMS=False, grp_max=None, sat_vals=None, **kwargs): """ Average together all linearity ramps to create a super ramp. Subtracts a bias frame to determine more appropriate pixel by pixel average. Tries to decipher t=0 intercept for odd behaving pixels. Also returns bias offsets. """ # Set logging to WARNING to suppress messages log_prev = conf.logging_level setup_logging('WARN', verbose=False) if super_bias is None: super_bias = 0 # Header info from first file hdr = fits.getheader(allfiles[0]) det = create_detops(hdr, DMS=DMS) # nchan = det.nout nx = det.xpix ny = det.ypix nz = det.multiaccum.ngroup # chsize = det.chsize tarr = det.times_group_avg # Active and reference pixel masks mask_ref = det.mask_ref mask_act = ~mask_ref # TODO: Algorithms to find bad pixels # See Bad_pixel_changes.pdf from Karl if grp_max is None: grp_max = find_group_sat(allfiles[-1], DMS=DMS, bias=super_bias, sat_vals=None, sat_calc=0.998) grp_max = grp_max + 10 grp_ind = [0,nz] if grp_max>nz else [0,grp_max] # Update number of read frames nz = grp_ind[1] det.multiaccum.ngroup = nz tarr = det.times_group_avg # Create a super dark ramp ramp_sum = np.zeros([nz,ny,nx]) bias_off_all = [] nint_tot = np.zeros([nz]) nfiles = len(allfiles) iter_files = tqdm(allfiles, desc='Super Ramp', leave=False) if nfiles>1 else allfiles for fname in iter_files: # If DMS, then might be multiple integrations per FITS file nint = fits.getheader(fname)['NINTS'] if DMS else 1 # nint_tot += nint # Accounts for multiple ints FITS iter_range = trange(nint, desc='Ramps', leave=False) if nint>1 else range(nint) for i in iter_range: data = get_fits_data(fname, DMS=DMS, return_header=False, bias=super_bias, reffix=True, int_ind=i, grp_ind=grp_ind, **kwargs) # Saturation levels svals = find_sat(data, ref_info=det.ref_info) if sat_vals is None else sat_vals # Fit polynomial data at <50% well to find bias offset deg = 2 cf_all = cube_fit(tarr, data, sat_vals=svals, sat_frac=0.50, deg=deg, ref_info=det.ref_info) bias_off = cf_all[0] bias_off_all.append(bias_off) data -= bias_off for j, im in enumerate(data): ramp_sum[j] += im # Increment total frames here # Catches data where ramp is truncated (incomplete data) nint_tot[j] += 1 del data # Take averages ramp_sum /= nint_tot.reshape([-1,1,1]) ramp_avg = ramp_sum # Get average of bias offsets bias_off_all = np.array(bias_off_all) bias_off_avg = robust.mean(bias_off_all, axis=0) # Pixels with negative values mask_neg = (ramp_avg[10] < 0) & mask_act bias_off = np.zeros_like(bias_off_avg) yvals_fit = ramp_avg[:,mask_neg] dy = ramp_derivative(yvals_fit[0:15], fit0=True, deg=1, ifit=[0,10]) yfit = np.cumsum(dy, axis=0) bias_off[mask_neg] = (yvals_fit[0] - yfit[0]) # Remove from ramp_avg and add into bias_off_avg ramp_avg -= bias_off bias_off_avg += bias_off setup_logging(log_prev, verbose=False) return ramp_avg, bias_off_avg
[docs]def plot_dark_histogram(im, ax, binsize=0.0001, return_ax=False, label='Active Pixels', plot_fit=True, plot_cumsum=True, color='C1', xlim=None, xlim_std=7): from astropy.modeling import models, fitting bins = np.arange(im.min(), im.max() + binsize, binsize) ig, vg, cv = hist_indices(im, bins=bins, return_more=True) # Number of pixels in each bin nvals = np.array([len(i) for i in ig]) # Fit a Gaussian to get peak of dark current ind_nvals_max = np.where(nvals==nvals.max())[0][0] mn_init = cv[ind_nvals_max] std_init = robust.std(im) g_init = models.Gaussian1D(amplitude=nvals.max(), mean=mn_init, stddev=std_init) fit_g = fitting.LevMarLSQFitter() nvals_norm = nvals / nvals.max() ind_fit = (cv>mn_init-1*std_init) & (cv<mn_init+1*std_init) g_res = fit_g(g_init, cv[ind_fit], nvals_norm[ind_fit]) bg_max_dn = g_res.mean.value bg_max_npix = g_res.amplitude.value ax.plot(cv, nvals_norm, label=label, lw=2) if plot_fit: ax.plot(cv, g_res(cv), label='Gaussian Fit', lw=1.5, color=color) label = 'Peak = {:.4f} DN/sec'.format(bg_max_dn) ax.plot(2*[bg_max_dn], [0,bg_max_npix], label=label, ls='--', lw=1, color=color) if plot_cumsum: ax.plot(cv, np.cumsum(nvals) / im.size, color='C3', lw=1, label='Cumulative Sum') ax.set_ylabel('Relative Number of Pixels') ax.set_title('All Active Pixels') if xlim is None: xlim = np.array([-1,1]) * xlim_std * g_res.stddev.value + bg_max_dn xlim[0] = np.min([0,xlim[0]]) ax.set_xlabel('Dark Rate (DN/sec)') ax.set_xlim(xlim)#[0,2*bg_max_dn]) ax.legend(loc='upper left') if return_ax: return ax
####################################### # Column variations #######################################
[docs]def gen_col_variations(allfiles, super_bias=None, super_dark_ramp=None, DMS=False, **kwargs): """ Create a series of column offset models Returns a series of ramp variations to add to entire columns as well as the probability a given column will be affected. Likely due to FETS in the ASIC preamp or ADC or detector column buffer jumping around and causing entire columns within a ramp to transition between two states. """ # Set logging to WARNING to suppress messages log_prev = conf.logging_level setup_logging('WARN', verbose=False) hdr = fits.getheader(allfiles[0]) det = create_detops(hdr, DMS=DMS) nchan = det.nout nx = det.xpix # ny = det.ypix # nz = det.multiaccum.ngroup chsize = det.chsize if super_dark_ramp is None: super_dark_ramp = 0 if super_bias is None: super_bias = 0 ramp_column_varations = [] nbad = [] for f in tqdm(allfiles, desc='Files'): # If DMS, then might be multiple integrations per FITS file nint = fits.getheader(f)['NINTS'] if DMS else 1 iter_range = trange(nint, desc='Ramps', leave=False) if nint>1 else range(nint) for i in iter_range: # Subtract bias, but don't yet perform reffix data = get_fits_data(f, bias=super_bias, DMS=DMS, int_ind=i) # Subtract super_dark_ramp to get residuals data -= super_dark_ramp data = reffix_hxrg(data, **kwargs) # Take the median of each column data_ymed = np.median(data, axis=1) # Set each ramp residual to 0 offset data_ymed -= np.median(data_ymed, axis=0) # Get rid of residual channel offsets for ch in range(nchan): x1 = ch*chsize x2 = x1 + chsize data_ymed[:,x1:x2] -= np.median(data_ymed[:,x1:x2], axis=1).reshape([-1,1]) del data # Get derivatives # dymed = ramp_derivative(data_ymed, fit0=False) # Determine which columns have large excursions ymed_avg = np.mean(data_ymed, axis=0) ymed_std = np.std(data_ymed, axis=0) # dymed_avg = np.mean(dymed, axis=0) # dymed_std = np.std(dymed, axis=0) # print(np.median(ymed_avg), np.median(ymed_std)) # print(robust.std(ymed_avg), robust.std(ymed_std)) # Mask of outliers mask_outliers1 = np.abs(ymed_avg) > np.median(ymed_avg) + 1*robust.std(ymed_avg) mask_outliers2 = ymed_std > np.median(ymed_std) + 1*robust.std(ymed_std) mask_outliers = mask_outliers1 | mask_outliers2 mask_outliers[:4] = False mask_outliers[-4:] = False data_ymed_outliers = data_ymed[:,mask_outliers] # dymed_outliers = dymed[:,mask_outliers] # data_ymed_good = data_ymed[:,~mask_outliers] # dymed_good = dymed[:,~mask_outliers] ramp_column_varations.append(data_ymed_outliers) nbad.append(data_ymed_outliers.shape[1]) ramp_column_varations = np.hstack(ramp_column_varations) nbad = np.array(nbad) prob_bad = np.mean(nbad/nx) setup_logging(log_prev, verbose=False) return ramp_column_varations, prob_bad
####################################### # Reference pixel information ####################################### # Main reference bias offsets # Amplifier bias offsets
[docs]def get_bias_offsets(data, nchan=4, ref_bot=True, ref_top=True, npix_ref=4): """ Get Reference Bias Characteristics Given some ramp data, determine the average master bias offset as well as the relative individual amplifier offsets. Also return the frame-to-frame variations caused by the preamp resets. """ if ref_bot==False and ref_top==False: print('Need top and/or bottom refernece to be True') return nz, ny, nx = data.shape chsize = int(nx/nchan) # Mask of top and/and bottom reference pixels mask_ref = np.zeros([ny,nx]).astype('bool') mask_ref[0:npix_ref,:] = ref_bot mask_ref[-npix_ref:,:] = ref_top # Reference offsets for each frame bias_off_frame = np.median(data[:,mask_ref], axis=1) bias_mn = np.mean(bias_off_frame) bias_std_f2f = robust.std(bias_off_frame) # Remove average bias offsets from each frame for i, im in enumerate(data): im -= bias_off_frame[i] # Determine amplifier offsets amp_mn_all = [] amp_std_f2f_all = [] for ch in range(nchan): mask_ch = np.zeros([ny,nx]).astype('bool') mask_ch[:,ch*chsize:(ch+1)*chsize] = True mask_ch_pix = mask_ref & mask_ch # Reference pixel offsets for this amplifier data_ch = data[:,mask_ch_pix] amp_off_frame = np.median(data_ch, axis=1) amp_mn = np.mean(amp_off_frame) amp_std_f2f = robust.std(amp_off_frame) amp_mn_all.append(amp_mn) amp_std_f2f_all.append(amp_std_f2f) amp_mn_all = np.array(amp_mn_all) amp_std_f2f_all = np.array(amp_std_f2f_all) return bias_mn, bias_std_f2f, amp_mn_all, amp_std_f2f_all
[docs]def get_oddeven_offsets(data, nchan=4, ref_bot=True, ref_top=True, bias_off=None, amp_off=None): """ Even/Odd Column Offsets Return the per-amplifier offsets of the even and odd columns relative after subtraction of the matster and amplifier bias offsets. """ if bias_off is None: bias_off = 0 if amp_off is None: amp_off = np.zeros(nchan) nz, ny, nx = data.shape chsize = int(nx / nchan) mask_ref_even = np.zeros([ny,nx]).astype('bool') mask_ref_even[0:4,0::2] = ref_bot mask_ref_even[-4:,0::2] = ref_top mask_ref_odd = np.zeros([ny,nx]).astype('bool') mask_ref_odd[0:4,1::2] = ref_bot mask_ref_odd[-4:,1::2] = ref_top ch_odd_vals_ref = [] ch_even_vals_ref = [] for ch in range(nchan): # Reference pixels mask_ch = np.zeros([ny,nx]).astype('bool') mask_ch[:,ch*chsize:(ch+1)*chsize] = True mask_even_ch = mask_ch & mask_ref_even mask_odd_ch = mask_ch & mask_ref_odd data_ref_even = data[:,mask_even_ch] data_ref_odd = data[:,mask_odd_ch] data_ref_even_offset = np.mean(data_ref_even) - bias_off - amp_off[ch] data_ref_odd_offset = np.mean(data_ref_odd) - bias_off - amp_off[ch] ch_odd_vals_ref.append(data_ref_odd_offset) ch_even_vals_ref.append(data_ref_even_offset) ch_odd_vals_ref = np.array(ch_odd_vals_ref) ch_even_vals_ref = np.array(ch_even_vals_ref) return ch_even_vals_ref, ch_odd_vals_ref
[docs]def get_ref_instability(data, nchan=4, ref_bot=True, ref_top=True, mn_func=np.median): """ Reference Pixel Instability Determine the instability of the average reference pixel values relative to the active pixels on a frame-to-frame basis. The procedure is to compute a series of CDS frames, then look at the peak distributions of the active pixels relative to the reference pixels. """ cds = data[1:] - data[:-1] nz, ny, nx = data.shape chsize = int(nx / nchan) # Mask of active pixels mask_act = np.zeros([ny,nx]).astype('bool') mask_act[4:-4,4:-4] = True # Mask of top and bottom reference pixels mask_ref = np.zeros([ny,nx]).astype('bool') mask_ref[0:4,:] = ref_bot mask_ref[-4:,:] = ref_top ref_inst = [] for ch in range(nchan): mask_ch = np.zeros([ny,nx]).astype('bool') mask_ch[:,ch*chsize:(ch+1)*chsize] = True cds_ref = cds[:,mask_ref & mask_ch] cds_act = cds[:,mask_act & mask_ch] cds_ref_mn = mn_func(cds_ref, axis=1) cds_act_mn = mn_func(cds_act, axis=1) # Relative to Reference cds_act_mn -= cds_ref_mn ref_inst.append(np.std(cds_act_mn) / np.sqrt(2)) ref_inst = np.array(ref_inst) return ref_inst
[docs]def gen_ref_dict(allfiles, super_bias, super_dark_ramp=None, DMS=False, **kwargs): """ Generate Reference Pixel Behavior Dictionary """ if super_dark_ramp is None: super_dark_ramp = 0 # Header info from first file hdr = fits.getheader(allfiles[0]) det = create_detops(hdr, DMS=DMS) nchan = det.nout bias_mn_ref_all = [] # Main bias average offset bias_std_f2f_ref_all = [] # Main bias standard deviation per int amp_mn_ref_all = [] # Amplifier ref offset per integration amp_std_f2f_ref_all = [] # Ampl Ref frame-to-frame variations # Even/Odd Column Offsets col_even_offset_ref = [] col_odd_offset_ref = [] # Ref Instability frame-to-frame amp_std_ref_act_all = [] for fname in tqdm(allfiles, desc='Files'): # If DMS, then might be multiple integrations per FITS file nint = fits.getheader(fname)['NINTS'] if DMS else 1 iter_range = trange(nint, desc='Ramps', leave=False) if nint>1 else range(nint) for i in iter_range: # Relative to super bias and super dark ramp data = get_fits_data(fname, bias=super_bias, DMS=DMS, int_ind=i) data -= super_dark_ramp # Get master and amplifer offsets res = get_bias_offsets(data, nchan=nchan) bias_mn_ref_all.append(res[0]) bias_std_f2f_ref_all.append(res[1]) amp_mn_ref_all.append(res[2]) amp_std_f2f_ref_all.append(res[3]) # bias_off was subtracted in-place from data within get_bias_offsets() res_col = get_oddeven_offsets(data, nchan=nchan, bias_off=0, amp_off=res[2]) col_even_offset_ref.append(res_col[0]) col_odd_offset_ref.append(res_col[1]) # Reference pixel instabilities data = reffix_hxrg(data, **kwargs) ref_inst = get_ref_instability(data, nchan=nchan) amp_std_ref_act_all.append(ref_inst) del data bias_mn_ref_all = np.array(bias_mn_ref_all) bias_std_f2f_ref_all = np.array(bias_std_f2f_ref_all) amp_mn_ref_all = np.array(amp_mn_ref_all) amp_std_f2f_ref_all = np.array(amp_std_f2f_ref_all) col_even_offset_ref = np.array(col_even_offset_ref) col_odd_offset_ref = np.array(col_odd_offset_ref) amp_std_ref_act_all = np.array(amp_std_ref_act_all) ref_dict = {} # Master bias offsets ref_dict['master_bias_mean'] = bias_mn_ref_all.mean() ref_dict['master_bias_std'] = robust.medabsdev(bias_mn_ref_all) ref_dict['master_bias_f2f'] = np.sqrt(np.mean(bias_std_f2f_ref_all**2)) # Amplifier Offsets ref_dict['amp_offset_mean'] = amp_mn_ref_all.mean(axis=0) # There can be correlations between offsets that depend on temperature # Let's remove those to get the true standard deviation cf = jl_poly_fit(bias_mn_ref_all, amp_mn_ref_all) amp_sub = amp_mn_ref_all - jl_poly(bias_mn_ref_all, cf) ref_dict['amp_offset_std'] = robust.std(amp_sub, axis=0) ref_dict['amp_offset_f2f'] = np.sqrt(np.mean(amp_std_f2f_ref_all**2, axis=0)) # Correlation between master_bias_mean and amp_offset_mean ref_dict['master_amp_cf'] = cf # Even/Odd Column offsets ref_dict['amp_even_col_offset'] = (np.mean(col_even_offset_ref, axis=0)) ref_dict['amp_odd_col_offset'] = (np.mean(col_odd_offset_ref, axis=0)) # Reference instability relative active pixels ref_dict['amp_ref_inst_f2f'] = np.sqrt(np.mean(amp_std_ref_act_all**2, axis=0)) _log.info("Reference Pixels") _log.info('') _log.info("Master Bias Mean") _log.info(ref_dict['master_bias_mean']) _log.info("Master Bias StDev") _log.info(ref_dict['master_bias_std']) _log.info("Master Bias Frame-to-Frame StDev") _log.info(ref_dict['master_bias_f2f']) _log.info('') _log.info("Amp Offset Mean") _log.info(ref_dict['amp_offset_mean']) _log.info("Amp Offset StDev") _log.info(ref_dict['amp_offset_std']) _log.info("Amp Offset Frame-to-Frame StDev") _log.info(ref_dict['amp_offset_f2f']) _log.info("") _log.info("Even Columns Offset") _log.info(ref_dict['amp_even_col_offset']) _log.info("Odd Columns Offset") _log.info(ref_dict['amp_odd_col_offset']) _log.info("") _log.info("Reference Instability") _log.info(ref_dict['amp_ref_inst_f2f']) return ref_dict
####################################### # Detector Noise #######################################
[docs]def calc_ktc(bias_sigma_arr, binsize=0.25, return_std=False): """ Calculate kTC (Reset) Noise Use the uncertainty image from super bias to calculate the kTC noise. This function generates a histogram of the pixel uncertainties and takes the peak of the distribution as the pixel reset noise. Parameters ---------- bias_sigma_arr : ndarray Image of the pixel uncertainties. binsize : float Size of the histogram bins. return_std : bool Also return the standard deviation of the distribution? """ im = bias_sigma_arr binsize = binsize bins = np.arange(im.min(), im.max() + binsize, binsize) ig, vg, cv = hist_indices(im, bins=bins, return_more=True) nvals = np.array([len(i) for i in ig]) # nvals_rel = nvals / nvals.max() # Peak of distribution ind_peak = np.where(nvals==nvals.max())[0][0] peak = cv[ind_peak] if return_std: return peak, robust.medabsdev(im) else: return peak
[docs]def calc_cdsnoise(data, temporal=True, spatial=True, std_func=np.std): """ Calculate CDS noise from input image cube""" if (temporal==False) and (spatial==False): _log.warn("Must select one or both of `temporal` or `spatial`") return # Make sure we select same number of even/odd frame vals1 = data[0::2] vals2 = data[1::2] nz1 = vals1.shape[0] nz2 = vals2.shape[0] nz = np.min([nz1,nz2]) # Calculate CDS image pairs cds_arr = vals2[:nz] - vals1[:nz] # CDS noise per pixel (temporal) if temporal: cds_temp = std_func(cds_arr, axis=0) # Take median of the variance cds_temp_med = np.sqrt(np.median(cds_temp**2)) # CDS noise per frame (spatial) if spatial: sh = cds_arr.shape cds_spat = std_func(cds_arr.reshape([sh[0],-1]), axis=1) # Take median of the variance cds_spat_med = np.sqrt(np.median(cds_spat**2)) if temporal and spatial: res = cds_temp_med, cds_spat_med elif temporal: res = cds_temp_med elif spatial: res = cds_spat_med return res
[docs]def gen_cds_dict(allfiles, DMS=False, superbias=None, mask_good_arr=None, same_scan_direction=False): """ Generate dictionary of CDS noise info Calculate read noise for: 1. Total noise (no column correcton) 2. 1/f noise (no column correcton) 3. Intrinsic read noise (w/ column correcton) 4. Both temporal and spatial """ # Header info from first file hdr = fits.getheader(allfiles[0]) det = create_detops(hdr, DMS=DMS) nchan = det.nout nx = det.xpix ny = det.ypix nz = det.multiaccum.ngroup chsize = det.chsize cds_act_dict = { 'spat_tot': [], 'spat_det': [], 'temp_tot': [], 'temp_det': [], 'spat_pink_corr': [], 'spat_pink_uncorr': [], 'temp_pink_corr': [], 'temp_pink_uncorr': [], } cds_ref_dict = { 'spat_tot': [], 'spat_det': [], 'temp_tot': [], 'temp_det': [], } # Active and reference pixel masks lower, upper, left, right = det.ref_info # Reference pixel mask # Just use top and bottom ref pixel mask_ref = np.zeros([ny,nx], dtype='bool') if lower>0: mask_ref[0:lower,:] = True if upper>0: mask_ref[-upper:,:] = True # Active pixels mask mask_act = np.zeros([ny,nx], dtype='bool') mask_act[lower:-upper,left:-right] = True # Channel mask mask_channels = det.mask_channels # mask_channels = np.zeros([ny,nx]) # for ch in range(nchan): # mask_channels[:,ch*chsize:(ch+1)*chsize] = ch # Mask of good pixels if mask_good_arr is None: mask_good_arr = np.ones([nz,ny,nx], dtype='bool') kwargs = { 'nchans': nchan, 'altcol': True, 'in_place': True, 'fixcol': False, 'avg_type': 'pixel', 'savgol': True, 'perint': False } for fname in tqdm(allfiles, desc='Files'): # If DMS, then might be multiple integrations per FITS file nint = fits.getheader(fname)['NINTS'] if DMS else 1 iter_range = trange(nint, desc='Ramps', leave=False) if nint>1 else range(nint) for i in iter_range: # Relative to super bias and super dark ramp data = get_fits_data(fname, bias=superbias, DMS=DMS, int_ind=i) ################################## # 1. Full noise (det + 1/f) kwargs['fixcol'] = False data = get_fits_data(fname, bias=superbias, reffix=True, DMS=DMS, int_ind=i, **kwargs) # Active pixels in each channel cds_temp_arr = [] cds_spat_arr = [] indgood = (mask_good_arr[i]) & mask_act for ch in np.arange(nchan): ind = indgood & (mask_channels == ch) cds_temp, cds_spat = calc_cdsnoise(data[:,ind]) cds_temp_arr.append(cds_temp) cds_spat_arr.append(cds_spat) cds_act_dict['temp_tot'].append(cds_temp_arr) cds_act_dict['spat_tot'].append(cds_spat_arr) # Reference pixels in each channel cds_temp_arr = [] cds_spat_arr = [] indgood = mask_ref for ch in np.arange(nchan): ind = indgood & (mask_channels == ch) cds_temp, cds_spat = calc_cdsnoise(data[:,ind]) cds_temp_arr.append(cds_temp) cds_spat_arr.append(cds_spat) cds_ref_dict['temp_tot'].append(cds_temp_arr) cds_ref_dict['spat_tot'].append(cds_spat_arr) ################################## # 2. 1/f noise contributions # Create array of extracted 1/f noise # Work on CDS pairs fn_data = [] cds_data = data[1:20:2] - data[0:20:2] for im in cds_data: ch_arr = im.reshape([ny,-1,chsize]).transpose([1,0,2]) mask = np.abs(im - np.median(im)) > 10*robust.medabsdev(im) mask = mask.reshape([ny,-1,chsize]).transpose([1,0,2]) fnoise = channel_smooth_savgol(ch_arr, mask=mask) fnoise = fnoise.transpose([1,0,2]).reshape([ny,nx]) fn_data.append(fnoise) fn_data = np.array(fn_data) # Divide by sqrt(2) since we've already performed a CDS difference fn_data /= np.sqrt(2) # Split into correlated and uncorrelated components fn_data_corr = [] for j, im in enumerate(fn_data): fn_corr = channel_averaging(im, nchans=nchan, off_chans=False, same_scan_direction=same_scan_direction, mn_func=np.mean) # Subtract from fn_data fn_data[j] -= fn_corr # Only append first channel since the rest are the same data fn_data_corr.append(fn_corr[:,0:chsize]) fn_data_corr = np.array(fn_data_corr) # Active pixels noise in each channel for uncorrelated data cds_temp_arr = [] cds_spat_arr = [] indgood = (mask_good_arr[i]) & mask_act for ch in np.arange(nchan): ind = indgood & (mask_channels == ch) cds_temp, cds_spat = calc_cdsnoise(fn_data[:,ind]) cds_temp_arr.append(cds_temp) cds_spat_arr.append(cds_spat) cds_act_dict['temp_pink_uncorr'].append(cds_temp_arr) cds_act_dict['spat_pink_uncorr'].append(cds_spat_arr) del fn_data # Active pixels noise in correlated channel data indgood = (mask_good_arr[i]) & mask_act ind = indgood[:,0:chsize] cds_temp, cds_spat = calc_cdsnoise(fn_data_corr[:,ind]) cds_act_dict['temp_pink_corr'].append(cds_temp) cds_act_dict['spat_pink_corr'].append(cds_spat) del fn_data_corr ################################## # 3. Detector contributions kwargs['fixcol'] = True data = reffix_hxrg(data, **kwargs) # New 1/f noise array for j, im in enumerate(data): ch_arr = im.reshape([ny,-1,chsize]).transpose([1,0,2]) fnoise = channel_smooth_savgol(ch_arr) fnoise = fnoise.transpose([1,0,2]).reshape([ny,nx]) # Remove 1/f noise contributions data[j] -= fnoise # Active pixels in each channel cds_temp_arr = [] cds_spat_arr = [] indgood = (mask_good_arr[i]) & mask_act for ch in np.arange(nchan): ind = indgood & (mask_channels == ch) cds_temp, cds_spat = calc_cdsnoise(data[:,ind]) cds_temp_arr.append(cds_temp) cds_spat_arr.append(cds_spat) cds_act_dict['temp_det'].append(cds_temp_arr) cds_act_dict['spat_det'].append(cds_spat_arr) # Reference pixels in each channel cds_temp_arr = [] cds_spat_arr = [] indgood = mask_ref for ch in np.arange(nchan): ind = indgood & (mask_channels == ch) cds_temp, cds_spat = calc_cdsnoise(data[:,ind]) cds_temp_arr.append(cds_temp) cds_spat_arr.append(cds_spat) cds_ref_dict['temp_det'].append(cds_temp_arr) cds_ref_dict['spat_det'].append(cds_spat_arr) # Done with data del data # Convert lists to np.array dlist = [cds_act_dict, cds_ref_dict] for d in dlist: for k in d.keys(): if isinstance(d[k], (list)): d[k] = np.array(d[k]) return cds_act_dict, cds_ref_dict
[docs]def calc_eff_noise(allfiles, superbias=None, temporal=True, spatial=True, ng_all=None, DMS=False, kw_ref=None, std_func=robust.medabsdev, kernel_ipc=None, kernel_ppc=None, read_pattern='RAPID'): """ Determine Effective Noise Calculates the slope noise (in DN/sec) assuming a linear fits to a variety number of groups. The idea is to visualize the reduction in noise as you increase the number of groups in the fit and compare it to theoretical predictions (ie., slope noise formula). Parameters ---------- allfiles : list List of input file names. DMS : bool Are files DMS formatted? superbias : ndarray Super bias to subtract from each dataset. temporal : bool Calculate slope noise using pixels' temporal distribution? spatial : bool Calcualte slope noise using pixel spatial distribution? ng_all : array-like Array of group to perform linear fits for slope calculations. kw_ref : dict Dictionary of keywords to pass to reference correction routine. std_func : func Function for calculating spatial distribution. kernel_ipc : ndarray IPC kernel to perform deconvolution on slope images. kernel_ppc : ndarray Similar to `kernel_ipc` except for PPC. read_pattern : string Reformulate data as if it were acquired using a read pattern other than RAPID. """ log_prev = conf.logging_level setup_logging('WARN', verbose=False) hdr = fits.getheader(allfiles[0]) det = create_detops(hdr, DMS=DMS) nchan = det.nout nx = det.xpix ny = det.ypix chsize = det.chsize # Masks for active, reference, and amplifiers ref_mask = det.mask_ref act_mask = ~ref_mask ch_mask = det.mask_channels if 'RAPID' not in read_pattern: det_new = deepcopy(det) ma_new = det_new.multiaccum # Change read mode and determine max number of allowed groups ma_new.read_mode = read_pattern ma_new.ngroup = int((det.multiaccum.ngroup - ma_new.nd1 + ma_new.nd2) / (ma_new.nf + ma_new.nd2)) nz = ma_new.ngroup # Group time # tarr = np.arange(1, nz+1) * det_new.time_group + (ma_new.nd1 - ma_new.nd2 - ma_new.nf/2)*det_new.time_frame tarr = det_new.times_group_avg # Select number of groups to perform linear fits if ng_all is None: if nz<20: ng_all = np.arange(2,nz+1).astype('int') else: ng_all = np.append([2,3], np.linspace(5,nz,num=16).astype('int')) else: nz = det.multiaccum.ngroup # Group time tarr = np.arange(1, nz+1) * det.time_group # Select number of groups to perform linear fits if ng_all is None: ng_all = np.append([2,3,5], np.linspace(10,nz,num=15).astype('int')) # Make sure ng_all is unique ng_all = np.unique(ng_all) # Do not remove 1/f noise via ref column if kw_ref is None: kw_ref = { 'nchans': nchan, 'altcol': True, 'in_place': True, 'fixcol': False, 'avg_type': 'pixel', 'savgol': True, 'perint': False } # IPC and PPC kernels if kernel_ipc is not None: ipc_big = pad_or_cut_to_size(kernel_ipc, (ny,nx)) kipc_fft = np.fft.fft2(ipc_big) else: kipc_fft = None if kernel_ppc is not None: ppc_big = pad_or_cut_to_size(kernel_ppc, (ny,chsize)) kppc_fft = np.fft.fft2(ppc_big) else: kppc_fft = None # Calculate effective noise temporally if temporal: eff_noise_temp = [] # Work with one channel at a time for better memory management for ch in trange(nchan, desc="Temporal", leave=False): ind_ch = act_mask & (ch_mask==ch) slope_chan_allfiles = [] slope_ref_allfiles = [] for fname in tqdm(allfiles, leave=False, desc="Files"): # If DMS, then might be multiple integrations per FITS file nint = fits.getheader(fname)['NINTS'] if DMS else 1 iter_range = trange(nint, desc='Ramps', leave=False) if nint>1 else range(nint) for i in iter_range: data = get_fits_data(fname, bias=superbias, reffix=True, DMS=DMS, int_ind=i, **kw_ref) # Reformat data? if 'RAPID' not in read_pattern: data = ramp_resample(data, det_new) slope_chan = [] slope_ref = [] for fnum in tqdm(ng_all, leave=False, desc="Group Fit"): bias, slope = jl_poly_fit(tarr[0:fnum], data[0:fnum]) # Deconvolve fits to remove IPC and PPC if kipc_fft is not None: slope = ipc_deconvolve(slope, None, kfft=kipc_fft) if kppc_fft is not None: slope = ppc_deconvolve(slope, None, kfft=kppc_fft, in_place=True) slope_chan.append(slope[ind_ch]) # Do reference pixels if ch==nchan-1: slope_ref.append(slope[ref_mask]) slope_chan_allfiles.append(np.array(slope_chan)) if ch==nchan-1: slope_ref_allfiles.append(np.array(slope_ref)) del data slope_chan_allfiles = np.array(slope_chan_allfiles) # Reference pixels if ch==nchan-1: slope_ref_allfiles = np.array(slope_ref_allfiles) # Calculate std dev for each pixels std_pix = np.std(slope_chan_allfiles, axis=0) # Get the median of the variance distribution eff_noise = np.sqrt(np.median(std_pix**2, axis=1)) eff_noise_temp.append(eff_noise) if ch==nchan-1: std_pix = np.std(slope_ref_allfiles, axis=0) eff_noise_ref = np.sqrt(np.median(std_pix**2, axis=1)) eff_noise_temp.append(eff_noise_ref) del slope_chan, slope_chan_allfiles, std_pix eff_noise_temp = np.array(eff_noise_temp) # Calculate effective noise spatially if spatial: eff_noise_all = [] for f in tqdm(allfiles, desc="Spatial", leave=False): # If DMS, then might be multiple integrations per FITS file nint = fits.getheader(fname)['NINTS'] if DMS else 1 iter_range = trange(nint, desc='Ramps', leave=False) if nint>1 else range(nint) for i in iter_range: data = get_fits_data(f, bias=superbias, reffix=True, DMS=DMS, ind_int=i, **kw_ref) # Reformat data? if 'RAPID' not in read_pattern: data = ramp_resample(data, det_new) eff_noise_chans = [] # Spatial standard deviation for fnum in tqdm(ng_all, leave=False, desc="Group Fit"): bias, slope = jl_poly_fit(tarr[0:fnum], data[0:fnum]) # Deconvolve fits to remove IPC and PPC if kipc_fft is not None: slope = ipc_deconvolve(slope, None, kfft=kipc_fft) if kppc_fft is not None: slope = ppc_deconvolve(slope, None, kfft=kppc_fft, in_place=True) eff_noise = [] # Each channel for ch in np.arange(nchan): ind_ch = act_mask & (ch_mask==ch) eff_noise.append(std_func(slope[ind_ch])) # Add reference pixels eff_noise.append(std_func(slope[ref_mask])) # Append to final array eff_noise_chans.append(np.array(eff_noise)) eff_noise_chans = np.array(eff_noise_chans).transpose() eff_noise_all.append(eff_noise_chans) del data eff_noise_all = np.array(eff_noise_all) eff_noise_spat = np.median(eff_noise_all, axis=0) setup_logging(log_prev, verbose=False) if temporal and spatial: res = ng_all, eff_noise_temp, eff_noise_spat elif temporal: res = ng_all, eff_noise_temp elif spatial: res = ng_all, eff_noise_spat return res
[docs]def fit_func_var_ex(params, det, patterns, ng_all_list, en_dn_list, read_noise=None, idark=None, ideal_Poisson=False): """Function for lsq fit to get excess variance""" gain = det.gain if idark is None: idark = det.dark_current # Read noise per frame if read_noise is None: cds_var = (en_dn_list[0][0] * det.time_group * gain)**2 - (idark * det.time_frame) read_noise = np.sqrt(cds_var / 2) diff_all = [] for i, patt in enumerate(patterns): det_new = deepcopy(det) ma_new = det_new.multiaccum ma_new.read_mode = patt ma_new.ngroup = int((det.multiaccum.ngroup - ma_new.nd1 + ma_new.nd2) / (ma_new.nf + ma_new.nd2)) ng_all = ng_all_list[i] thr_e = det_new.pixel_noise(ng=ng_all, rn=read_noise, idark=idark, ideal_Poisson=ideal_Poisson, p_excess=[0,0]) tvals = (ng_all - 1) * det_new.time_group var_ex_obs = (en_dn_list[i] * gain * tvals)**2 - (thr_e * tvals)**2 nf = ma_new.nf var_ex_fit = var_ex_model(ng_all, nf, params) diff_all.append(var_ex_obs - var_ex_fit) return np.concatenate(diff_all)
####################################### # IPC and PPC Deconvolution #######################################
[docs]def deconv_single_image(im, kfft): """Image deconvolution for a kernel""" # bias the image to avoid negative pixel values in image min_im = np.min(im) im = im - min_im # FFT of input image imfft = np.fft.fft2(im) im_final = np.fft.fftshift(np.fft.ifft2(imfft/kfft).real, axes=(-2,-1)) im_final += min_im return im_final
[docs]def ipc_deconvolve(imarr, kernel, kfft=None, **kwargs): """Simple IPC image deconvolution Given an image (or image cube), apply an IPC deconvolution kernel to obtain the intrinsic flux distribution. Should also work for PPC kernels. This simply calculates the FFT of the image(s) and kernel, divides them, then applies an iFFT to determine the deconvolved image. If performing PPC deconvolution, make sure to perform channel-by-channel with the kernel in the appropriate scan direction. IPC is usually symmetric, so this restriction may not apply. See `ppc_deconvolve` function. Calls `ppc_deconvolve` for asymmetric (left-right) IPC kernels. Parameters ========== im : ndarray Image or array of images. kernel : ndarry Deconvolution kernel. Ignored if `kfft` is specified. kfft : Complex ndarray Option to directy supply the kernel's FFT rather than calculating it within the function. The supplied ndarray should have shape (ny,nx) equal to the input `im`. Useful if calling ``ipc_deconvolve`` multiple times. symmetric : bool Is the input IPC kernel symmetric? Keyword Args ============ in_place : bool Perform calculate in place (overwrites original image). nchans : int Number of amplifier channels. same_scan_direction : bool Are all the output channels read in the same direction? By default fast-scan readout direction is ``[-->,<--,-->,<--]`` If ``same_scan_direction``, then all ``-->`` reverse_scan_direction : bool If ``reverse_scan_direction``, then ``[<--,-->,<--,-->]`` or all ``<--`` """ # Image cube shape sh = imarr.shape ndim = len(sh) if ndim==2: ny, nx = sh nz = 1 imarr = imarr.reshape([nz,ny,nx]) else: nz, ny, nx = sh # FFT of kernel if kfft is None: ipc_big = pad_or_cut_to_size(kernel, (ny,nx)) kfft = np.fft.fft2(ipc_big) im_final = np.zeros_like(imarr) for i in trange(nz, leave=False, desc='Frames'): im_final[i] = deconv_single_image(imarr[i], kfft) return im_final.reshape(sh)
[docs]def ppc_deconvolve(im, kernel, kfft=None, nchans=4, in_place=False, same_scan_direction=False, reverse_scan_direction=False, **kwargs): """PPC image deconvolution Given an image (or image cube), apply PPC deconvolution kernel to obtain the intrinsic flux distribution. This performs channel-by-channel deconvolution, taking into account the specific readout directly. This function can also be used for asymmetric IPC kernels. Parameters ========== im : ndarray Image or array of images. Assumes detector coordinates where (0,0) is in bottom left. kernel : ndarry Deconvolution kernel. Ignored if `kfft` is specified. kfft : Complex ndarray Option to directy supply the kernel's FFT rather than calculating it within the function. The supplied ndarray should have shape (ny,nx) equal to the input `im`. Useful if calling ``ppc_deconvolve`` multiple times. in_place : bool Perform calculate in place (overwrites original image). nchans : int Number of amplifier channels. same_scan_direction : bool Are all the output channels read in the same direction? By default fast-scan readout direction is ``[-->,<--,-->,<--]`` If ``same_scan_direction``, then all ``-->`` reverse_scan_direction : bool If ``reverse_scan_direction``, then ``[<--,-->,<--,-->]`` or all ``<--`` """ # Need copy, otherwise will overwrite input data if not in_place: im = im.copy() # Image cube shape sh = im.shape ndim = len(sh) if ndim==2: ny, nx = sh nz = 1 else: nz, ny, nx = sh chsize = int(nx / nchans) im = im.reshape([nz,ny,nchans,-1]) # FFT of kernel if kfft is None: k_big = pad_or_cut_to_size(kernel, (ny,chsize)) kfft = np.fft.fft2(k_big) # Channel-by-channel deconvolution for ch in trange(nchans, leave=False, desc='PPC Amps'): sub = im[:,:,ch,:] if same_scan_direction: flip = True if reverse_scan_direction else False elif np.mod(ch,2)==0: flip = True if reverse_scan_direction else False else: flip = False if reverse_scan_direction else True if flip: # Orient to left->right readout direction sub = sub[:,:,::-1] # Call IPC function sub = ipc_deconvolve(sub, kernel, kfft=kfft) if flip: # Orient back sub = sub[:,:,::-1] im[:,:,ch,:] = sub im = im.reshape(sh) return im
[docs]def get_ipc_kernel(imdark, tint=None, boxsize=5, nchans=4, bg_remove=True, hotcut=[5000,50000], calc_ppc=False, same_scan_direction=False, reverse_scan_direction=False, ref_info=[4,4,4,4], suppress_error_msg=False): """ Derive IPC/PPC Convolution Kernels Find the IPC and PPC kernels used to convolve detector pixel data. Finds all hot pixels within hotcut parameters and measures the average relative flux within adjacent pixels. Parameters ========== imdark : ndarray Image to search for hot pixels in units of DN or DN/sec. If in terms of DN/sec, make sure to set `tint` to convert to raw DN. Keyword Parameters ================== tint : float or None Integration time to convert dark current rate into raw pixel values (DN). If None, then input image is assumed to be in units of DN. boxsize : int Size of the box. Should be odd. If even, will increment by 1. nchans : int Number of amplifier channels; necessary for PPC measurements. bg_remove : bool Remove the average dark current values for each hot pixel cut-out. Only works if boxsize>3. hotcut : array-like Min and max values of hot pixels (above bg and bias) to consider. calc_ppc : bool Calculate and return post-pixel coupling? same_scan_direction : bool Are all the output channels read in the same direction? By default fast-scan readout direction is ``[-->,<--,-->,<--]`` If ``same_scan_direction``, then all ``-->`` reverse_scan_direction : bool If ``reverse_scan_direction``, then ``[<--,-->,<--,-->]`` or all ``<--`` """ ny, nx = imdark.shape chsize = int(nx / nchans) imtemp = imdark.copy() if tint is None else imdark * tint boxhalf = int(boxsize/2) boxsize = int(2*boxhalf + 1) distmin = np.ceil(np.sqrt(2.0) * boxhalf) pixmask = ((imtemp>hotcut[0]) & (imtemp<hotcut[1])) # Get rid of pixels around border lower, upper, left, right = ref_info pixmask[0:lower+boxhalf, :] = False pixmask[-upper-boxhalf:, :] = False pixmask[:, 0:left+boxhalf] = False pixmask[:, -right-boxhalf:] = False # Ignore borders between amplifiers for ch in range(1, nchans): x1 = ch*chsize - boxhalf x2 = x1 + 2*boxhalf pixmask[:, x1:x2] = False indy, indx = np.where(pixmask) nhot = len(indy) if nhot < 2: if not suppress_error_msg: _log.warn("No hot pixels found!") return None # Only want isolated pixels # Get distances for every pixel # If too close, then set equal to 0 for i in range(nhot): d = np.sqrt((indx-indx[i])**2 + (indy-indy[i])**2) ind_close = np.where((d>0) & (d<distmin))[0] if len(ind_close)>0: pixmask[indy[i], indx[i]] = 0 indy, indx = np.where(pixmask) nhot = len(indy) if nhot < 2: if not suppress_error_msg: _log.warn("No hot pixels found!") return None else: _log.info(f'Number of hot pixels: {nhot}') # Stack all hot pixels in a cube hot_all = [] for iy, ix in zip(indy, indx): x1, y1 = np.array([ix,iy]) - boxhalf x2, y2 = np.array([x1,y1]) + boxsize sub = imtemp[y1:y2, x1:x2] # Flip channels along x-axis for PPC if calc_ppc: # Check if an even or odd channel (index 0) for ch in np.arange(0,nchans,2): even = True if (ix > ch*chsize) and (ix < (ch+1)*chsize-1) else False if same_scan_direction: flip = True if reverse_scan_direction else False elif even: flip = True if reverse_scan_direction else False else: flip = False if reverse_scan_direction else True if flip: sub = sub[:,::-1] hot_all.append(sub) hot_all = np.array(hot_all) # Remove average dark current values if boxsize>3 and bg_remove==True: for im in hot_all: im -= np.median([im[0,:], im[:,0], im[-1,:], im[:,-1]]) # Normalize by sum in 3x3 region norm_all = hot_all.copy() for im in norm_all: im /= im[boxhalf-1:boxhalf+2, boxhalf-1:boxhalf+2].sum() # Take average of normalized stack ipc_im_avg = np.median(norm_all, axis=0) # ipc_im_sig = robust.medabsdev(norm_all, axis=0) corner_val = (ipc_im_avg[boxhalf-1,boxhalf-1] + ipc_im_avg[boxhalf+1,boxhalf+1] + ipc_im_avg[boxhalf+1,boxhalf-1] + ipc_im_avg[boxhalf-1,boxhalf+1]) / 4 if corner_val<0: corner_val = 0 # Determine post-pixel coupling value? if calc_ppc: ipc_val = (ipc_im_avg[boxhalf-1,boxhalf] + \ ipc_im_avg[boxhalf,boxhalf-1] + \ ipc_im_avg[boxhalf+1,boxhalf]) / 3 if ipc_val<0: ipc_val = 0 ppc_val = ipc_im_avg[boxhalf,boxhalf+1] - ipc_val if ppc_val<0: ppc_val = 0 k_ipc = np.array([[corner_val, ipc_val, corner_val], [ipc_val, 1-4*ipc_val, ipc_val], [corner_val, ipc_val, corner_val]]) k_ppc = np.zeros([3,3]) k_ppc[1,1] = 1 - ppc_val k_ppc[1,2] = ppc_val return (k_ipc / k_ipc.sum(), k_ppc / k_ppc.sum()) # Just determine IPC else: ipc_val = (ipc_im_avg[boxhalf-1,boxhalf] + ipc_im_avg[boxhalf,boxhalf-1] + ipc_im_avg[boxhalf,boxhalf+1] + ipc_im_avg[boxhalf+1,boxhalf]) / 4 if ipc_val<0: ipc_val = 0 kernel = np.array([[corner_val, ipc_val, corner_val], [ipc_val, 1-4*ipc_val, ipc_val], [corner_val, ipc_val, corner_val]]) return kernel / kernel.sum()
[docs]def plot_kernel(kern, ax=None, return_figax=False): """ Plot image of IPC or PPC kernel Parameters ---------- kern : ndarray Kernel image (3x3 or 5x5, etc) to plot. ax : axes Axes to plot kernel on. If None, will create new figure and axes subplot. return_figax : bool Return the (figure, axes) for user manipulations? """ if ax is None: fig, ax = plt.subplots(1,1, figsize=(5,5)) else: fig = None # Convert to log scale for better contrast between pixels kern = kern.copy() kern[kern==0] = 1e-7 ny, nx = kern.shape extent = np.array([-nx/2,nx/2,-ny/2,ny/2]) ax.imshow(np.log(kern), extent=extent, vmax=np.log(1), vmin=np.log(1e-5)) # Add text to each pixel position for i in range(ny): ii = i + int(-ny/2) for j in range(nx): jj = j + int(-nx/2) if (ii==0) and (jj==0): # Different text format at center position ax.text(jj,ii, '{:.2f}%'.format(kern[i,j]*100), color='black', fontsize=16, horizontalalignment='center', verticalalignment='center') else: ax.text(jj,ii, '{:.3f}%'.format(kern[i,j]*100), color='white', fontsize=16, horizontalalignment='center', verticalalignment='center') ax.tick_params(axis='both', color='white', which='both') for k in ax.spines.keys(): ax.spines[k].set_color('white') if fig is not None: ax.set_title('IPC Kernel', fontsize=16) fig.tight_layout() if return_figax: return fig, ax
####################################### # Power spectrum information #######################################
[docs]def pow_spec_ramp(data, nchan, nroh=0, nfoh=0, nframes=1, expand_npix=False, same_scan_direction=False, reverse_scan_direction=False, mn_func=np.mean, return_freq=False, dt=1, **kwargs): """ Get power spectrum within frames of input ramp Takes an input cube, splits it into output channels, and finds the power spectrum of each frame. Then, calculate the average power spectrum for each channel. Use `nroh` and `nfoh` to expand the frame size to encapsulate the row and frame overheads not included in the science data. These just zero-pad the array. Parameters ========== data : ndarray Input Image cube. nchan : int Number of amplifier channels. nroh : int Number of pixel overheads per row. nfoh : int Number of row overheads per frame. nframes : int Number of frames to use to calculate an power spectrum. Normally we just use 1 frame time expand_npix : bool Should we zero-pad the array to a power of two factor for incresed speed? same_scan_direction : bool Are all the output channels read in the same direction? By default fast-scan readout direction is ``[-->,<--,-->,<--]`` If ``same_scan_direction``, then all ``-->`` reverse_scan_direction : bool If ``reverse_scan_direction``, then ``[<--,-->,<--,-->]`` or all ``<--`` """ nz, ny, nx = data.shape chsize = int(nx / nchan) # Channel size and ny plus pixel and row overheads ch_poh = chsize + nroh ny_poh = ny + nfoh ps_data = [] # Hold channel data for ch in range(nchan): # Array of pixel values if (nroh>0) or (nfoh>0): sig = np.zeros([nz,ny_poh,ch_poh]) sig[:,0:ny,0:chsize] += data[:,:,ch*chsize:(ch+1)*chsize] else: sig = data[:,:,ch*chsize:(ch+1)*chsize] # Flip x-axis for odd channels if same_scan_direction: flip = True if reverse_scan_direction else False elif np.mod(ch,2)==0: flip = True if reverse_scan_direction else False else: flip = False if reverse_scan_direction else True sig = sig[:,:,::-1] if flip else sig if nframes==1: sig = sig.reshape([sig.shape[0],-1]) npix = sig.shape[1] # Pad nsteps to a power of 2, which can be faster npix2 = int(2**np.ceil(np.log2(npix))) if expand_npix else npix # Power spectrum of each frame ps = np.abs(np.fft.rfft(sig, n=npix2))**2 / npix2 else: sh = sig.shape npix = nframes * sh[-2] * sh[-1] # Pad nsteps to a power of 2, which can be faster npix2 = int(2**np.ceil(np.log2(npix))) if expand_npix else npix # Power spectrum for each set of frames niter = nz - nframes + 1 ps = [] for i in range(niter): sig2 = sig[i:i+nframes].ravel() # Power spectrum ps.append(np.abs(np.fft.rfft(sig2, n=npix2))**2 / npix2) ps = np.array(ps) # Average of all power spectra ps_data.append(mn_func(ps, axis=0)) # Power spectrum of each output channel ps_data = np.array(ps_data) if return_freq: freq = get_freq_array(ps_data, dt=dt) return ps_data, freq else: return ps_data
[docs]def pow_spec_ramp_pix(data, nchan, expand_nstep=False, mn_func=np.mean, return_freq=False, dt=1, **kwargs): """ Get power spectrum of pixels within ramp Takes an input cube, splits it into output channels, and finds the power spectrum of each pixel. Return the average power spectrum for each channel. Parameters ========== data : ndarray Input Image cube. nchan : int Number of amplifier channels. expand_nstep : bool Should we zero-pad the array to a power of two factor for incresed speed? """ nz, ny, nx = data.shape chsize = int(nx / nchan) ps_data = [] # Hold channel data for ch in range(nchan): # Array of pixel values sig = data[:,:,ch*chsize:(ch+1)*chsize] sig = sig.reshape([sig.shape[0],-1]) nstep = sig.shape[0] # Pad nsteps to a power of 2, which can be faster nstep2 = int(2**np.ceil(np.log2(nstep))) if expand_nstep else nstep # Power spectrum of each pixel ps = np.abs(np.fft.rfft(sig, n=nstep2, axis=0))**2 / nstep2 # Average of all power spectra ps_data.append(mn_func(ps, axis=1)) # Power spectrum of each output channel ps_data = np.array(ps_data) if return_freq: freq = get_freq_array(ps_data, dt=dt) return ps_data, freq else: return ps_data
[docs]def fit_corr_powspec(freq, ps, flim1=[0,1], flim2=[10,100], alpha=-1, **kwargs): """ Fit Correlated Noise Power Spectrum Fit the scaling factors of the 1/f power law components observed in the correlated noise power spectra. This function separately calculates the high-freq and low- freq scale factor components defined by the fcut params. The mid-frequency ranges are interpolated in log space. Parameters ========== freq : ndarray Input frequencies corresponding to power spectrum. ps : ndarray Input power spectrum to fit. flim1 : float Fit frequencies within this range to get scaling for low frequency 1/f noise. flim2 : float Fit frequencies within this range to get scaling for high frequency 1/f noise. alpha : float Noise power spectrum scaling """ yf = freq**alpha yf[0] = 0 # Low frequency fit ind = (freq >= flim1[0]) & (freq <= flim1[1]) & (yf > 0) scl1 = np.median(ps[ind] / yf[ind]) # High frequency fit ind = (freq >= flim2[0]) & (freq <= flim2[1]) & (yf > 0) scl2 = np.median(ps[ind] / yf[ind]) return np.array([scl1, scl2])
[docs]def broken_pink_powspec(freq, scales, fcut1=1, fcut2=10, alpha=-1, **kwargs): scl1, scl2 = scales yf = freq**alpha yf[0] = 0 # Output array res = np.zeros(len(yf)) # Low frequency component ind = (freq <= fcut1) res[ind] = scl1*yf[ind] # High frequency componet ind = (freq >= fcut2) res[ind] = scl2*yf[ind] # Mid frequency interpolation, log space ind = (freq > fcut1) & (freq < fcut2) xlog = np.log10(freq) ylog = np.log10(res) ylog[ind] = np.interp(xlog[ind], xlog[~ind], ylog[~ind]) res[ind] = 10**ylog[ind] return res
[docs]def get_power_spec(data, nchan=4, calc_cds=True, kw_powspec=None, per_pixel=False, return_corr=False, return_ucorr=False, mn_func=np.mean): """ Calculate the power spectrum of an input data ramp in a variety of ways. If return_corr and return_ucorr are both False, then will return (ps_all, None, None). Parameters ========== calc_cds : bool Power spectrum of CDS pairs or individual frames? per_pixel : bool Calculate average power spectrum of each pixel along ramp (frame timescales)? If False, samples pixels within a frame (pixel read timescales) return_corr : bool Return power spectrum of channel correlated 1/f noise? return_ucorr : bool Return power spectra of channel-dependent (uncorrelated) 1/f noise? kw_powspec : dict Keyword arguments to pass to `pow_spec_ramp` function. mn_func : func Function to use to perform averaging of individual power spectra. """ nz, ny, nx = data.shape chsize = int(nx/nchan) # CDS or just subtract first frame if calc_cds: cds = data[1::2] - data[0::2] else: cds = data[1:] - data[0] # Remove averages from each frame cds_mn = np.median(cds.reshape([cds.shape[0], -1]), axis=1) cds -= cds_mn.reshape([-1,1,1]) # Remove averages from each pixel cds_mn = np.median(cds, axis=0) cds -= cds_mn # Keywords for power spectrum # Only used for pow_spec_ramp, not pow_spec_ramp_pix if kw_powspec is None: kw_powspec = { 'nroh': 0, 'nfoh': 0, 'nframes': 1, 'same_scan_direction': False, 'reverse_scan_direction': False } same_scan_direction = kw_powspec['same_scan_direction'] # Power spectrum of all frames data if per_pixel: ps_full = pow_spec_ramp_pix(cds, nchan, mn_func=mn_func) else: ps_full = pow_spec_ramp(cds, nchan, mn_func=mn_func, **kw_powspec) # Extract 1/f noise from data ps_corr, ps_ucorr = (None, None) if return_ucorr or return_corr: fn_data = [] for im in cds: ch_arr = im.reshape([ny,-1,chsize]).transpose([1,0,2]) mask = np.abs(im - np.median(im)) > 10*robust.medabsdev(im) mask = mask.reshape([ny,-1,chsize]).transpose([1,0,2]) fnoise = channel_smooth_savgol(ch_arr, mask=mask) fnoise = fnoise.transpose([1,0,2]).reshape([ny,nx]) fn_data.append(fnoise) fn_data = np.array(fn_data) # Delete data and cds arrays to free up memory del cds # Split into correlated and uncorrelated components fn_data_corr = [] for j, im in enumerate(fn_data): # Extract correlated 1/f noise data fn_corr = channel_averaging(im, nchans=nchan, off_chans=False, same_scan_direction=same_scan_direction, mn_func=np.mean) # Subtract correlated noise from fn_data if return_ucorr: fn_data[j] -= fn_corr # Only append first channel since the rest are the same data fn_data_corr.append(fn_corr[:,0:chsize]) fn_data_corr = np.array(fn_data_corr) # Power spectrum of uncorrelated 1/f noise if return_ucorr: if per_pixel: ps_ucorr = pow_spec_ramp_pix(fn_data, nchan, mn_func=mn_func) else: ps_ucorr = pow_spec_ramp(fn_data, nchan, mn_func=mn_func, **kw_powspec) del fn_data # Power spectrum of correlated 1/f noise if return_corr: if per_pixel: ps_corr = pow_spec_ramp_pix(fn_data_corr, 1, mn_func=mn_func) else: ps_corr = pow_spec_ramp(fn_data_corr, 1, mn_func=mn_func, **kw_powspec) del fn_data_corr return ps_full, ps_ucorr, ps_corr
[docs]def get_power_spec_all(allfiles, super_bias=None, det=None, DMS=False, include_oh=False, same_scan_direction=False, reverse_scan_direction=False, calc_cds=True, return_corr=False, return_ucorr=False, per_pixel=False, mn_func=np.mean, kw_reffix=None): """ Return the average power spectra (white, 1/f noise correlated and uncorrelated) of all FITS files. Parameters ========== allfiles : array-like List of FITS files to operate on. super_bias : ndarray Option to subtract a super bias image from all frames in a ramp. Provides slightly better statistical averaging for reference pixel correction routines. det : Detector class Option to pass known NIRCam detector class. This will get generated from a FITS header if not specified. DMS : bool Are the files DMS formatted or FITSWriter? include_oh : bool Zero-pad the data to insert line and frame overhead pixels? same_scan_direction : bool Are all the output channels read in the same direction? By default fast-scan readout direction is ``[-->,<--,-->,<--]`` If ``same_scan_direction``, then all ``-->`` reverse_scan_direction : bool If ``reverse_scan_direction``, then ``[<--,-->,<--,-->]`` or all ``<--`` calc_cds : bool Power spectrum of CDS pairs or individual frames? per_pixel : bool Calculate average power spectrum of each pixel along ramp (frame timescales)? If False, samples pixels within a frame (pixel read timescales). return_corr : bool Return power spectrum of channel correlated 1/f noise? return_ucorr : bool Return power spectra of channel-dependent (uncorrelated) 1/f noise? kw_powspec : dict Keyword arguments to pass to `pow_spec_ramp` function. mn_func : func Function to use to perform averaging of individual power spectra. """ # Set logging to WARNING to suppress messages log_prev = conf.logging_level setup_logging('WARN', verbose=False) if super_bias is None: super_bias = 0 # Header info from first file if det is None: hdr = fits.getheader(allfiles[0]) det = create_detops(hdr, DMS=DMS) # Overhead information nchan = det.nout # Row and frame overheads if include_oh: nroh = det._line_overhead nfoh = det._extra_lines else: nroh = nfoh = 0 # Keywords for reffix if kw_reffix is None: kw_reffix = { 'nchans': nchan, 'altcol': True, 'in_place': True, 'fixcol': False, 'avg_type': 'pixel', 'savgol': True, 'perint': False } # Keywords for power spectrum kw_powspec = { 'nroh': nroh, 'nfoh': nfoh, 'nframes': 1, 'same_scan_direction': same_scan_direction, 'reverse_scan_direction': reverse_scan_direction } pow_spec_all = [] if return_corr: pow_spec_corr = [] if return_ucorr: pow_spec_ucorr = [] for fname in tqdm(allfiles, desc='Files'): # If DMS, then might be multiple integrations per FITS file nint = fits.getheader(fname)['NINTS'] if DMS else 1 iter_range = trange(nint, desc='Ramps', leave=False) if nint>1 else range(nint) for i in iter_range: data = get_fits_data(fname, bias=super_bias, reffix=True, DMS=DMS, int_ind=i, **kw_reffix) ps_full, ps_ucorr, ps_corr = get_power_spec(data, nchan=nchan, calc_cds=calc_cds, return_corr=return_corr, return_ucorr=return_ucorr, per_pixel=per_pixel, mn_func=mn_func, kw_powspec=kw_powspec) pow_spec_all.append(ps_full) if return_corr: pow_spec_corr.append(ps_corr) if return_ucorr: pow_spec_ucorr.append(ps_ucorr) del data # Full spectra pow_spec_all = np.array(pow_spec_all) ps_all = np.mean(pow_spec_all, axis=0) # Correlated Noise if return_corr: pow_spec_corr = np.array(pow_spec_corr) ps_corr = np.mean(pow_spec_corr, axis=0).squeeze() else: ps_corr = None # Uncorrelated Noise per amplifier channel if return_ucorr: pow_spec_ucorr = np.array(pow_spec_ucorr) ps_ucorr = np.mean(pow_spec_all, axis=0) else: ps_ucorr = None # Set back to previous logging level setup_logging(log_prev, verbose=False) return ps_all, ps_corr, ps_ucorr
[docs]def get_freq_array(pow_spec, dt=1, nozero=False, npix_odd=False): """ Return frequencies associated with power spectrum Parameters ========== pow_spec : ndarray Power spectrum to obtain associated frequency array. dt : float Delta time between corresponding elements in time domain. nozero : bool Set freq[0] = freq[1] to remove zeros? This is mainly so we don't obtain NaN's later when calculating 1/f noise. npix_odd : bool We normally assume that the original time-domain data was comprised of an even number of pixels. However, if it were actually odd, the frequency array will be slightly shifted. Set this to True if the intrinsic data that was used to generate the pow_spec had an odd number of elements. """ # This assumes an even input array npix = 2 * (pow_spec.shape[-1] - 1) # Off by 1 if initial npix was odd if npix_odd: npix += 1 freq = np.fft.rfftfreq(npix, d=dt) if nozero: # First element should not be 0 freq[0] = freq[1] return freq
####################################### # Linearity and Gain ####################################### # Determine saturation level in ADU (relative to bias)
[docs]def find_sat(data, bias=None, ref_info=[4,4,4,4], bit_depth=16): """ Given a data cube, find the values in ADU in which data reaches hard saturation. """ # Maximum possible value corresponds to bit depth sat_max = 2**bit_depth-1 sat_min = 0 # Subtract bias? nz, ny, nx = data.shape imarr = data if bias is None else data - bias # Data can be characterized as large differences at start, # followed by decline and then difference of 0 at hard saturation # Determine difference between samples diff_arr = imarr[1:] - imarr[0:-1] # Select pixels to determine individual saturation values diff_max = np.median(diff_arr[0]) / 10 diff_min = 100 # Ensure a high rate at the beginning and a flat rate at the end sat_mask = (diff_arr[0]>diff_max) & (np.abs(diff_arr[-1]) < diff_min) # Median value to use for pixels that didn't reach saturation # sat_med = np.median(imarr[-1, sat_mask]) # Initialize saturation array with median # sat_arr = np.ones([ny,nx]) * sat_med # Initialize saturation as max-min sat_arr = imarr[-1] - imarr[0] sat_arr[sat_mask] = imarr[-1, sat_mask] # Bound between 0 and bit depth sat_arr[sat_arr<sat_min] = sat_min sat_arr[sat_arr>sat_max] = sat_max # Reference pixels don't saturate # [bottom, upper, left, right] br, ur, lr, rr = ref_info ref_mask = np.zeros([ny,nx], dtype=bool) if br>0: ref_mask[0:br,:] = True if ur>0: ref_mask[-ur:,:] = True if lr>0: ref_mask[:,0:lr] = True if rr>0: ref_mask[:,-rr:] = True sat_arr[ref_mask] = sat_max return sat_arr
# Fit unsaturated data and return coefficients
[docs]def cube_fit(tarr, data, bias=None, sat_vals=None, sat_frac=0.95, deg=1, fit_zero=False, verbose=False, ref_info=[4,4,4,4], use_legendre=False, lxmap=None, return_lxmap=False, return_chired=False): nz, ny, nx = data.shape # Subtract bias? imarr = data if bias is None else data - bias # Get saturation levels if sat_vals is None: sat_vals = find_sat(imarr, ref_info=ref_info) # Array of masked pixels (saturated) mask_good = imarr < sat_frac*sat_vals # Reshape for all pixels in single dimension imarr = imarr.reshape([nz, -1]) mask_good = mask_good.reshape([nz, -1]) # Initial cf = np.zeros([deg+1, nx*ny]) if return_lxmap: lx_min = np.zeros([nx*ny]) lx_max = np.zeros([nx*ny]) if return_chired: chired = np.zeros([nx*ny]) # For each npix_sum = 0 i0 = 0 if fit_zero else 1 for i in np.arange(i0,nz)[::-1]: ind = (cf[1] == 0) & (mask_good[i]) npix = np.sum(ind) npix_sum += npix if verbose: print(i+1,npix,npix_sum, 'Remaining: {}'.format(nx*ny-npix_sum)) if npix>0: if fit_zero: x = np.concatenate(([0], tarr[0:i+1])) y = np.concatenate((np.zeros([1, np.sum(ind)]), imarr[0:i+1,ind]), axis=0) else: x, y = (tarr[0:i+1], imarr[0:i+1,ind]) if return_lxmap: lx_min[ind] = np.min(x) if lxmap is None else lxmap[0] lx_max[ind] = np.max(x) if lxmap is None else lxmap[1] # Fit line if too few points relative to polynomial degree if len(x) <= deg+1: cf[0:2,ind] = jl_poly_fit(x,y, deg=1, use_legendre=use_legendre, lxmap=lxmap) else: cf[:,ind] = jl_poly_fit(x,y, deg=deg, use_legendre=use_legendre, lxmap=lxmap) # Get reduced chi-sqr metric for poorly fit data if return_chired: yfit = jl_poly(x, cf[:,ind]) deg_chi = 1 if len(x)<=deg+1 else deg dof = y.shape[0] - deg_chi chired[ind] = chisqr_red(y, yfit=yfit, dof=dof) imarr = imarr.reshape([nz,ny,nx]) mask_good = mask_good.reshape([nz,ny,nx]) cf = cf.reshape([deg+1,ny,nx]) if return_lxmap: lxmap_arr = np.array([lx_min, lx_max]).reshape([2,ny,nx]) if return_chired: chired = chired.reshape([ny,nx]) return cf, lxmap_arr, chired else: return cf, lxmap_arr else: if return_chired: chired = chired.reshape([ny,nx]) return cf, chired else: return cf
[docs]def time_to_sat(data, sat_vals, dt=1, sat_calc=0.998, ref_info=[4,4,4,4]): """ Determine time of saturation""" nz, ny, nx = data.shape # Active and reference pixel masks lower, upper, left, right = ref_info mask_ref = np.zeros([ny,nx], dtype='bool') if lower>0: mask_ref[0:lower,:] = True if upper>0: mask_ref[-upper:,:] = True if left>0: mask_ref[:,0:left] = True if right>0: mask_ref[:,-right:] = True # Time array tarr = np.arange(1,nz+1) * dt pvals = data svals = sat_vals # Find time where data reaches 99% of saturation mask99 = pvals < sat_calc*sat_vals # Linear interpolate to find time we reach full well pvals1 = np.max(pvals * mask99, axis=0) pvals2 = np.max(pvals * np.roll(mask99,1,axis=0), axis=0) tvals1 = np.max(tarr.reshape([-1,1,1]) * mask99, axis=0) tvals2 = np.max(tarr.reshape([-1,1,1]) * np.roll(mask99,1,axis=0), axis=0) # Time at which we reach 100% saturation tfin = tvals1 + (tvals2 - tvals1) * (sat_calc - pvals1 / svals) / ((pvals2 - pvals1) / svals) del pvals1, pvals2, tvals1, tvals2 tfin[mask_ref] = 0 tfin[~np.isfinite(tfin)] = 0 return tfin
[docs]def find_group_sat(file, DMS=False, bias=None, sat_vals=None, sat_calc=0.998): """Group at which 98% of pixels are saturated""" # Set logging to WARNING to suppress messages log_prev = conf.logging_level setup_logging('WARN', verbose=False) hdr = fits.getheader(file) det = create_detops(hdr, DMS=DMS) setup_logging(log_prev, verbose=False) nz, ny, nx = (det.multiaccum.ngroup, det.ypix, det.xpix) nchan = det.nout # Active and reference pixel masks # Masks for active, reference, and amplifiers mask_ref = det.mask_ref mask_act = ~mask_ref # Read in data kwargs_ref = {'nchans': nchan, 'in_place': True, 'altcol': True, 'fixcol': False} data = get_fits_data(file, DMS=DMS, bias=bias, reffix=True, **kwargs_ref) if sat_vals is None: sat_vals = find_sat(data, ref_info=det.ref_info) # Get saturation times for each tsat = time_to_sat(data, sat_vals, dt=1, sat_calc=sat_calc, ref_info=det.ref_info) vals = tsat[mask_act] bins = np.arange(vals.min(), vals.max()+1, 1) ig, vg, cv = hist_indices(vals, bins=bins, return_more=True) nvals = np.array([len(i) for i in ig]) nsum = np.cumsum(nvals) / nvals.sum() # Index containing 98% of pixels imax = np.min(np.where(nsum>0.98)[0]) return imax
[docs]def calc_nonlin_coeff(data, sat_vals, well_depth, sat_calc=0.98, ref_info=[4,4,4,4], counts_cut=None, deg=8, use_legendre=True, lxmap=[0,1e5], **kwargs): """ counts_cut : None or float Option to fit two sets of polynomial coefficients to lower and uppper values. 'counts_cut' specifies the division in values of electrons. Useful for pixels with different non-linear behavior at low flux levels. Recommended values of 15000 e-. """ nz, ny, nx = data.shape # Time array tarr = np.arange(1,nz+1) # Active and reference pixel masks lower, upper, left, right = ref_info mask_ref = np.zeros([ny,nx], dtype='bool') if lower>0: mask_ref[0:lower,:] = True if upper>0: mask_ref[-upper:,:] = True if left>0: mask_ref[:,0:left] = True if right>0: mask_ref[:,-right:] = True # Find time where data reaches 99% of saturation mask99 = data < sat_calc*sat_vals tfin = time_to_sat(data, sat_vals, sat_calc=0.998, ref_info=ref_info) # Get rid of 0s and NaN's ind_bad = (np.isnan(tfin)) | (tfin==0) tfin[ind_bad] = np.median(tfin[~ind_bad]) # Create ideal pixel ramps in e- ramp = well_depth * tarr.reshape([-1,1,1]) / tfin.reshape([1,ny,nx]) ramp[ramp>well_depth] = well_depth # Simultaneously fit pixels that have the same ideal ramps bsize = 0.05 bins = np.arange(tfin.min(), tfin.max()+bsize, bsize) ig, vg, cv = hist_indices(tfin, bins=bins, return_more=True) # Select only indices with len>0 nvals = np.array([len(i) for i in ig]) ig_nozero = np.array(ig)[nvals>0] # Reshape to put all pixels in single dimension data_flat = data.reshape([data.shape[0], -1]) ramp_flat = ramp.reshape([ramp.shape[0], -1]) mask100 = mask99 #data < sat_calc mask100_flat = mask100.reshape([mask100.shape[0], -1]) if counts_cut is None: cf_arr = np.zeros([deg+1,nx*ny]) else: cf_arr1 = np.zeros([deg+1,nx*ny]) cf_arr2 = np.zeros([deg+1,nx*ny]) for ii in trange(len(ig_nozero), leave=False, desc='Linearity Fitting'): ig_sub = ig_nozero[ii] # Grab values less than well depth ind = mask100_flat[:,ig_sub[0]] indz = np.where(ind==False)[0] if len(indz)>0: ind[indz[0]] = True # Set next element true pix_dn = data_flat[:,ig_sub][ind] # DN Values pix_el = ramp_flat[:,ig_sub][ind] # electron values pix_el_mn = np.mean(pix_el, axis=1) # Gain function gain = pix_el_mn.reshape([-1,1]) / pix_dn if counts_cut is None: cf_arr[:,ig_sub] = jl_poly_fit(pix_el_mn, gain, deg=deg, use_legendre=use_legendre, lxmap=lxmap) else: # Fit high pixel values ifit1 = (pix_el_mn >= counts_cut) if ifit1.sum() > 0: cf_arr1[:,ig_sub] = jl_poly_fit(pix_el_mn[ifit1], gain[ifit1], deg=deg, use_legendre=use_legendre, lxmap=lxmap) # Fit low pixel values ifit2 = ~ifit1 if ifit2.sum() > 0: cf_arr2[:,ig_sub] = jl_poly_fit(pix_el_mn[ifit2], gain[ifit2], deg=deg, use_legendre=use_legendre, lxmap=lxmap) # Reshape and set reference masks to 0 if counts_cut is None: cf_arr = cf_arr.reshape([deg+1,ny,nx]) cf_arr[:,mask_ref] = 0 return cf_arr else: cf_arr1 = cf_arr1.reshape([deg+1,ny,nx]) cf_arr1[:,mask_ref] = 0 cf_arr2 = cf_arr2.reshape([deg+1,ny,nx]) cf_arr2[:,mask_ref] = 0 return cf_arr1, cf_arr2
[docs]def calc_linearity_coeff(data, sat_vals, well_depth, sat_calc=0.98, ref_info=[4,4,4,4], counts_cut=None, deg=8, use_legendre=True, lxmap=[0,1e5], nonlin=False, **kwargs): """ counts_cut : None or float Option to fit two sets of polynomial coefficients to lower and uppper values. 'counts_cut' specifies the division in values of electrons. Useful for pixels with different non-linear behavior at low flux levels. Recommended values of 15000 e-. """ if nonlin: return calc_nonlin_coeff(data, sat_vals, well_depth, sat_calc=sat_calc, ref_info=ref_info, counts_cut=counts_cut, deg=deg, use_legendre=use_legendre, lxmap=lxmap) nz, ny, nx = data.shape # Time array tarr = np.arange(1,nz+1) # Active and reference pixel masks lower, upper, left, right = ref_info mask_ref = np.zeros([ny,nx], dtype='bool') if lower>0: mask_ref[0:lower,:] = True if upper>0: mask_ref[-upper:,:] = True if left>0: mask_ref[:,0:left] = True if right>0: mask_ref[:,-right:] = True # Find time where data reaches 99% of saturation mask99 = data < sat_calc*sat_vals tfin = time_to_sat(data, sat_vals, sat_calc=0.998, ref_info=ref_info) # Get rid of 0s and NaN's ind_bad = (np.isnan(tfin)) | (tfin==0) tfin[ind_bad] = np.median(tfin[~ind_bad]) # Create ideal pixel ramps in e- ramp = well_depth * tarr.reshape([-1,1,1]) / tfin.reshape([1,ny,nx]) ramp[ramp>well_depth] = well_depth # Reshape to put all pixels in single dimension data_flat = data.reshape([data.shape[0], -1]) ramp_flat = ramp.reshape([ramp.shape[0], -1]) gain_flat = ramp_flat / data_flat mask99_flat = mask99.reshape([mask99.shape[0], -1]) if counts_cut is None: cf_arr = np.zeros([deg+1,nx*ny]) else: cf_arr1 = np.zeros([deg+1,nx*ny]) cf_arr2 = np.zeros([deg+1,nx*ny]) for i in trange(nx*ny, leave=False, desc='Linearity Fitting'): # Grab values less than well depth ind = mask99_flat[:,i] indz = np.where(ind==False)[0] if len(indz)>0: ind[indz[0]] = True # Set next element true pix_dn = data_flat[:,i] pix_e = ramp_flat[:,i] gain = gain_flat[:,i] # Linearity or non-linearity coefficients vals = pix_e if nonlin else pix_dn if counts_cut is None: cf_arr[:,i] = jl_poly_fit(vals[ind], gain[ind], deg=deg, use_legendre=use_legendre, lxmap=lxmap) else: # Fit high pixel values ifit1 = (pix_dn >= counts_cut) if ifit1.sum() > 0: cf_arr1[:,i] = jl_poly_fit(vals[ifit1], gain[ifit1], deg=deg, use_legendre=use_legendre, lxmap=lxmap) # Fit low pixel values ifit2 = ~ifit1 if ifit2.sum() > 0: cf_arr2[:,i] = jl_poly_fit(vals[ifit2], gain[ifit2], deg=deg, use_legendre=use_legendre, lxmap=lxmap) # Reshape and set reference masks to 0 if counts_cut is None: cf_arr = cf_arr.reshape([deg+1,ny,nx]) cf_arr[:,mask_ref] = 0 return cf_arr else: cf_arr1 = cf_arr1.reshape([deg+1,ny,nx]) cf_arr1[:,mask_ref] = 0 cf_arr2 = cf_arr2.reshape([deg+1,ny,nx]) cf_arr2[:,mask_ref] = 0 return cf_arr1, cf_arr2
[docs]def get_linear_coeffs(allfiles, super_bias=None, DMS=False, kppc=None, kipc=None, counts_cut=None, deg=8, use_legendre=True, lxmap=[0,1e5], return_satvals=False, nonlin=False, sat_calc=0.98, **kwargs): if super_bias is None: super_bias = 0 # Set logging to WARNING to suppress messages log_prev = conf.logging_level setup_logging('WARN', verbose=False) # Header info from first file hdr = fits.getheader(allfiles[0]) det = create_detops(hdr, DMS=DMS) # Set back to previous logging level setup_logging(log_prev, verbose=False) # Well level in electrons well_depth = det.well_level data_mn, _ = gen_super_ramp(allfiles, super_bias=super_bias, DMS=DMS, **kwargs) # Update number of read frames # nz, ny, nx = data_mn.shape # det.multiaccum.ngroup = nz # tarr = det.times_group_avg # IPC and PPC kernels # PPC corrections if (kppc is not None) and (kppc[1,2]>0): data_mn = ppc_deconvolve(data_mn, kppc) # IPC correction if kipc is not None: data_mn = ipc_deconvolve(data_mn, kipc) # Get saturation levels sat_vals = find_sat(data_mn) # Get coefficients to obtain non-linear ramp res = calc_linearity_coeff(data_mn, sat_vals, well_depth, deg=deg, counts_cut=counts_cut, use_legendre=use_legendre, lxmap=lxmap, nonlin=nonlin, sat_calc=sat_calc) if return_satvals: return res, sat_vals else: return res
[docs]def pixel_linearity_gains(frame, coeff_arr, use_legendre=True, lxmap=[0,1e5]): """ Given some image data and coefficient """ # from numpy.polynomial import legendre from scipy.special import eval_legendre ncf = coeff_arr.shape[0] xvals = frame.reshape([1,-1]) if use_legendre: # Values to map to [-1,+1] if lxmap is None: lxmap = [np.min(xvals), np.max(xvals)] # Remap xvals -> lxvals dx = lxmap[1] - lxmap[0] lxvals = 2 * (xvals - (lxmap[0] + dx/2)) / dx xfan = np.array([eval_legendre(n, lxvals) for n in range(ncf)]) else: # Create an array of exponent values parr = np.arange(ncf, dtype='float') xfan = xvals**parr.reshape([-1,1]) # Array broadcasting gain = np.sum(xfan.reshape([ncf,-1]) * coeff_arr.reshape([ncf,-1]), axis=0) return gain
[docs]def apply_linearity(cube, det, coeff_dict): """Apply pixel linearity corrections to ramp Linearize a bias-subtracted, ref-pixel-corrected ramp and convert from units of DN to electrons. Parameters ---------- cube : ndarray Ramp data in DN of size (nz,ny,nx). Should be bias-subtracted and ref-pixel-corrected. Should match det subarray shape. det : Detector Class NIRCam detector class. coeff_dict : ndarray Dictionary holding coefficient information: - 'cf_nonlin' : Set of polynomial coefficients of size (ncf,ny,nx). - 'use_legendre' : Coefficients use Legendre polynomials? - 'lxmap' : Legendre polynomial normalization range, usually [0,1e5] Possible to separately fit lower flux values: - 'counts_cut' : Flux cut-off value in electrons - 'cf_nonlin_low' : Coefficients for flux values below counts_cut """ nz, _, _ = cube.shape nx, ny = (det.xpix, det.ypix) # Need to crop input coefficients in the event of subarrays x1, x2 = (det.x0, det.x0 + nx) y1, y2 = (det.y0, det.y0 + ny) if cube.shape[-2]!=ny or cube.shape[-1]!=nx: # Assume full frame cube needs to be cropped cube = cube[:,y1:y2,x1:x2] # Nominal coefficient array cf_arr = coeff_dict.get('cf_nonlin')[:,y1:y2,x1:x2] use_legendre = coeff_dict.get('use_legendre', False) lxmap = coeff_dict.get('lxmap') # Information for lower flux values counts_cut = coeff_dict.get('counts_cut') if counts_cut is None: cf_low = None else: cf_low = coeff_dict.get('cf_nonlin_low')[:,y1:y2,x1:x2] res = np.zeros_like(cube) for i in trange(nz, desc='Linearity', leave=False): frame = cube[i] if counts_cut is None: gain = pixel_linearity_gains(frame, cf_arr, use_legendre=use_legendre, lxmap=lxmap) else: ind1 = (frame >= counts_cut) ind2 = ~ind1 gain = np.zeros_like(frame) if ind1.sum()>0: # Upper values gain[ind1] = pixel_linearity_gains(frame[ind1], cf_arr[:,ind1], use_legendre=use_legendre, lxmap=lxmap) if ind2.sum()>0: # Lower values gain[ind2] = pixel_linearity_gains(frame[ind2], cf_low[:,ind2], use_legendre=use_legendre, lxmap=lxmap) gain = gain.reshape([ny,nx]) # Convert from DN to electrons res[i,:] = frame * gain del gain # For reference pixels, copy frame data and multiple by detector gain mask_ref = det.mask_ref res[i,mask_ref] = frame[mask_ref] * det.gain return res
[docs]def apply_nonlin(cube, det, coeff_dict, randomize=True, rand_seed=None): """Apply pixel non-linearity to ideal ramp Given a simulated cube of data in electrons, apply non-linearity coefficients to obtain values in DN (ADU). This Parameters ---------- cube : ndarray Simulated ramp data in e-. These should be intrinsic flux values with Poisson noise, but prior to read noise, kTC, IPC, etc. Size (nz,ny,nx). Should match det subarray shape. det : Detector Class Desired detector class output coeff_dict : ndarray Dictionary holding coefficient information: - 'cf_nonlin' : Set of polynomial coefficients of size (ncf,ny,nx). - 'use_legendre' : Coefficients use Legendre polynomials? - 'lxmap' : Legendre polynomial normalization range, usually [0,1e5] - 'sat_vals' : An image indicating what saturation levels in DN for each pixel Possible to separately fit lower flux values: - 'counts_cut' : Flux cut-off value in electrons - 'cf_nonlin_low' : Coefficients for flux values below counts_cut To include randomization in line with observed variation: - 'cflin0_mean' : Average 0th-order coefficient - 'cflin0_std' : Measured standard deviation of 0th-order coefficent - 'corr_slope' : Slope of linear correlation between 0th-order and higher orders - 'corr_intercept' : Intercept of linear Correaltion between 0th-order and higher orders Keyword Args ------------ randomize : bool Add variation to the non-linearity coefficients """ rng = np.random.default_rng(rand_seed) nz, _, _ = cube.shape nx, ny = (det.xpix, det.ypix) # Need to crop input coefficients in the event of subarrays x1, x2 = (det.x0, det.x0 + nx) y1, y2 = (det.y0, det.y0 + ny) if cube.shape[-2]!=ny or cube.shape[-1]!=nx: # Assume full frame cube needs to be cropped cube = cube[:,y1:y2,x1:x2] # Nominal coefficient array cf_arr = coeff_dict.get('cf_nonlin')[:,y1:y2,x1:x2] use_legendre = coeff_dict.get('use_legendre', False) lxmap = coeff_dict.get('lxmap') # Mean and standard deviation of first coefficients cflin0_mean = coeff_dict.get('cflin0_mean', cf_arr[0])[y1:y2,x1:x2] cflin0_std = coeff_dict.get('cflin0_std')[y1:y2,x1:x2] # The rest of the coefficents have a direct correlation to the first corr_slope = coeff_dict.get('corr_slope')[:,y1:y2,x1:x2] corr_intercept = coeff_dict.get('corr_intercept')[:,y1:y2,x1:x2] # Information for lower flux values counts_cut = coeff_dict.get('counts_cut') if counts_cut is None: cf_low = None else: cf_low = coeff_dict.get('cf_nonlin_low')[:,y1:y2,x1:x2] sat_vals = coeff_dict.get('sat_vals')[y1:y2,x1:x2] # Saturation in DN well_depth = det.well_level # Full well in e- corresponding to sat in DN if randomize: cf0_rand = rng.normal(loc=cflin0_mean, scale=cflin0_std) cf_arr = np.concatenate(([cf0_rand], corr_slope * cf0_rand + corr_intercept)) res = np.zeros_like(cube) for i in trange(nz, desc='Non-Linearity', leave=False): frame = cube[i] # Values higher than well depth ind_high = frame > well_depth if counts_cut is None: gain = pixel_linearity_gains(frame, cf_arr, use_legendre=use_legendre, lxmap=lxmap) else: ind1 = (frame >= counts_cut) ind2 = ~ind1 gain = np.zeros_like(frame) if ind1.sum()>0: # Upper values gain[ind1] = pixel_linearity_gains(frame[ind1], cf_arr[:,ind1], use_legendre=use_legendre, lxmap=lxmap) if ind2.sum()>0: # Lower values gain[ind2] = pixel_linearity_gains(frame[ind2], cf_low[:,ind2], use_legendre=use_legendre, lxmap=lxmap) gain = gain.reshape([ny,nx]) # Avoid NaNs igood = gain!=0 # Convert from electrons to ADU res[i,igood] = frame[igood] / gain[igood] del gain # Correct any pixels that are above saturation DN ind_over = (res[i]>sat_vals) | ind_high res[i,ind_over] = sat_vals[ind_over] # For reference pixels, copy frame data and divide by detector gain # Normally reference pixels should start as 0s, but just in case... mask_ref = det.mask_ref res[i,mask_ref] = frame[mask_ref] / det.gain return res
[docs]def get_flat_fields(im_slope, split_low_high=True, smth_sig=10, ref_info=[4,4,4,4]): """ Calculate QE variations in flat field""" from astropy.convolution import convolve_fft, Gaussian2DKernel ny, nx = im_slope.shape # Crop out active pixel region lower, upper, left, right = ref_info iy1, iy2 = (lower, ny - upper) ix1, ix2 = (left, nx - right) im_act = im_slope[iy1:iy2,ix1:ix2] # Assuming a uniformly illuminated field, get fractional QE variations qe_frac = im_act / np.median(im_act) ### Outlier removal # Perform a quick median filter imarr = [] xysh = 3 for xsh in np.arange(-xysh, xysh): for ysh in np.arange(-xysh, xysh): if not xsh==ysh==0: im_shift = fshift(qe_frac, delx=xsh, dely=ysh, pad=True, cval=1) imarr.append(im_shift) imarr = np.asarray(imarr) im_med = np.median(imarr, axis=0) del imarr # Replace outliers with their median values diff = qe_frac - im_med mask_good = robust.mean(diff, return_mask=True) mask_bad = ~mask_good qe_frac[mask_bad] = im_med[mask_bad] if split_low_high: # Perform a Gaussian smooth to get low frequency flat field info kernel = Gaussian2DKernel(smth_sig) qe_frac_pad = np.pad(qe_frac, pad_width=100, mode='reflect') im_smth = convolve_fft(qe_frac_pad, kernel, allow_huge=True, boundary='fill') lflats = pad_or_cut_to_size(im_smth, qe_frac.shape) pflats = qe_frac / lflats # Set QE variations of ref pixels to 1 (fill_val=1) lflats = pad_or_cut_to_size(lflats, (ny,nx), fill_val=1) pflats = pad_or_cut_to_size(pflats, (ny,nx), fill_val=1) return lflats, pflats else: return pad_or_cut_to_size(qe_frac, (ny,nx), fill_val=1)