# Source code for pynrc.simul.ngNRC

"""
ngNRC - NIRCam Detector Noise Simulator

Modification History:

Nov 2021
    - Auto-generate source tables from Gaia DR2 or Simbad queries
    - Add WFE drifts.
Oct 2021
    - Use numpy random number generator objects to produce repeatable results.
Sept 2021
    - Major refactor, splitting out slope_to_level1b and slope_to_fitswriter
    - Added linearity, flat fields, and cosmic rays
Apr 2021
    - Deprecate nghxrg, SCANoise, and slope_to_ramp
    - Instead use slope_to_ramps
Oct 2020
    - Restructure det noise and ramp creation
    - DMS simulations using JWST pipeline data models
Feb 2017
    - Add ngNRC to pyNRC code base
Aug 2016, J.M. Leisenring, UA/Steward
    - Modified how the detector and multiaccum info is handled
    - Copied detector and multiaccum classes from pyNRC
    - In the future, we will want to integrate this directly
      so that any changes made in the pyNRC classes are accounted for.
July 2016, J.M. Leisenring, UA/Steward
    - Updated many things and more for nghxrg (v3.0)
Feb 2016, J.M. Leisenring, UA/Steward
    - First Release
"""
import json
import numpy as np
import os

from copy import deepcopy

from astropy.io import fits
from astropy.convolution import convolve

from datetime import datetime

from pynrc.logging_utils import setup_logging

from .dms import create_DMS_HDUList, level1b_data_model
from .dms import update_dms_headers, update_headers_pynrc_info
from .apt import DMS_input, gen_jwst_pointing
from ..nrc_utils import pad_or_cut_to_size, frebin, jl_poly, get_detname
from ..nrc_utils import gen_unconvolved_point_source_image
from ..reduce.calib import ramp_resample, nircam_cal
from ..maths.coords import det_to_sci, sci_to_det
from ..maths.image_manip import convolve_image
from .. import conf

from stdatamodels import fits_support
import astropy.units as u

# Program bar
from tqdm.auto import trange, tqdm

import logging
_log = logging.getLogger('pynrc')

def create_level1b_FITS(sim_config, detname=None, apname=None, filter=None,
                        visit_id=None, dry_run=None, save_slope=None, save_dms=None):
    """Generate Level1b DMS-like FITS files.

    Walks every (detector, aperture, filter, target) combination found in the
    APT-derived observation definitions, builds the corresponding NIRCam
    instrument object, creates noiseless slope images, and simulates ramp data
    saved as DMS Level1b FITS files.

    TODO:
      - Tracking image persistence
      - Static column noise that shifts every integration
      - Currently only has WFE drifts of coronagraphic on-mask stars (sci and ref)
      - Also drifts non-HCI observations, generating new PSF grids each exposure

    Keyword Args
    ============
    detname : None or int or str
        Option to supply a valid detector name. If set, only the currently
        specified SCA will be simulated.
    apname : None or str
        Similar to `detname` keyword, can supply a specific SIAF aperture name
        that exists within the observation. Only that aperture will be simulated.
    filter : None or str
        Specify a filter within observation to be simulated. Can be combined
        with `detname` and `apname` keywords.
    visit_id : None or str
        Specify the visit ID to simulate. Should have the form "ABC:XYZ", or
        "ObsNum:VisitNum" (e.g., "005:001" for observation 5, visit 1).
    dry_run : bool or None
        Won't generate any image data, but instead runs through each
        observation, printing detector info, SIAF aperture name, filter,
        visit IDs, exposure numbers, and dither information. If set to None,
        then grabs keyword from `sim_config`, otherwise defaults to False if
        not found. If paired with `save_dms`, then will generate an empty set
        of DMS FITS files with headers populated, but data set to all zeros.
    save_slope : bool or None
        Saves noiseless slope images to a separate DMS-like FITS file named
        'slope_{DMSfilename}'. If set to None, then grabs keyword from
        `sim_config`, otherwise defaults to False if not found.
        No effect if dry_run=True.
    save_dms : bool or None
        Option to disable simulation of ramp data and creation of DMS FITS.
        If dry_run=True, then setting save_dms=True will save DMS FITS files
        populated with all zeros. If set to None, then grabs keyword from
        `sim_config`; if no keyword is found, then defaults to True if
        dry_run=False, otherwise False.
    """
    from ..pynrc_core import DetectorOps, NIRCam
    from ..obs_nircam import obs_hci

    # Files and output directory
    json_file     = sim_config['json_file']
    sm_acct_file  = sim_config['sm_acct_file']
    pointing_file = sim_config['pointing_file']
    xml_file      = sim_config['xml_file']
    save_dir      = sim_config['save_dir']

    # Source information for full field observations
    params_targets = sim_config['params_targets']

    # PSF information
    kwargs_nrc      = sim_config['params_webbpsf']
    kwargs_psf      = sim_config['params_psfconv']
    kwargs_wfedrift = sim_config.get('params_wfedrift')
    large_grid      = sim_config['large_grid']

    # Ramp noise simulation options
    kwargs_det        = sim_config['params_noise']
    large_slew_uncert = sim_config['large_slew']
    ta_sam_uncert     = sim_config['ta_sam']
    std_sam_uncert    = sim_config['std_sam']
    sgd_sam_uncert    = sim_config['sgd_sam']

    # Random seed information
    rand_seed_init = sim_config.get('rand_seed_init')

    if save_slope is None:
        save_slope = sim_config.get('save_slope', False)
    if dry_run is None:
        dry_run = sim_config.get('dry_run', False)
    # Saving DMS data is off by default if running dry run,
    # but keyword settings will take precedence.
    save_dms_def = False if dry_run else True
    if save_dms is None:
        save_dms = sim_config.get('save_dms', save_dms_def)

    #################################################
    # Create DMS Input class
    obs_input = DMS_input(xml_file, pointing_file, json_file, sm_acct_file,
                          save_dir=save_dir, rand_seed_init=rand_seed_init)

    # Update observing start date/time and V3 PA
    obs_input.obs_date = sim_config['obs_date']
    obs_input.obs_time = sim_config['obs_time']
    obs_input.pa_v3    = sim_config['pa_v3']

    # Generate all observation parameters for every visit, exposure, detector, etc
    obs_params_all = obs_input.gen_all_obs_params()
    obs_params_all = np.asarray(obs_params_all)
    obs_detnames = np.array([get_detname(par['detector']) for par in obs_params_all])
    obs_filters  = np.array([par['filter'] for par in obs_params_all])
    obs_apnames  = np.array([par['siaf_ap'].AperName for par in obs_params_all])
    obs_targets  = np.array([par['target_name'] for par in obs_params_all])
    obs_visitids = np.array([par['visit_key'] for par in obs_params_all])
    # Unique labels for sorting
    obs_labels = np.array([f'{a}_{f}_{t}' for a, f, t in zip(obs_apnames, obs_filters, obs_targets)])

    # WFE Drift information
    # NOTE: rand_seed_dwfe defaults to None so it is always defined when stored
    # in obs_params below (original raised NameError when no drift was requested).
    rand_seed_dwfe = None
    if kwargs_wfedrift is None:
        wfe_dict = None
    elif kwargs_wfedrift.get('wfe_dict') is not None:
        # wfe_dict already exists in passed parameters
        wfe_dict = kwargs_wfedrift.get('wfe_dict')
    else:
        plot_fig = kwargs_wfedrift.get('plot', False)
        figname = kwargs_wfedrift.get('figname', None)
        # Update figure save location
        if plot_fig and figname is not None:
            fileout = os.path.basename(figname)
            figpath = os.path.join(save_dir, fileout)
            kwargs_wfedrift['figname'] = figpath
        # Random seed info
        rand_seed_dwfe = kwargs_wfedrift.get('rand_seed')
        if rand_seed_dwfe is None:
            rng = np.random.default_rng(rand_seed_init)
            rand_seed_dwfe = rng.integers(0, 2**32-1)
        kwargs_wfedrift['rand_seed'] = rand_seed_dwfe
        wfe_dict = gen_wfe_drift(obs_input, **kwargs_wfedrift)

    # Select only a specific detector to generate simulations?
    if detname is None:
        udetnames = np.unique(obs_detnames)
    else:
        udetnames = [get_detname(detname)]

    for detname in udetnames:
        ind = (obs_detnames == detname)
        if ind.sum()==0:
            _log.warning(f'Detector {detname} is not a valid detector for these observations.')
            continue

        # Create calibration object
        det = DetectorOps(detector=detname)
        caldir = os.path.join(conf.PYNRC_PATH, 'calib', str(det.scaid))
        if (not dry_run) and save_dms:
            cal_obj = nircam_cal(det.scaid, caldir, verbose=False)

        ulabels = np.unique(obs_labels[ind])
        if (apname is not None) and (filter is not None):
            ind_mask1 = np.array([apname in a for a in ulabels])
            ind_mask2 = np.array([filter in a for a in ulabels])
            ind_mask = ind_mask1 & ind_mask2
            ulabels = ulabels[ind_mask]
            log_print = _log.info
        elif (apname is not None) and (filter is None):
            ind_mask = np.array([apname in a for a in ulabels])
            ulabels = ulabels[ind_mask]
            log_print = _log.info
        elif (apname is None) and (filter is not None):
            ind_mask = np.array([filter in a for a in ulabels])
            ulabels = ulabels[ind_mask]
            log_print = _log.info
        else:
            log_print = _log.warning
        if len(ulabels)==0:
            log_print('No valid observations for specified parameters:')
            log_print(f'  SCA: {detname}, SIAF: {apname}, Filter: {filter}')
            continue

        for label in ulabels:
            # Label has the form '{AperName}_{filter}_{target}'
            aname = '_'.join(label.split('_')[0:-2])
            fname = label.split('_')[-2]
            tname = label.split('_')[-1]

            ind2 = (obs_labels == label)
            if ind2.sum()==0:
                _log.warning(f'Skipping {aname} + {fname} + {tname} for {detname}...')
                continue

            # Select visit ids that have current obs config
            # Make sure we're not sorting
            _, uind = np.unique(obs_visitids[ind2], return_index=True)
            visit_ids = obs_visitids[ind2][np.sort(uind)]
            if visit_id is not None:
                ind_mask = (visit_ids == visit_id)
                visit_ids = visit_ids[ind_mask]
                log_print = _log.info
            else:
                log_print = _log.warning
            if len(visit_ids)==0:
                log_print('No valid Visit IDs for specified parameters:')
                log_print(f'  SCA: {detname}, SIAF: {apname}, Filter: {filter}, Visit: {visit_id}')
                continue

            # Grab a set of obs parameters to create some key info
            op_temp = obs_params_all[ind2][0]
            siaf_ap = op_temp['siaf_ap']

            # Check fov_pix size makes sense for aperture size
            # Reduce if too large. Make odd.
            kwargs_nrc2 = kwargs_nrc.copy()
            fov_pix = kwargs_nrc.get('fov_pix')
            if fov_pix is None:
                fov_pix = 641 if op_temp['channel'].upper()=='SHORT' else 321
            fov_pix = np.min([2*siaf_ap.XSciSize, 2*siaf_ap.YSciSize, fov_pix])
            fov_pix = fov_pix+1 if (fov_pix % 2)==0 else fov_pix
            kwargs_nrc2['fov_pix'] = fov_pix

            # Get target information
            target_id = op_temp['target_name']
            try:
                target_info = params_targets[target_id]
            except KeyError:
                _log.error(f"Cannot find {target_id} target! Skipping {aname} + {fname} for {detname}...")
                continue

            # More target info
            src_tbl       = target_info.get('src_tbl')
            star_info     = target_info.get('params_star')
            sp_star       = None if star_info is None else star_info.get('sp')
            dist_pc       = target_info.get('dist_pc')
            age_Myr       = target_info.get('age_Myr')
            planet_params = target_info.get('params_companions')
            disk_params   = target_info.get('params_disk_model')

            # Gather info to create NIRCam instrument object
            filt  = op_temp['filter']
            pupil = None if op_temp['pupil']=='CLEAR' else op_temp['pupil']
            mask  = None if op_temp['coron_mask']=='None' else op_temp['coron_mask']
            ap_obs_name = siaf_ap.AperName
            ap_nrc_name = ap_obs_name

            # Is this a high-contrast imaging observation?
            is_hci = ('MASK' in ap_nrc_name) or (sp_star is not None)

            if not dry_run:
                # Get rid of previous instances
                try:
                    del nrc
                except NameError:
                    pass

                if is_hci:
                    nrc = obs_hci(sp_star, dist_pc, filter=filt, apname=ap_nrc_name,
                                  use_ap_info=True, disk_params=disk_params,
                                  autogen_coeffs=False, detector=detname, **kwargs_nrc2)
                    nrc.gen_psf_coeff()
                    nrc.gen_wfemask_coeff(large_grid=large_grid)
                    if wfe_dict is not None:
                        nrc.gen_wfedrift_coeff()
                    # Create grid of PSFs
                    if (disk_params is not None) or (src_tbl is not None):
                        nrc.gen_disk_psfs(force=True)
                        hdul_psfs = nrc.psf_list
                    # Add planets
                    if planet_params is not None:
                        for kpl in planet_params:
                            if age_Myr is None:
                                _log.warning('Target age is not set. Assuming 100 Myr.')
                                age = 100
                            else:
                                age = age_Myr
                            kw = planet_params[kpl]
                            nrc.add_planet(age=age, **kw)
                else:
                    nrc = NIRCam(filter=filt, pupil_mask=pupil, image_mask=mask,
                                 detector=detname, apname=ap_nrc_name,
                                 autogen_coeffs=False, **kwargs_nrc2)
                    nrc.gen_psf_coeff()
                    nrc.gen_wfefield_coeff()
                    if wfe_dict is not None:
                        nrc.gen_wfedrift_coeff()
                    # Create grid of PSFs. Skip if doing WFE drifts over time,
                    # since regenerated for each exposure.
                    if (src_tbl is not None) and (wfe_dict is None):
                        hdul_psfs = nrc.gen_psfs_over_fov(**kwargs_psf)
                    else:
                        hdul_psfs = None

                # Get Zodiacal background emission.
                ra_ref, dec_ref = (op_temp['ra_ref'], op_temp['dec_ref'])
                date_str = op_temp['date-obs']
                date_arg = (int(s) for s in date_str.split('-'))
                day_of_year = datetime(*date_arg).timetuple().tm_yday
                im_bg = nrc.bg_zodi_image(ra=ra_ref, dec=dec_ref, thisday=day_of_year)

            # Cycle through each of the visits
            for vid in visit_ids:
                visit_dict = obs_input.program_info[vid]

                # Get exposure IDs
                obs_dict_arr = visit_dict['obs_id_info']
                exp_ids = np.array([int(d['exposure_number']) for d in obs_dict_arr])
                grp_ids = np.array([int(d['visit_group'])     for d in obs_dict_arr])
                seq_ids = np.array([int(d['sequence_id'])     for d in obs_dict_arr])
                act_ids = np.array([    d['activity_id']      for d in obs_dict_arr])

                # Get type of observations (T-ACQ, CONFIRM, SCIENCE, ETC?)
                type_arr = visit_dict['type']
                type_vals = np.array(['T_ACQ', 'CONFIRM', 'SCIENCE'])

                # Cycle through each exposure in visit
                nexp = len(exp_ids)  # Total number of exposures with this aperture name
                for j in trange(nexp, desc="Exposures", leave=False):
                    # Create the observation parameters dictionary for selected exposure
                    exp_num = exp_ids[j]
                    grp_id  = grp_ids[j]
                    seq_id  = seq_ids[j]
                    act_id  = act_ids[j]
                    # Convert base 36 to integer number.
                    # NOTE: was np.int(act_id, 36); np.int was removed in NumPy 1.24.
                    act_int = int(act_id, 36)

                    obs_params = obs_input.gen_obs_params(vid, exp_num, detname,
                                                          grp_id=grp_id, seq_id=seq_id, act_id=act_id)

                    # Update some target info
                    obs_params['catalog_name'] = target_info.get('TargetArchiveName', 'UNKNOWN')
                    # NOTE(review): original checked these keys in obs_params but read
                    # them from target_info, which could never trigger / would KeyError.
                    if 'RAProperMotion' in target_info.keys():
                        obs_params['mu_RA'] = target_info['RAProperMotion']
                    if 'DecProperMotion' in target_info.keys():
                        obs_params['mu_DEC'] = target_info['DecProperMotion']

                    # Random seed for noise uncertainties; each seed should be
                    # unique but also repeatable
                    rand_seed_noise_j = visit_dict['rand_seed_noise'] + grp_id*act_int*nexp + exp_num

                    # Random seed for dither uncertainties
                    tup = obs_params['visit_type'].upper()
                    lup = obs_params['visit_level'].upper()
                    ddist = obs_params['ddist']
                    if tup not in type_vals:
                        _log.warning(f'Exposure type {tup} not recognized.')
                        _log.warning(f' Visit {vid}, Exp {exp_num}, Grp {grp_id}, Seq {seq_id}, Act {act_id}')
                        continue
                    if tup=='T_ACQ':
                        rand_seed_base = rand_seed_dith = visit_dict['rand_seed_dith']
                        base_std = large_slew_uncert
                        dith_std = 0
                    elif (tup=='CONFIRM') and (ddist==0):
                        rand_seed_base = rand_seed_dith = visit_dict['rand_seed_dith']
                        base_std = large_slew_uncert
                        dith_std = 0
                    elif (tup=='CONFIRM'):
                        rand_seed_base = rand_seed_dith = visit_dict['rand_seed_dith'] + 1
                        base_std = ta_sam_uncert
                        dith_std = 0
                    elif (tup=='SCIENCE') and (lup=='TARGET'):
                        rand_seed_base = visit_dict['rand_seed_dith'] + 1
                        rand_seed_dith = rand_seed_base + 1
                        base_std = large_slew_uncert if type_arr[0].upper()=='SCIENCE' else ta_sam_uncert
                    elif (tup=='SCIENCE') and (lup=='FILTER'):
                        rand_seed_base = visit_dict['rand_seed_dith'] + 1
                        if ddist==0:
                            rand_seed_dith = rand_seed_base + 1
                        else:
                            rand_seed_dith = rand_seed_base + 1 + grp_id*act_int*nexp + exp_num
                        base_std = large_slew_uncert if type_arr[0].upper()=='SCIENCE' else ta_sam_uncert
                    elif (tup=='SCIENCE'):
                        # No need to do anything, since no new activity
                        pass
                    else:
                        raise ValueError(f'Not sure what to do with {lup}, {tup}, dDist={ddist:.3f}')

                    # New science pointing once per science activity
                    ra_ref, dec_ref = (obs_params['ra_ref'], obs_params['dec_ref'])
                    if tup=='T_ACQ' or tup=='CONFIRM':
                        tel_pointing = gen_jwst_pointing(visit_dict, obs_params,
                                                         base_std=base_std, dith_std=dith_std,
                                                         rand_seed=rand_seed_dith,
                                                         rand_seed_base=rand_seed_base)
                        tel_pointing.exp_nums = np.array([exp_num])
                    elif (tup=='SCIENCE') and (int(exp_num)==1):
                        # Make sure
                        first_dith_zero = True if (lup=='TARGET' or ddist==0) else False
                        # Create jwst_pointing class
                        tel_pointing = gen_jwst_pointing(visit_dict, obs_params, base_std=base_std)
                        # Update standard and SGD uncertainties and regenerate random pointings
                        tel_pointing._std_sig = std_sam_uncert
                        tel_pointing._sgd_sig = sgd_sam_uncert
                        tel_pointing.gen_random_offsets(rand_seed=rand_seed_dith,
                                                        rand_seed_base=rand_seed_base,
                                                        first_dith_zero=first_dith_zero)
                        tel_pointing.exp_nums = np.arange(tel_pointing.ndith) + 1

                    # Save random seeds in obs_params
                    obs_params['rand_seed_init']  = rand_seed_init
                    obs_params['rand_seed_dith']  = rand_seed_dith
                    obs_params['rand_seed_noise'] = rand_seed_noise_j
                    obs_params['rand_seed_dwfe']  = rand_seed_dwfe

                    # Skip slope creation if obs_label doesn't match NIRCam class
                    a = obs_params['siaf_ap'].AperName
                    f = obs_params['filter']
                    t = obs_params['target_name']
                    if label != f'{a}_{f}_{t}':
                        continue

                    # Create dictionary of parameters to save to FITS header
                    kwargs_pynrc = kwargs_det.copy()
                    kwargs_pynrc['json_file']     = os.path.basename(json_file)
                    kwargs_pynrc['sm_acct_file']  = os.path.basename(sm_acct_file)
                    kwargs_pynrc['pointing_file'] = os.path.basename(pointing_file)
                    kwargs_pynrc['xml_file']      = os.path.basename(xml_file)

                    # Save simulated offset positions incl. random mispointings
                    expnum = int(obs_params['obs_id_info']['exposure_number'])
                    ind = np.where(tel_pointing.exp_nums == expnum)[0][0]
                    idl_off = tel_pointing.position_offsets_act[ind]
                    kwargs_pynrc['xoffset_act'] = idl_off[0]
                    kwargs_pynrc['yoffset_act'] = idl_off[1]
                    kwargs_pynrc['pa_v3'] = obs_params['pa_v3']

                    # WFE drift values
                    tval_exp = obs_params['texp_start_relative']  # seconds
                    if wfe_dict is not None:
                        tval_all  = wfe_dict['time_sec']
                        wfe_total = wfe_dict['total']
                        wfe_drift_exp = np.interp(tval_exp, tval_all, wfe_total)
                    else:
                        wfe_drift_exp = 0
                    obs_params['wfe_drift'] = wfe_drift_exp

                    if dry_run:
                        idl_off_str = f'({idl_off[0]:+0.3f}, {idl_off[1]:+0.3f})'
                        gsa_str = f'{grp_id:02d}{seq_id:01d}{act_id}'
                        dwfe = f'{wfe_drift_exp:.2f}'
                        print(detname, a, f, t, vid, gsa_str, exp_num, idl_off_str, dwfe, tval_exp)
                        # Save Level1b DMS FITS files without any data
                        if save_dms:
                            obs_params['filename'] = 'empty_' + obs_params['filename']
                            create_DMS_HDUList(None, None, obs_params, save_dir=save_dir, **kwargs_pynrc)
                    else:
                        # Generate dithered slope image for given exposure ID.
                        # Start with background. im_bg was initially created above for
                        # this aperture, filter, target but the date may have changed.
                        ra_ref, dec_ref = (obs_params['ra_ref'], obs_params['dec_ref'])
                        date_str = obs_params['date-obs']
                        date_arg = (int(s) for s in date_str.split('-'))
                        day_of_year2 = datetime(*date_arg).timetuple().tm_yday
                        if day_of_year == day_of_year2:
                            im_slope = im_bg.copy()
                        else:
                            day_of_year = day_of_year2
                            im_bg = nrc.bg_zodi_image(ra=ra_ref, dec=dec_ref, thisday=day_of_year)
                            im_slope = im_bg.copy()

                        # Create slope image from table
                        if src_tbl is not None:
                            im_slope += sources_to_slope(src_tbl, nrc, obs_params, tel_pointing,
                                                         hdul_psfs=hdul_psfs, im_bg=0,
                                                         wfe_drift=wfe_drift_exp)

                        # Create slope image from HCI observation
                        if is_hci:
                            pa_v3 = obs_params['pa_v3']
                            im_slope += nrc.gen_slope_image(PA=pa_v3, xyoff_asec=idl_off, zfact=0,
                                                            exclude_noise=True, wfe_drift0=wfe_drift_exp)

                        # Save slope image
                        if save_slope:
                            save_slope_image(im_slope, obs_params, save_dir=save_dir, **kwargs_pynrc)

                        # Simulate ramp and stuff into a level1b file
                        if save_dms:
                            obs_params['filename'] = 'pynrc_' + obs_params['filename']
                            slope_to_level1b(im_slope, obs_params, cal_obj=cal_obj,
                                             save_dir=save_dir, **kwargs_pynrc)

    # Get rid of previous NIRCam instances
    try:
        del nrc
    except NameError:
        pass
def save_slope_image(im_slope, obs_params, save_dir=None, **kwargs):
    """Save slope image as Level1b-like FITS"""
    # Build a Level1b data model purely to harvest its FITS headers
    data_model = level1b_data_model(obs_params)
    hdul_hdrs, _ = fits_support.to_fits(data_model._instance, data_model._schema)

    # Assemble a fresh HDUList holding the slope image with those headers
    primary_hdu = fits.PrimaryHDU(header=hdul_hdrs[0].header.copy())
    image_hdu = fits.ImageHDU(data=im_slope, header=hdul_hdrs[1].header.copy(strip=True))
    hdul_out = fits.HDUList([primary_hdu, image_hdu])

    # Output path: 'slope_' prefix on the DMS filename, under save_dir if given
    file_slope = 'slope_' + obs_params['filename']
    if save_dir is not None:
        # Create directory and intermediates if they don't exist
        os.makedirs(save_dir, exist_ok=True)
        file_slope = os.path.join(save_dir, file_slope)

    # Save model to DMS FITS file
    print(f' Saving: {file_slope}')
    hdul_out.writeto(file_slope, overwrite=True)
    hdul_hdrs.close()
    hdul_out.close()

    # Update some WCS and segment info on the file just written
    update_dms_headers(file_slope, obs_params)
    update_headers_pynrc_info(file_slope, obs_params, **kwargs)
def slope_to_level1b(im_slope, obs_params, cal_obj=None, save_dir=None,
                     cframe='sci', out_ADU=True, **kwargs):
    """Simulate DMS HDUList from slope image

    Requires input of obs_params input dictionary as generated from APT input
    files (see `DMS_input` class in apt.py). Also, make sure the `calib`
    directory exists in PYNRC_PATH and is populated with detector calibration
    information. Look at keyword args to exclude specific detector effects.

    Output is saved to disk and will not be returned by the function.

    Parameters
    ==========
    im_slope : ndarray
        Slope in e-/sec of image from all sky sources, including Zodiacal
        background. Should exclude dark current background, which is handled
        separately from calib directory.
    obs_params : dict
        Dictionary of parameters to populate DMS header.
        See `create_DMS_HDUList` in dms.py.
    cal_obj : :class:`pynrc.nircam_cal`
        NIRCam calibration class holding info needed to simulate a ramp.
        Auto-generated from the calib directory if not supplied.
    save_dir : None or str
        Option to override output directory as specified in `obs_params`
        dictionary. If not specified as either a function keyword or in
        `obs_params`, then files are saved in current working directory.
    cframe : str
        Coordinate frame of input slope, 'sci' or 'det'.

    Keyword Args
    ============
    include_dark : bool
        Add dark current?
    include_bias : bool
        Add detector bias?
    include_ktc : bool
        Add kTC noise?
    include_rn : bool
        Add readout noise per frame?
    include_cpink : bool
        Add correlated 1/f noise to all amplifiers?
    include_upink : bool
        Add uncorrelated 1/f noise to each amplifier?
    include_acn : bool
        Add alternating column noise?
    apply_ipc : bool
        Include interpixel capacitance?
    apply_ppc : bool
        Apply post-pixel coupling to linear analog signal?
    include_refoffsets : bool
        Include reference offests between amplifiers and odd/even columns?
    include_refinst : bool
        Include reference/active pixel instabilities?
    include_colnoise : bool
        Add in column noise per integration?
    col_noise : ndarray or None
        Option to explicitly specify column noise distribution in order to
        shift by one for subsequent integrations
    amp_crosstalk : bool
        Crosstalk between amplifiers?
    add_crs : bool
        Add cosmic ray events? See Robberto et al 2010 (JWST-STScI-001928).
    cr_model: str
        Cosmic ray model to use: 'SUNMAX', 'SUNMIN', or 'FLARES'.
    cr_scale: float
        Scale factor for probabilities.
    latents : None
        Apply persistence.
    apply_nonlinearity : bool
        Apply non-linearity?
    random_nonlin : bool
        Add randomness to the linearity coefficients?
    prog_bar : bool
        Show a progress bar for this ramp generation?
    """
    # Repeatable per-exposure noise stream seeded from obs_params
    rng = np.random.default_rng(obs_params.get('rand_seed_noise'))

    det = obs_params['det_obj']
    nint = det.multiaccum.nint

    # Ramp simulation works in 'sci' coords; convert if needed
    if cframe=='det':
        im_slope = det_to_sci(im_slope)

    # Lazily build the calibration object from the calib directory
    if cal_obj is None:
        caldir = os.path.join(conf.PYNRC_PATH, 'calib', str(det.scaid))
        cal_obj = nircam_cal(det.scaid, caldir)

    # Simulate each integration, collecting science and zero-frame arrays
    sci_data, zero_data = [], []
    for _ in trange(nint, desc='Ramps', leave=False):
        # New random seed for noise generator initialization
        seed_i = rng.integers(0, 2**32-1)
        ramp, zframe = simulate_detector_ramp(det, cal_obj, im_slope=im_slope,
                                              return_zero_frame=True, return_full_ramp=False,
                                              cframe='sci', out_ADU=out_ADU,
                                              rand_seed=seed_i, **kwargs)[0:2]
        sci_data.append(ramp)
        zero_data.append(zframe)

    # Create and save Level 1b data model to disk.
    # This also updates header information.
    create_DMS_HDUList(np.asarray(sci_data), np.asarray(zero_data),
                       obs_params, save_dir=save_dir, **kwargs)
def sources_to_slope(source_table, nircam_obj, obs_params, tel_pointing,
                     hdul_psfs=None, im_bg=None, cframe_out='sci', **kwargs):
    """Create a slope image from a table of sources

    Parameters
    ==========
    source_table : astropy Table
        Table of objects in across the region, including headers 'ra', 'dec',
        and object fluxes in NIRCam filter in vega mags where headers are
        labeled the filter name (e.g, 'F444W').
    nircam_obj : :class:`pynrc.NIRCam`
        NIRCam instrument class for PSF generation.
    obs_params : dict
        Dictionary of parameters to populate DMS header. See
        `create_obs_params` in apt.py and `level1b_data_model` in dms.py.
    tel_pointing : :class:`webbpsf_ext.jwst_point`
        JWST telescope pointing information. Holds pointing coordinates
        and dither information for a given telescope visit.
    hdul_psfs : HDUList
        Option to pass a pre-generated HDUList of PSFs across the field
        of view. If set to None, then generated automatically.
    im_bg : None or ndarray
        Option to specify a pre-generated image (or single value) of the
        Zodiacal background emission. If not specified, then gets
        automatically generated.
    cframe_out : str
        Desired output coordinate frame, either 'sci' or 'det'

    Keyword Args
    ============
    npsf_per_full_fov : int
        Number of PSFs across one dimension of the instrument's field of view.
        If a coronagraphic observation, then this is for the nominal
        coronagraphic field of view.
    sptype : str
        Spectral type, such as 'A0V' or 'K2III'.
    wfe_drift : float
        Desired WFE drift value relative to default OPD.
    osamp : int
        Sampling of output PSF relative to detector sampling. If `hdul_psfs`
        is specified, then the 'OSAMP' header keyword takes precedence.
    use_coeff : bool
        If True, uses `calc_psf_from_coeff`, otherwise WebbPSF's built-in
        `calc_psf`. Coefficients are much faster.
    """
    nrc = nircam_obj
    siaf_ap = obs_params['siaf_ap']

    # Resolve oversampling factor; PSF header takes precedence over kwargs.
    if hdul_psfs is not None:
        osamp = hdul_psfs[0].header['OSAMP']
        if ('osamp' in kwargs.keys()) and (kwargs['osamp']!=osamp):
            osamp2 = kwargs['osamp']
            _log.warning(f'Conflict between osamp in kwargs ({osamp2}) and osamp in PSF header ({osamp}). Using header.')
        kwargs['osamp'] = osamp
    elif 'osamp' in kwargs.keys():
        osamp = kwargs['osamp']
    else:
        osamp = 1
        kwargs['osamp'] = osamp

    ###############################
    # Generate unconvolved image

    # RA and Dec of all objects in field
    ra_deg, dec_deg = (source_table['ra'], source_table['dec'])
    # Vega magnitude values
    filt = obs_params['filter']
    mags = source_table[filt].data
    expnum = int(obs_params['obs_id_info']['exposure_number'])
    hdul_sci_image = gen_unconvolved_point_source_image(nrc, tel_pointing, ra_deg, dec_deg,
                                                        mags, expnum=expnum, **kwargs)

    ###############################
    # Convolve full image with PSFs
    if nrc.is_coron:
        # For coronagraphy, assume position-dependent PSFs are a function
        # of coronagraphic mask transmission, off-axis, and on-axis PSFs.
        # PSF(x,y) = trans(x,y)*psf_off + (1-trans(x,y))*psf_on

        # Get transmission mask, rebinned to the working oversampling
        trans = nrc.mask_images['OVERMASK']
        scale = kwargs['osamp'] / nrc.oversample
        trans = frebin(trans, scale=scale, total=False)
        # Embed mask into an oversized array matching the unconvolved image
        trans_oversized = np.ones_like(hdul_sci_image[0].data)
        x0, y0 = (hdul_sci_image[0].header['XSCI0'], hdul_sci_image[0].header['YSCI0'])
        x1 = int(np.abs(x0*kwargs['osamp']))
        y1 = int(np.abs(y0*kwargs['osamp']))
        x2, y2 = (x1 + trans.shape[1], y1 + trans.shape[0])
        trans_oversized[y1:y2,x1:x2] = trans

        # Off-axis component
        hdul_sci_image_off = deepcopy(hdul_sci_image)
        hdul_sci_image_off[0].data = hdul_sci_image_off[0].data * trans_oversized
        psf_off = nrc.calc_psf_from_coeff(use_bg_psf=True, return_oversample=True, return_hdul=True)
        hdul_conv_off = convolve_image(hdul_sci_image_off, psf_off, output_sampling=1, return_hdul=True)

        # On-axis component (closest PSF convolution, only for bar mask)
        hdul_sci_image_on = deepcopy(hdul_sci_image)
        hdul_sci_image_on[0].data = hdul_sci_image_on[0].data * (1 - trans_oversized)
        hdul_psfs = nrc.psf_list if hdul_psfs is None else hdul_psfs
        hdul_conv_on = convolve_image(hdul_sci_image_on, hdul_psfs, output_sampling=1, return_hdul=True)

        # Sum on- and off-axis contributions
        hdul_sci_conv = hdul_conv_on
        hdul_sci_conv[0].data += hdul_conv_off[0].data
    else:
        if hdul_psfs is None:
            hdul_psfs = nrc.gen_psfs_over_fov(return_coords=None, **kwargs)
        hdul_sci_conv = convolve_image(hdul_sci_image, hdul_psfs, output_sampling=1, return_hdul=True)

    # Crop out the region that falls on the science aperture
    im_conv = hdul_sci_conv[0].data
    ny, nx = im_conv.shape
    xsci = np.arange(nx) + hdul_sci_conv[0].header['XSCI0']
    ysci = np.arange(ny) + hdul_sci_conv[0].header['YSCI0']
    xind = (xsci>=0) & (xsci<siaf_ap.XSciSize)
    yind = (ysci>=0) & (ysci<siaf_ap.YSciSize)
    im_slope = im_conv[yind,:][:,xind]

    ###############################
    # Add zodiacal background
    # Can be reused for all ints in same observation.
    if im_bg is None:
        date_str = obs_params['date-obs']
        date_arg = (int(s) for s in date_str.split('-'))
        day_of_year = datetime(*date_arg).timetuple().tm_yday
        ra, dec = tel_pointing.ap_radec()
        im_bg = nrc.bg_zodi_image(ra=ra, dec=dec, thisday=day_of_year)

    # Add background
    im_slope = im_slope + im_bg

    if cframe_out=='det':
        im_slope = sci_to_det(im_slope)

    return im_slope
def sources_to_level1b(source_table, nircam_obj, obs_params, tel_pointing,
                       hdul_psfs=None, cal_obj=None, im_bg=None, out_ADU=True,
                       save_dir=None, **kwargs):
    """Simulate a DMS Level1b file from a source table

    Convenience wrapper: builds a slope image from `source_table` via
    `sources_to_slope`, then feeds it to `slope_to_level1b` to simulate ramps
    and write a Level1b FITS file. Requires the obs_params dictionary
    generated from APT input files (see `DMS_input` class in apt.py) and a
    populated `calib` directory in PYNRC_PATH. Look at keyword args to
    exclude specific detector effects.

    Output is saved to disk and will not be returned by the function.

    Parameters
    ==========
    source_table : astropy Table
        Table of objects in across the region, including headers 'ra', 'dec',
        and object fluxes in NIRCam filter in vega mags where headers are
        labeled the filter name (e.g, 'F444W').
    nircam_obj : :class:`pynrc.NIRCam`
        NIRCam instrument class for PSF generation.
    obs_params : dict
        Dictionary of parameters to populate DMS header. See
        `create_obs_params` in apt.py and `level1b_data_model` in dms.py.
    tel_pointing : :class:`webbpsf_ext.jwst_point`
        JWST telescope pointing information. Holds pointing coordinates
        and dither information for a given telescope visit.
    cal_obj : :class:`pynrc.nircam_cal`
        NIRCam calibration class that holds the necessary calibration info
        to simulate a ramp.
    im_bg : None or ndarray
        Option to specify a pre-generated image (or single value) of the
        Zodiacal background emission. If not specified, then gets
        automatically generated.
    save_dir : None or str
        Option to override output directory as specified in `obs_params`
        dictionary. If not specified as either a function keyword or in
        `obs_params`, then files are saved in current working directory.

    Keyword Args
    ============
    PSF-generation keywords (`npsf_per_full_fov`, `sptype`, `wfe_drift`,
    `osamp`, `use_coeff`) are forwarded to `sources_to_slope`; ramp-generation
    keywords (`include_dark`, `include_bias`, `include_ktc`, `include_rn`,
    `include_cpink`, `include_upink`, `include_acn`, `apply_ipc`, `apply_ppc`,
    `include_refoffsets`, `include_refinst`, `include_colnoise`, `col_noise`,
    `amp_crosstalk`, `add_crs`, `cr_model`, `cr_scale`, `latents`,
    `apply_nonlinearity`, `random_nonlin`, `prog_bar`) are forwarded to
    `slope_to_level1b`. See those functions for details.
    """
    # Build the slope image in 'sci' coords
    slope_im = sources_to_slope(source_table, nircam_obj, obs_params, tel_pointing,
                                hdul_psfs=hdul_psfs, im_bg=im_bg, **kwargs)

    # Tell the ramp simulator which frame the slope image is in
    kwargs['cframe'] = kwargs.get('cframe_out', 'sci')

    slope_to_level1b(slope_im, obs_params, cal_obj=cal_obj,
                     save_dir=save_dir, out_ADU=out_ADU, **kwargs)
def slope_to_fitswriter(det, cal_obj, im_slope=None, cframe='det',
    filter=None, pupil=None, targ_name=None, obs_time=None, file_out=None,
    out_ADU=True, return_results=True, rand_seed=None, **kwargs):
    """Simulate HDUList from slope image

    FITSWriter-like (ISIM format) output. DMS output has been deprecated
    and moved to `slope_to_level1b` and `sources_to_level1b`.

    Parameters
    ==========
    det : Detector Class
        Desired detector class output
    cal_obj : nircam_cal class
        NIRCam calibration class that holds the necessary calibration
        info to simulate a ramp.
    im_slope : ndarray
        Input slope image of observed scene. Assumed to be in detector
        coordinates. If an image cube, then number of images must match
        the number of integrations (`nint`) in `det` class.
    cframe : str
        Orientation of im_slope. Either 'det' or 'sci' coordinate frame.
    filter : str
        Name of filter element for header
    pupil : str
        Name of pupil element for header
    targ_name : str
        Target name (optional)
    obs_time : datetime
        Specifies when the observation was considered to be executed.
        If not specified, then it will choose the current time. This
        information is added to the header. Must be a datetime object:

            >>> datetime.datetime(2016, 5, 9, 11, 57, 5, 796686)

    file_out : str or None
        Name (including directory) to save FITS file.
        If None, then won't save; make sure to set return_results=True.
    out_ADU : bool
        If true, divide by gain and convert to 16-bit UINT.
    return_results : bool
        Return HDUList result? Otherwise the HDUList is closed after the
        optional save to disk and None is returned.
    rand_seed : None or int
        Seed value to initialize random number generator for repeatable
        noise realizations.

    Keyword Args
    ============
    return_full_ramp : bool
        By default, we average groups and drop frames as specified in the
        `det` input. If this keyword is set to True, then return all raw
        frames within the ramp. The last set of `nd2` frames will be omitted.
    include_dark : bool
        Add dark current?
    include_bias : bool
        Add detector bias?
    include_ktc : bool
        Add kTC noise?
    include_rn : bool
        Add readout noise per frame?
    include_cpink : bool
        Add correlated 1/f noise to all amplifiers?
    include_upink : bool
        Add uncorrelated 1/f noise to each amplifier?
    include_acn : bool
        Add alternating column noise?
    apply_ipc : bool
        Include interpixel capacitance?
    apply_ppc : bool
        Apply post-pixel coupling to linear analog signal?
    include_refinst : bool
        Include reference/active pixel instabilities?
    include_colnoise : bool
        Add in column noise per integration?
    col_noise : ndarray or None
        Option to explicitly specify column noise distribution in order
        to shift by one for subsequent integrations
    amp_crosstalk : bool
        Crosstalk between amplifiers?
    add_crs : bool
        Add cosmic ray events?
    latents : None
        Apply persistence.
    linearity_map : ndarray
        Add non-linearity.
    """
    if kwargs.get('DMS') is not None:
        raise ValueError("DMS keyword is not valid. Instead, see `slope_to_level1b` or `sources_to_level1b`.")

    # Need at least one destination for the results
    if (file_out is None) and (not return_results):
        raise ValueError("Set either file_out or return_results=True")

    # FITSWriter (ISIM format) works in detector coordinates
    if cframe=='sci':
        im_slope = sci_to_det(im_slope)

    data = simulate_detector_ramp(det, cal_obj, im_slope=im_slope, cframe='det',
                                  out_ADU=out_ADU, return_zero_frame=False,
                                  rand_seed=rand_seed, **kwargs)
    hdu = fits.PrimaryHDU(data)
    hdu.header = det.make_header(filter, pupil, obs_time, targ_name=targ_name, DMS=False)
    if file_out is not None:
        hdu.header['FILENAME'] = os.path.split(file_out)[1]
    outHDUList = fits.HDUList([hdu])

    # Write file to disk
    if file_out is not None:
        # BUGFIX: `overwrite` expects a bool; the previous string 'True'
        # only worked because any non-empty string is truthy.
        outHDUList.writeto(file_out, overwrite=True)

    # Only return outHDUList if return_results=True
    if return_results:
        return outHDUList
    else:
        outHDUList.close()
def make_gaia_source_table(coords, remove_cen_star=True, radius=6*u.arcmin, teff_default=5800):
    """ Create source table from GAIA DR2 query

    Generates a table of objects by performing a cone search around a set
    of input coordinates. The output table includes both coordinates and
    extrapolated photometry. All returned coordinates are for epoch=2015.5
    for GAIA DR2.

    The process involves performing a search query around a set of input
    coordinates, then using the GAIA magnitudes and Teff to extrapolate
    magnitudes in the NIRCam filters. If Teff isn't supplied, then it
    defaults to solar temperature. If an object's parallax isn't available,
    then the object is assumed to have a flat spectrum in F_lambda.

    Parameters
    ==========
    coords : SkyCoords
        Astropy SkyCoords object.
    remove_cen_star : bool
        Output will exclude the star associated with the input coordinates
        (anything within 0.1" is removed from table).
    radius : Quantity
        Radius to perform search. Default is 6', which should encompass
        NIRCam's full FoV, including both Modules A and B.
    teff_default : float
        Default stellar effective temperature (K) to assume for sources
        without GAIA information to extrapolate to longer wavelengths.
    """
    from astroquery.gaia import Gaia
    from astropy.table import Table
    from astropy.time import Time

    from .. import stellar_spectrum, read_filter
    from ..nrc_utils import S, bp_gaia

    Gaia.MAIN_GAIA_TABLE = "gaiadr2.gaia_source"
    Gaia.ROW_LIMIT = -1

    try:
        # Convert to 2015.5 epoch of GAIA coordinates
        coord_query = coords.apply_space_motion(new_obstime=Time('J2015.5'))
    except Exception:
        # `Logger.warn` is a deprecated alias of `warning`
        _log.warning(f'Unable to convert to J2015.5 epoch. Continuing with {coords.obstime}...')
        coord_query = coords
    gaia_tbl = Gaia.query_object_async(coord_query, radius=radius)

    # Remove items without any photometry
    ind_nomag = gaia_tbl['phot_g_mean_mag'].data.mask
    if remove_cen_star:
        # Central star is anything within 0.1 arcsec of query position
        dist_asec = gaia_tbl['dist'].data * 3600
        ind_dist = dist_asec.data < 0.1
        ind_remove = np.where(ind_nomag | ind_dist)
    else:
        ind_remove = np.where(ind_nomag)
    gaia_tbl.remove_rows(ind_remove)

    ra_name = 'ra'
    dec_name = 'dec'

    # Create new source table
    index = np.arange(len(gaia_tbl)) + 1
    ra = gaia_tbl[ra_name]
    dec = gaia_tbl[dec_name]
    gband = gaia_tbl['phot_g_mean_mag'].data.data
    src_tbl = Table([index, ra, dec, gband], names=('index', 'ra', 'dec', 'g-band'))

    # Add distances (GAIA 'dist' column is in degrees; convert to arcsec)
    src_tbl.add_column(gaia_tbl['dist'].data * 3600, name='dist')
    src_tbl['dist'].unit = 'arcsec'

    # Get effective temperature for each object rounded to the nearest 100K
    teff = (gaia_tbl['teff_val'].data.data / 100).astype('int') * 100
    # Assume default (solar) temperature for null data
    teff[gaia_tbl['teff_val'].data.mask] = teff_default

    # Create stellar spectra of each unique temperature
    sp_dict = {}
    for tval in np.unique(teff):
        key = int(tval)
        sp_dict[key] = stellar_spectrum('G2V', Teff=tval, log_g=4.5, metallicity=0)
    # Add a flat spectrum for those objects without parallax measurements
    sp_dict['flat'] = stellar_spectrum('flat')

    # SW Filters
    filts_sw = [
        'F070W', 'F090W', 'F115W', 'F140M', 'F150W', 'F150W2', 'F162M',
        'F164N', 'F182M', 'F187N', 'F200W', 'F210M', 'F212N'
    ]
    # LW Filters
    filts_lw = [
        'F250M', 'F277W', 'F300M', 'F322W2', 'F323N', 'F335M', 'F356W',
        'F360M', 'F405N', 'F410M', 'F430M', 'F444W', 'F460M', 'F466N',
        'F470N', 'F480M'
    ]
    filts_all = filts_sw + filts_lw

    # Read in all bandpasses (single loop over the combined filter list)
    bp_dict = {f: read_filter(f) for f in filts_all}

    # Cycle through all filters and sources to get 0-mag values.
    # Gaia bandpass filter for normalization.
    bp_g = bp_gaia('g', release='DR2')
    bp_sp_dict = {}
    for f in tqdm(filts_all, leave=False, desc='Filters'):
        bp = bp_dict[f]
        d = {}
        for k in sp_dict.keys():
            sp = sp_dict[k]
            # Normalize to 0 mag in GAIA g-band, then synthesize in NIRCam filter
            sp = sp.renorm(0, 'vegamag', bp_g)
            obs = S.Observation(sp, bp, binset=bp.wave)
            d[k] = obs.effstim('vegamag')
        bp_sp_dict[f] = d

    # Add Teff data to table
    src_tbl['Teff'] = teff

    # Add type column (star or galaxy); objects with no/negative parallax
    # are assumed extragalactic and get the flat spectrum
    ind_nopara = gaia_tbl['parallax'].data.mask | (gaia_tbl['parallax'].data.data < 0)
    src_type = np.array(['galaxy' if v else 'star' for v in ind_nopara])
    src_tbl['Type'] = src_type

    # Generate new columns for each filter
    for f in filts_all:
        coldata = []
        for i in range(len(gaia_tbl)):
            # Select spectrum for given object
            key = 'flat' if src_tbl['Type'][i]=='galaxy' else int(teff[i])
            # Offset filter 0-mag value by object's g-band magnitude
            bp_mag = bp_sp_dict[f][key] + gband[i]
            coldata.append(bp_mag)
        # Round to the nearest milli-mag
        coldata = np.round(coldata, decimals=3)
        # Add filter column to table
        src_tbl.add_column(coldata, name=f)
        src_tbl[f].unit = 'mag'

    return src_tbl
def make_simbad_source_table(coords, remove_cen_star=True, radius=6*u.arcmin, spt_default='G2V'):
    """ Create source table from SIMBAD query

    Generates a table of objects by performing a cone search around a set
    of input coordinates. Uses each object's 2MASS K-band magnitude and
    SIMBAD spectral type to extrapolate magnitudes in all NIRCam filters.
    Objects without a listed spectral type are assigned `spt_default`.

    Parameters
    ==========
    coords : SkyCoords
        Astropy SkyCoords object.
    remove_cen_star : bool
        Output will exclude the star associated with the input coordinates
        (anything within 0.1" is removed from table).
    radius : Quantity
        Radius to perform search. Default is 6', which should encompass
        NIRCam's full FoV, including both Modules A and B.
    spt_default : string
        Default spectral type to assume for sources without SIMBAD
        spectral type information.
    """
    from astroquery.simbad import Simbad
    from astropy.table import Table

    from .. import stellar_spectrum, read_filter
    from ..nrc_utils import S, bp_2mass

    def _norm_spt(spt):
        # Keep only the primary component of composite types and promote
        # bare 2-character classes (e.g. 'G2') to dwarfs (e.g. 'G2V')
        spt = spt.split('+')[0]
        return spt + 'V' if len(spt)==2 else spt

    sim_obj = Simbad()
    sim_obj.reset_votable_fields()
    sim_obj.remove_votable_fields('coordinates')
    sim_obj.add_votable_fields('ra(d;ICRS)', 'dec(d;ICRS)', 'flux(K)', 'sp', 'otype(V)', 'distance_result')

    sim_tbl = sim_obj.query_region(coords, radius=radius)

    # Remove sources without K-band photometry
    ind_nokband = sim_tbl['FLUX_K'].data.mask
    if remove_cen_star:
        # Central star is anything within 0.1 arcsec of query position
        ind_dist = (sim_tbl['DISTANCE_RESULT'] < 0.1).data
        ind_remove = np.where(ind_nokband | ind_dist)
    else:
        ind_remove = np.where(ind_nokband)
    sim_tbl.remove_rows(ind_remove)

    ra_name = sim_tbl.colnames[1]
    dec_name = sim_tbl.colnames[2]

    # Create new source table
    index = np.arange(len(sim_tbl)) + 1
    ra = sim_tbl[ra_name]
    dec = sim_tbl[dec_name]
    kmag = sim_tbl['FLUX_K']
    src_tbl = Table([index, ra, dec, kmag], names=('index', 'ra', 'dec', 'K-Band'))

    # Add distances
    src_tbl.add_column(sim_tbl['DISTANCE_RESULT'].data, name='dist')
    src_tbl['dist'].unit = 'arcsec'

    # Spectral type for each object; assume default for null entries
    sptype = sim_tbl['SP_TYPE'].data.data
    sptype[sim_tbl['SP_TYPE'].data.mask] = spt_default
    sptype[sim_tbl['SP_TYPE'].data==''] = spt_default

    # Create stellar spectra for each unique spectral type
    sp_dict = {}
    for spt in np.unique(sptype):
        if spt=='':
            continue
        spt2 = _norm_spt(spt)
        sp_dict[spt2] = stellar_spectrum(spt2)

    # SW Filters
    filts_sw = [
        'F070W', 'F090W', 'F115W', 'F140M', 'F150W', 'F150W2', 'F162M',
        'F164N', 'F182M', 'F187N', 'F200W', 'F210M', 'F212N'
    ]
    # LW Filters
    filts_lw = [
        'F250M', 'F277W', 'F300M', 'F322W2', 'F323N', 'F335M', 'F356W',
        'F360M', 'F405N', 'F410M', 'F430M', 'F444W', 'F460M', 'F466N',
        'F470N', 'F480M'
    ]
    filts_all = filts_sw + filts_lw

    # Read in all bandpasses (single loop over the combined filter list)
    bp_dict = {f: read_filter(f) for f in filts_all}

    # Cycle through all filters and sources to get 0-mag values,
    # normalizing each spectrum to 0 mag in 2MASS K-band
    bp_k = bp_2mass('k')
    bp_sp_dict = {}
    for f in tqdm(filts_all, leave=False, desc='Filters'):
        bp = bp_dict[f]
        d = {}
        for k in sp_dict.keys():
            sp = sp_dict[k]
            sp = sp.renorm(0, 'vegamag', bp_k)
            obs = S.Observation(sp, bp, binset=bp.wave)
            d[k] = obs.effstim('vegamag')
        bp_sp_dict[f] = d

    # Normalize spectral types in the output table so they match sp_dict keys.
    # BUGFIX: previously this fell back to a hard-coded 'G2V' instead of the
    # `spt_default` keyword.
    src_tbl['SpType'] = sptype
    for row in src_tbl:
        spt = spt_default if row['SpType']=='' else row['SpType']
        row['SpType'] = _norm_spt(spt)

    # Generate new columns for each filter
    for f in filts_all:
        coldata = []
        for row in src_tbl:
            # Offset filter 0-mag value by object's K-band magnitude
            bp_mag = bp_sp_dict[f][row['SpType']] + row['K-Band']
            coldata.append(bp_mag)
        # Round to the nearest milli-mag
        coldata = np.round(coldata, decimals=3)
        # Add filter column to table
        src_tbl.add_column(coldata, name=f)
        src_tbl[f].unit = 'mag'

    return src_tbl
def gen_wfe_drift(obs_input, case='BOL', iec_period=300, slew_init=10,
    rand_seed=None, t0_offset=False, plot=False, figname=None):
    """ Create WFE drift information over time

    Builds a time series of telescope pitch angles for the program (plus a
    randomly generated "prior" program to set the initial thermal state),
    then evolves the OTE WFE drift components (frill, thermal, IEC) over
    that timeline.

    Parameters
    ==========
    obs_input : DMS_input class
        Class to generate a series of observation dictionaries in order
        to build DMS-like files. Loads APT files to generate the
        necessary observation information.
    case : string
        Either "BOL" for current best estimate at beginning of life, or
        "EOL" for more conservative prediction at end of life.
    iec_period : float
        IEC heater switching period in seconds.
    slew_init : float
        Assumed slew difference (deg) relative to previous program.
    rand_seed : None or int
        Seed value to initialize random number generator to obtain
        repeatable values.
    t0_offset : bool
        Shift delta WFE drift values to 0 at time t=0 (beginning of
        program)? If set to False, then relative drift values correspond
        to beginning of the previous randomly-generated program.
    plot : bool
        Create a plot of the slew angles and associated RMS drift components.
    figname : string
        Output name (path) to save plot figure.

    Returns
    =======
    dict
        Keys 'frill', 'thermal', 'iec', 'total' hold WFE drift (nm RMS)
        at each time step; 'time_sec' holds the corresponding times.
    """
    import webbpsf  # ensure WebbPSF is initialized (data paths, etc.)
    from webbpsf_ext.opds import OTE_WFE_Drift_Model
    from webbpsf.utils import get_webbpsf_data_path
    from ..opds import opd_default, opd_dir, pupil_file

    def plot_wfe(figname=None):
        # Diagnostic plot: pitch-angle timeline (top) and WFE components (bottom)
        import matplotlib.pyplot as plt
        fig, axes = plt.subplots(2, 1, figsize=(12,8), sharex=True)
        axes = axes.flatten()
        tunit = 'hr'
        tvals = t_all.to(tunit)

        ax = axes[0]
        ax.plot(tvals.value, slew_angles, marker='.')
        ax.set_ylabel('Pitch Angle (deg)')
        ax.set_title(f'OPD w/ Initial Slew of 5 deg (PID 1194, {case})')

        # Mark exposure, visit, and slew start times with tick lines of
        # increasing height
        ylims = ax.get_ylim()
        dy = ylims[1] - ylims[0]
        for i, texp in enumerate(exp_start):
            t = (texp * u.s).to(tunit).value
            label = 'NRC Exps' if i==0 else None
            ax.plot([t,t], [ylims[0], ylims[0]+0.25*dy],
                    color='k', ls=':', lw=1, alpha=0.5, label=label)
        for i, tvisit in enumerate(visit_start):
            t = (tvisit * u.s).to(tunit).value
            label = 'Visit Start' if i==0 else None
            ax.plot([t,t], [ylims[0], ylims[0]+0.5*dy],
                    color='C2', ls='--', lw=1.5, label=label)
        for i, tslew in enumerate(slew_start):
            t = (tslew * u.s).to(tunit).value
            label = 'Slew Start' if i==0 else None
            ax.plot([t,t], [ylims[0], ylims[0]+0.75*dy],
                    color='C1', ls='--', lw=2, label=label)
        ax.set_ylim(ylims)
        ax.legend()

        # Plot WFE drift components.
        # Offset relative to first visit in sequence.
        wfe_dict2 = wfe_dict.copy()
        keys = list(wfe_dict.keys())
        for k in ['frill', 'thermal']:
            wfe_val = wfe_dict[k]
            trel = (visit_start[0]*u.s).to(tunit).value
            wfe_dict2[k] = wfe_val - np.interp(trel, tvals.value, wfe_val)
        # Update total RMS
        wfe_dict2['total'] = np.sqrt(wfe_dict2['frill']**2 + wfe_dict2['thermal']**2 + wfe_dict2['iec']**2)

        ax = axes[1]
        for k in keys:
            lw = 2 if 'total' in k else 1
            alpha = 0.5 if 'total' in k else 1
            ax.plot(tvals.value, wfe_dict2[k], label=k, lw=lw, alpha=alpha)
        ax.set_xlabel(f'Time ({tunit})')

        for ax in axes[1:]:
            ax.legend()
            # BUGFIX: raw string avoids the invalid '\D' escape sequence
            ax.set_ylabel(r'$\Delta$WFE (nm RMS)')
            xlim = [-1, 18.5]
            ax.set_xlim(xlim)
            ylim = ax.get_ylim()
            # Symmetrize y-limits if negative extent dominates
            if np.abs(ylim[0]) > ylim[1]:
                ylim = np.array([-1,1]) * np.max(np.abs(ylim))
            ax.set_ylim(ylim)

        fig.tight_layout()
        if figname is not None:
            fig.savefig(figname)
            print(f'Saving: {figname}')

    # Get total time for program
    temp, _, _, _ = obs_input.gen_pitch_array(nvals=1000)
    total_time = temp.max()

    # Create a series of time values to evolve over;
    # sample at half the IEC switching period
    dt = iec_period / 2  # Timing sample
    tarr = np.arange(0, total_time, dt)  # seconds
    # Required size
    nvals = len(tarr)

    # Create an initial fake scenario to occur before ours
    res = obs_input.gen_pitch_array(nvals=nvals, pitch_init=None)
    tprior, pitch_prior, _, _ = res
    tprior = tprior[::5] - tprior[-1]
    pitch_prior = pitch_prior[::5][::-1] - slew_init
    pitch_prior[0] -= slew_init

    # Create desired observations
    res = obs_input.gen_pitch_array(nvals=nvals, pitch_init=pitch_prior[-1])
    tarr, pitch_arr, slew_start, visit_start = res

    # Clip anything that's outside of allowed pitch bounds [-5, +45] deg
    pitch_prior[pitch_prior<-5] = -5
    pitch_prior[pitch_prior>45] = 45
    pitch_arr[pitch_arr<-5] = -5
    pitch_arr[pitch_arr>45] = 45

    # All NIRCam exposure start times
    exp_start = []
    for k in obs_input.program_info.keys():
        d = obs_input.program_info[k]
        exp_start.append(d['exp_start_times'])
    exp_start = np.concatenate(exp_start)

    # Data directories
    # OPD directory defined above
    webbpsf_path = get_webbpsf_data_path()
    pupil_dir = webbpsf_path

    # Pupil and OPD file path names
    opd_file, opd_index = opd_default
    pupil_path = os.path.join(pupil_dir, pupil_file)
    opd_path = os.path.join(opd_dir, opd_file)

    # Initiate OTE drift class
    name = "Modified OPD from " + str(opd_file)
    ote = OTE_WFE_Drift_Model(name=name, opd=opd_path, opd_index=opd_index,
                              transmission=pupil_path)

    # Generate delta OPDs for each time step.
    # Also outputs a dictionary of each component's WFE drift value (nm RMS).
    t_all = np.concatenate([tprior, tarr]) * u.s
    slew_angles = np.concatenate([pitch_prior, pitch_arr])
    wfe_dict_all = ote.evolve_dopd(t_all, slew_angles, case=case,
                                   return_dopd_fin=False, random_seed=rand_seed)

    tunit = 'hr'
    tvals = t_all.to(tunit)

    # Offset relative to first visit in sequence
    wfe_dict = wfe_dict_all.copy()
    if t0_offset:
        for k in ['frill', 'thermal']:
            wfe_val = wfe_dict[k]
            trel = (visit_start[0]*u.s).to(tunit).value
            wfe_dict[k] = wfe_val - np.interp(trel, tvals.value, wfe_val)
        # Update total RMS
        wfe_dict['total'] = np.sqrt(wfe_dict['frill']**2 + wfe_dict['thermal']**2 + wfe_dict['iec']**2)

    wfe_dict['time_sec'] = tvals.to('s').value

    if plot:
        plot_wfe(figname=figname)

    return wfe_dict
def add_ipc(im, alpha_min=0.0065, alpha_max=None, kernel=None):
    """Convolve image with an interpixel-capacitance (IPC) kernel.

    Given an image in electrons, apply IPC coupling between neighboring
    pixels. NIRCam average IPC values (alpha) are reported as 0.005-0.006.

    Parameters
    ==========
    im : ndarray
        Input image or array of images.
    alpha_min : float
        Minimum coupling coefficient between neighboring pixels. If
        `alpha_max` is None, this is constant with respect to signal level.
    alpha_max : float or None
        Maximum coupling coefficient. If specified, coupling between pixel
        pairs varies with signal values (Donlon et al., 2018, PASP 130).
    kernel : ndarray or None
        Option to directly specify the convolution kernel, in which case
        `alpha_min` and `alpha_max` are ignored.

    Examples
    ========
    Constant kernel

        >>> im_ipc = add_ipc(im, alpha_min=0.0065)

    Constant kernel (manual)

        >>> alpha = 0.0065
        >>> k = np.array([[0,alpha,0], [alpha,1-4*alpha,alpha], [0,alpha,0]])
        >>> im_ipc = add_ipc(im, kernel=k)

    Signal-dependent kernel

        >>> im_ipc = add_ipc(im, alpha_min=0.0065, alpha_max=0.0145)
    """
    orig_ndim = im.ndim
    if orig_ndim == 2:
        im = im.reshape([1, im.shape[0], im.shape[1]])

    # Border width so kernel/shift operations never wrap into real data
    if kernel is None:
        ypad = xpad = 1
    else:
        ypad, xpad = (int(s / 2) for s in kernel.shape)

    # Symmetric padding around each image
    padded = np.pad(im, ((0, 0), (ypad, ypad), (xpad, xpad)), 'symmetric')

    if (kernel is None) and (alpha_max is not None):
        # Signal-dependent (exponential) coupling strength,
        # Equation 7 of Donlon et al. (2018)
        sqr = padded**2
        amin = alpha_min
        ascl = (alpha_max - amin) / 2

        alphas = []
        for ax in [1, 2]:
            # Coupling toward the -1 neighbor along this axis
            diff = np.abs(padded - np.roll(padded, -1, axis=ax))
            sumsqr = sqr + np.roll(sqr, -1, axis=ax)
            atmp = amin + ascl * np.exp(-diff/20000) + \
                   ascl * np.exp(-np.sqrt(sumsqr / 2) / 10000)
            alphas.append(atmp)
            # Symmetry gives the +1 neighbor coupling via a shift
            alphas.append(np.roll(atmp, 1, axis=ax))
        alphas = np.array(alphas)

        # Flux remaining in parent pixel
        result = padded * (1 - alphas.sum(axis=0))
        # Flux shifted into each adjoining pixel
        for a, (shft, ax) in zip(alphas, zip([-1, +1, -1, +1], [1, 1, 2, 2])):
            result += a * np.roll(padded, shft, axis=ax)
        del alphas
    else:
        # Constant coupling: custom kernel overrides alpha values
        if kernel is None:
            kernel = np.array([[0.0, alpha_min, 0.0],
                               [alpha_min, 1.-4*alpha_min, alpha_min],
                               [0.0, alpha_min, 0.0]])
        # Stack all images along the vertical axis, convolve once
        stacked = padded.reshape([-1, padded.shape[-1]])
        result = convolve(stacked, kernel).reshape(padded.shape)

    # Trim the padding back off
    result = result[:, ypad:-ypad, xpad:-xpad]
    return result.squeeze() if orig_ndim == 2 else result
[docs]def add_ppc(im, ppc_frac=0.002, nchans=4, kernel=None, same_scan_direction=False, reverse_scan_direction=False, in_place=False): """ Add Post-Pixel Coupling (PPC) This effect is due to the incomplete settling of the analog signal when the ADC sample-and-hold pulse occurs. The measured signals for a given pixel will have a value that has not fully transitioned to the real analog signal. Mathematically, this can be treated in the same way as IPC, but with a different convolution kernel. Parameters ========== im : ndarray Image or array of images ppc_frac : float Fraction of signal contaminating next pixel in readout. kernel : ndarry or None Option to directly specify the convolution kernel, in which case `ppc_frac` is ignored. nchans : int Number of readout output channel amplifiers. same_scan_direction : bool Are all the output channels read in the same direction? By default fast-scan readout direction is ``[-->,<--,-->,<--]`` If ``same_scan_direction``, then all ``-->`` reverse_scan_direction : bool If ``reverse_scan_direction``, then ``[<--,-->,<--,-->]`` or all ``<--`` in_place : bool Apply in place to input image. """ sh = im.shape ndim = len(sh) if ndim==2: im = im.reshape([1,sh[0],sh[1]]) sh = im.shape nz, ny, nx = im.shape chsize = nx // nchans # Do each channel separately if kernel is None: kernel = np.array([[0.0, 0.0, 0.0], [0.0, 1.0-ppc_frac, ppc_frac], [0.0, 0.0, 0.0]]) res = im if in_place else im.copy() for ch in np.arange(nchans): if same_scan_direction: k = kernel[:,::-1] if reverse_scan_direction else kernel elif np.mod(ch,2)==0: k = kernel[:,::-1] if reverse_scan_direction else kernel else: k = kernel if reverse_scan_direction else kernel[:,::-1] x1 = chsize*ch x2 = x1 + chsize res[:,:,x1:x2] = add_ipc(im[:,:,x1:x2], kernel=k) if ndim==2: res = res.squeeze() return res
def gen_col_noise(ramp_column_varations, prob_bad, nz=108, nx=2048, rand_seed=None):
    """ Generate RTN Column Noise

    Takes the random telegraph noise (RTN) templates derived from CV3 data
    and generates a random noise set to add to a dark ramp simulation.
    These column variations likely come from RTN in column-specific FETs
    jumping between two discrete states, possibly within the detector
    column bus.

    Randomly draws a number of template column-variation ramps and randomly
    assigns them to detector columns. The number of affected columns is set
    by `prob_bad` (each column is flagged with that probability).
    """
    rng = np.random.default_rng(rand_seed)

    # Template cube dimensions: (frames, templates)
    nz0, ntemplates = ramp_column_varations.shape

    # Column offsets for every detector column over the template length
    offsets = np.zeros([nz0, nx])

    # Flag a random subset of columns to receive an RTN excursion:
    # uniform draws below prob_bad mark a column as "bad"
    xmask_random = rng.random(size=nx) <= prob_bad
    nbad = len(xmask_random[xmask_random])

    # Draw random template indices, then keep only unique draws
    draws = rng.integers(0, high=ntemplates, size=ntemplates)
    _, uniq_pos = np.unique(draws, return_index=True)
    uniq_pos = uniq_pos[0:nbad]

    # If there weren't enough unique draws, top up with fresh ones.
    # Very unlikely, but just in case...
    if len(uniq_pos) < nbad:
        extra = rng.integers(0, high=ntemplates, size=nbad - len(uniq_pos))
        uniq_pos = np.append(uniq_pos, extra)

    # Select the set of random column variation templates
    cols_rand = ramp_column_varations[:, uniq_pos]

    # Apply a random phase shift to each selected template
    tshifts = rng.integers(0, high=nz0, size=nbad)
    for i in range(nbad):
        cols_rand[:, i] = np.roll(cols_rand[:, i], tshifts[i])

    # Drop templates into their assigned columns
    offsets[:, xmask_random] = cols_rand

    # Reshape to (nz0,1,nx) so this broadcasts onto a (nz,ny,nx) ramp
    offsets = offsets.reshape([nz0, 1, -1])

    # Return exactly `nz` frames, zero-padding beyond the template length
    if nz > nz0:
        res = np.zeros([nz, 1, nx])
        res[0:nz0] = offsets
        return res
    return offsets[0:nz, :, :]
[docs]def add_col_noise(data_in, ramp_column_varations, prob_bad, rand_seed=None): """ Add RTN Column Noise This function takes the random telegraph noise templates derived from CV3 data and adds it to an idealized dark ramp. These column variations likely come from noise in column-specifc FETs jumping between two discrete states, possibly within the detector column bus. This function randomly draws a number of template column variation ramps, then randomly assigns them to different columns in `super_dark_ramp`. The nubmer of columns (and whether or not a column is assigned a random variation) is based on the `prob_bad` variable. Parameters ========== data_in : ndarray Idealized ramp of size (nz,ny,nx) ramp_column_variations : ndarray The column-average ramp variations of size (nz,nx). These are added to a given columnn. prob_bad : float Probability that a given column is subject to these column variations. """ nz, ny, nx = data_in.shape cols_all_add = gen_col_noise(ramp_column_varations, prob_bad, nz=nz, nx=nx, rand_seed=rand_seed) # Add to dark ramp return data_in + cols_all_add
def gen_ramp_biases(ref_dict, nchan=None, data_shape=(2,2048,2048), 
    include_refinst=True, ref_border=[4,4,4,4], rand_seed=None):
    """ Generate a ramp of bias offsets

    Builds a (nz,ny,nx) cube of detector bias levels: a global master bias,
    per-amplifier offsets correlated with that bias, frame-to-frame
    variations, optional reference-pixel instabilities, and even/odd
    column offsets. All random draws come from a single seeded generator,
    so results are repeatable for a given `rand_seed`.

    Parameters
    ==========
    ref_dict : dict
        Dictionary of reference behaviors.
    nchan : int
        Specify number of output channels. If not set, then will
        automatically determine from `ref_dict`. This allows us to set
        nchan=1 for Window Mode while using the first channel info
        provided in `ref_dict`.
    data_shape : array like
        Shape of output (nz,ny,nx)
    include_refinst : bool
        Include instabilities in the offsets?
    ref_border: list
        Number of references pixels [lower, upper, left, right]
    rand_seed : None or int
        Seed value to initialize random number generator to obtain
        repeatable values.
    """
    rng = np.random.default_rng(rand_seed)

    if nchan is None:
        nchan = len(ref_dict['amp_offset_mean'])

    cube = np.zeros(data_shape)
    nz, ny, nx = data_shape
    chsize = int(nx/nchan)

    ######################
    # Add overall bias
    # TODO: Add temperature dependence
    bias_off = ref_dict['master_bias_mean'] + rng.normal(scale=ref_dict['master_bias_std'])
    cube += bias_off

    # Add amplifier offsets
    # These correlate to bias offset via the polynomial coefficients
    # in 'master_amp_cf', plus an independent random scatter
    cf = ref_dict['master_amp_cf']
    amp_off = jl_poly(bias_off, cf) + rng.normal(scale=ref_dict['amp_offset_std'])
    for ch in range(nchan):
        cube[:,:,ch*chsize:(ch+1)*chsize] += amp_off[ch]

    # Include frame-to-frame bias variation
    ######################
    bias_off_f2f = rng.normal(scale=ref_dict['master_bias_f2f'], size=nz)
    amp_off_f2f = rng.normal(scale=ref_dict['amp_offset_f2f'][0:nchan], size=(nz,nchan))

    # Apply per-frame offsets in place (each `im` is a view into `cube`)
    for i, im in enumerate(cube):
        im += bias_off_f2f[i]
        for ch in range(nchan):
            im[:,ch*chsize:(ch+1)*chsize] += amp_off_f2f[i,ch]

    # Add some reference pixel instability relative to active pixels
    ######################
    # Mask of all reference pixels in detector coordinates
    # Active and reference pixel masks
    lower, upper, left, right = ref_border
    mask_ref = np.zeros([ny,nx], dtype='bool')
    if lower>0: mask_ref[0:lower,:] = True
    if upper>0: mask_ref[-upper:,:] = True
    if left>0: mask_ref[:,0:left] = True
    if right>0: mask_ref[:,-right:] = True

    # Per-channel child seeds so each amplifier gets an independent
    # (but repeatable) pink-noise realization
    rseed_nchan = rng.integers(0, 2**32-1, size=nchan)
    if include_refinst: 
        for ch in range(nchan):
            mask_ch = np.zeros([ny,nx]).astype('bool')
            mask_ch[:,ch*chsize:(ch+1)*chsize] = True

            # 1/f time series applied only to this channel's reference pixels
            std = ref_dict['amp_ref_inst_f2f'][ch]
            ref_noise = std * pink_noise(nz, rand_seed=rseed_nchan[ch])
            cube[:, mask_ref & mask_ch] += ref_noise.reshape([-1,1])

    # Set even/odd offsets
    ######################
    mask_even = np.zeros([ny,nx]).astype('bool')
    mask_even[:,0::2] = True

    mask_odd = np.zeros([ny,nx]).astype('bool')
    mask_odd[:,1::2] = True

    for ch in range(nchan):
        mask_ch = np.zeros([ny,nx]).astype('bool')
        mask_ch[:,ch*chsize:(ch+1)*chsize] = True

        cube[:, mask_even & mask_ch] += ref_dict['amp_even_col_offset'][ch]
        cube[:, mask_odd & mask_ch] += ref_dict['amp_odd_col_offset'][ch]

    return cube
def fft_noise(pow_spec, nstep_out=None, fmin=None, f=None, pad_mode='edge',
    rand_seed=None, **kwargs):
    """ Random Noise from Power Spectrum

    Returns a noise array whose intrinsic distribution follows that of the
    input power spectrum. The output is scaled to an intrinsic standard
    deviation of 1.0.

    Parameters
    ==========
    pow_spec : ndarray
        Input power spectrum from which to generate the noise distribution.
    nstep_out : int
        Desired size of the output noise array. If smaller than what
        `pow_spec` yields, the result is truncated; if larger, `pow_spec`
        gets padded using `pad_mode`.
    fmin : float or None
        Low-frequency cutoff. Power spectrum values below this point are
        set equal to the power spectrum value at fmin.
    f : ndarray or None
        Frequency array the same size as `pow_spec`; only used when `fmin`
        is set. If None, `f = np.fft.rfftfreq(n_ifft)` is used, where
        `n_ifft` is the size of `irfft(pow_spec)` assuming unit time steps.
    pad_mode : str or function
        Padding mode passed to `np.pad` (e.g. 'constant', 'edge',
        'linear_ramp', 'maximum', 'mean', 'median', 'minimum', 'reflect',
        'symmetric', 'wrap') or a user-supplied function. Default 'edge'.
    rand_seed : None or int
        Seed value for repeatable noise realizations.
    """
    rng = np.random.default_rng(rand_seed)

    if nstep_out is None:
        nstep_out = 2 * (len(pow_spec) - 1)

    # Number of rFFT coefficients after rounding up to a power of 2 (+1)
    npow2 = int(2**np.ceil(np.log2(nstep_out - 1))) + 1

    # Work with the amplitude (linear) spectrum
    amp = np.sqrt(pow_spec)

    # Flatten the spectrum below the cut-off frequency
    if (fmin is not None) and (fmin > 0):
        if f is None:
            f = np.fft.rfftfreq(2 * (len(amp) - 1))
        fmin = np.max([fmin, f[1] - f[0]])
        icut = np.sum(f < fmin)  # Index of the cutoff
        if 1 < icut < len(f):
            amp[:icut] = amp[icut]

    # Pad on the low-frequency side if more samples are needed
    pad = npow2 - len(amp)
    pad = max(pad, 0)
    if pad > 0:
        amp = np.pad(amp, (pad, 0), mode=pad_mode, **kwargs)

    # Theoretical output standard deviation implied by the scaling
    n_ifft = 2 * (len(amp) - 1)
    w = amp[1:-1]
    w_last = amp[-1] * (1 + (n_ifft % 2)) / 2.  # correct f = +-0.5
    the_std = 2 * np.sqrt(np.sum(w**2) + w_last**2) / n_ifft

    # Scaled random power + phase; generating with scale=1 then multiplying
    # is faster for large arrays than passing scale=amp
    sr = rng.normal(size=len(amp)) * amp
    si = rng.normal(size=len(amp)) * amp

    # For even signal lengths the +/-0.5 frequency coefficient must be real
    if (npow2 % 2) == 0:
        si[-1] = 0
    # The DC component must always be real
    si[0] = 0

    # Combine power + corrected phase, apply the pinkening filter
    noise = np.fft.irfft(sr + 1J * si)

    # Keep requested nstep_out samples and scale to unit variance
    return noise[:nstep_out] / the_std
def pink_noise(nstep_out, pow_spec=None, f=None, fmin=None, alpha=-1, **kwargs):
    """ Generate random pink noise

    Produces a random noise realization with a 1/f-like power spectrum
    (index `alpha`) by delegating to `fft_noise`.

    Parameters
    ==========
    nstep_out : int
        Desired size of the output noise array. If smaller than what
        `pow_spec` yields, the result is truncated; if larger, the
        spectrum is padded according to `pad_mode`.
    pow_spec : ndarray
        Option to input the power spectrum instead of regenerating it each
        call. Best generated with power-of-2 sizes for faster processing.
    f : ndarray or None
        Frequency array the same size as pow_spec. If None, one is created
        automatically assuming a delta time of unity.
    fmin : float or None
        Low-frequency cutoff. Power spectrum values below this point are
        set equal to the power spectrum value at fmin.
    alpha : float
        Power spectrum index used when `pow_spec` is not given directly.

    Keyword Args
    ============
    pad_mode : str or function
        Padding mode forwarded to `np.pad` within `fft_noise` (e.g.
        'constant', 'edge', 'linear_ramp', 'reflect', 'symmetric', 'wrap')
        or a user-supplied function. Default is 'edge'.
    rand_seed : None or int
        Seed value to initialize random number generator to obtain
        repeatable values.
    """
    # Frequencies are needed either to build the spectrum or apply fmin
    need_freqs = (pow_spec is None) or ((fmin is not None) and (fmin > 0))
    if need_freqs:
        # Round up to a power of 2 for faster FFT processing
        nfft = 2 * int(2**np.ceil(np.log2(nstep_out)))
        f = np.fft.rfftfreq(nfft)
        f[0] = f[1]  # First element should not be 0
    if pow_spec is None:
        pow_spec = f**alpha
        pow_spec[0] = 0.

    if f is not None:
        assert len(f)==len(pow_spec), "f and pow_spec must be same size"
    assert len(pow_spec)>=nstep_out, "Power spectrum must be greater than nstep_out"

    return fft_noise(pow_spec, nstep_out=nstep_out, fmin=fmin, f=f, **kwargs)
def sim_noise_data(det, rd_noise=[5,5,5,5], u_pink=[1,1,1,1], c_pink=3, acn=0,
    pow_spec_corr=None, corr_scales=None, fcorr_lim=[1,10],
    ref_ratio=0.8, rand_seed=None, verbose=True, **kwargs):
    """ Simulate Noise Ramp

    Simulate the noise components of a ramp, including white noise as well
    as 1/f (pink) noise components that are uncorrelated and correlated
    between amplifier channels.

    Parameters
    ==========
    det : `det_timing` class
        Class holding detector operations information. See `detops.det_timing`
        for generic class, or `pynrc_core.DetectorOps` for NIRCam specific
        timing.
    rd_noise : array like or float or None
        Array of white noise values (std dev per frame) for each output channel,
        or a single value. If an array, must match the number amplifier values
        specified in `det.nchan`.
    u_pink : array like or float or None
        Array of uncorrelated pink noise (std dev per frame) for each output channel,
        or a single value. If an array, must match the number amplifier values
        specified in `det.nchan`.
    c_pink : float or None
        Standard deviation of the pink noise correlated between channels.
    acn : float or None
        Standard deviation of the alternating column noise (1/f^2 spectrum,
        generated separately for even and odd columns of each output).
    pow_spec_corr : ndarray
        Option to input a custom power spectrum for the correlated noise.
    corr_scales : array like
        Instead of `pow_spec_corr`, input the scale factors of the two 1/f
        components [low freq, highfreq]).
    fcorr_lim : array like
        Low- and high- frequency cut-off points for `corr_scales` factors.
        The first element of `corr_scales` is applied to those frequencies
        below `fcorr_lim[0]`, while the second element corresponds to frequencies
        above `fcorr_lim[1]`.
    ref_ratio : float
        Ratio of reference pixel noise relative to the mean active-pixel
        read noise.
    rand_seed : None or int
        Seed value to initialize random number generators to obtain
        repeatable values.
    verbose : bool
        Print progress messages via the module logger.

    Returns
    =======
    ndarray
        Noise cube of shape (nz, ypix, xpix) where nz includes all frames
        up the ramp (including drop frames).
    """
    from pynrc.reduce.calib import fit_corr_powspec, broken_pink_powspec
    import time

    ################################
    # Initialize a random number generator
    rng = np.random.default_rng(rand_seed)
    # Create individual random number generators for each noise element
    # (eg., read noise, uncorrelated pink noise, correlated pink noise).
    # This allows for repeatable noise values for each element as long as
    # the same rand_seed is passed, even if other elements are excluded
    # on different runs.
    # If rand_seed=None, then everything here will be random.
    rng_keys = ['rd_noise', 'u_pink', 'c_pink', 'acn']
    rng_dict = {}
    for k in rng_keys:
        # Create a random seed to pass to individual RNGs
        rseed = rng.integers(0, 2**32-1)
        rng_dict[k] = np.random.default_rng(rseed)

    nchan = det.nout
    nx = det.xpix
    ny = det.ypix
    chsize = det.chsize

    # Number of total frames up the ramp (including drops)
    ma = det.multiaccum
    nd1 = ma.nd1
    nd2 = ma.nd2
    nf = ma.nf
    ngroup = ma.ngroup
    nz = nd1 + ngroup*nf + (ngroup-1)*nd2

    # Line/frame overheads (extra clocked pixels that carry the 1/f stream
    # but never appear in the output image)
    nroh = det._line_overhead
    nfoh = det._extra_lines

    same_scan_direction = det.same_scan_direction
    reverse_scan_direction = det.reverse_scan_direction

    result = np.zeros([nz,ny,nx])

    # Make white read noise. This is the same for all pixels.
    if rd_noise is not None:
        rng = rng_dict['rd_noise']
        # We want rd_noise to be an array or list
        if isinstance(rd_noise, (np.ndarray,list)):
            temp = np.asarray(rd_noise)
            if temp.size != nchan:
                _log.error('Number of elements in rd_noise not equal to n_out')
                return
        else: # Single value as opposed to an array or list
            rd_noise = np.ones(nchan) * rd_noise

        w = det.ref_info
        rr = ref_ratio #reference_pixel_noise_ratio

        if np.any(rd_noise):
            if verbose:
                _log.info('Generating read noise...')

            # Go frame-by-frame
            for z in np.arange(nz):
                here = np.zeros((ny,nx))

                # First assume no ref pixels and just add in random noise
                for ch in np.arange(nchan):
                    x1 = ch * chsize
                    x2 = x1 + chsize
                    here[:,x1:x2] = rng.normal(scale=rd_noise[ch], size=(ny,chsize))

                # If there are reference pixels, overwrite with appropriate noise values
                # Noisy reference pixels for each side of detector
                rd_ref = rr * np.mean(rd_noise)
                if w[0] > 0: # lower
                    here[:w[0],:] = rng.normal(scale=rd_ref, size=(w[0],nx))
                if w[1] > 0: # upper
                    here[-w[1]:,:] = rng.normal(scale=rd_ref, size=(w[1],nx))
                if w[2] > 0: # left
                    here[:,:w[2]] = rng.normal(scale=rd_ref, size=(ny,w[2]))
                if w[3] > 0: # right
                    here[:,-w[3]:] = rng.normal(scale=rd_ref, size=(ny,w[3]))

                # Add the noise in to the result
                result[z,:,:] += here

    # Finish if no 1/f noise specified
    if (c_pink is None) and (u_pink is None) and (acn is None):
        return result

    #################################
    # 1/f noise
    # Include the overhead pixels so the time stream matches the real
    # pixel clocking cadence, then crop back down after reshaping.
    ch_poh = chsize + nroh
    ny_poh = ny + nfoh

    # Compute the number of time steps per integration, per output
    nstep_frame = ch_poh * ny_poh
    nstep = nstep_frame * nz
    # Pad nsteps to a power of 2, which is much faster
    nstep2 = int(2**np.ceil(np.log2(nstep)))

    f2 = np.fft.rfftfreq(2*nstep2)
    f2[0] = f2[1] # First element should not be 0
    alpha = -1
    p_filter2 = np.sqrt(f2**alpha)
    p_filter2[0] = 0.

    # Add correlated pink noise.
    if (c_pink is not None) and (c_pink > 0):
        rng = rng_dict['c_pink']
        if verbose:
            _log.info('Adding correlated pink noise...')

        if corr_scales is not None:
            # Broken power-law model from explicit scale factors
            scales = np.array(corr_scales)
            fcut1, fcut2 = np.array(fcorr_lim) / det._pixel_rate
            pf = broken_pink_powspec(f2, scales, fcut1=fcut1, fcut2=fcut2, alpha=alpha)
            pf[0] = 0
        elif pow_spec_corr is not None:
            # Fit the supplied power spectrum, then rebuild it on the f2 grid
            n_ifft = 2 * (len(pow_spec_corr) - 1)
            freq_corr = np.fft.rfftfreq(n_ifft, d=1/det._pixel_rate)
            freq_corr[0] = freq_corr[1]
            # Fit power spectrum and remake for f2
            scales = fit_corr_powspec(freq_corr, pow_spec_corr, **kwargs)
            fcut1, fcut2 = np.array(fcorr_lim) / det._pixel_rate
            pf = broken_pink_powspec(f2, scales, fcut1=fcut1, fcut2=fcut2, alpha=alpha)
            pf[0] = 0
        else:
            pf = p_filter2

        # Pass through a random seed to pink_noise function
        rseed = rng.integers(0, 2**32-1)
        tt = c_pink * pink_noise(nstep, pow_spec=pf, rand_seed=rseed)
        tt = tt.reshape([nz, ny_poh, ch_poh])[:,0:ny,0:chsize]
        # print('  Corr Pink Noise (input, output): {:.2f}, {:.2f}'
        #     .format(c_pink, np.std(tt)))

        # Same noise stream copied to every output, mirrored for outputs
        # that scan in the opposite direction
        for ch in np.arange(nchan):
            x1 = ch*chsize
            x2 = x1 + chsize

            if (same_scan_direction) or (np.mod(ch,2)==0):
                flip = True if reverse_scan_direction else False
            else:
                flip = False if reverse_scan_direction else True

            if flip:
                result[:,:,x1:x2] += tt[:,:,::-1]
            else:
                result[:,:,x1:x2] += tt
        del tt

    # Add uncorrelated pink noise. Because this pink noise is stationary and
    # different for each output, we don't need to flip it (but why not?)
    if u_pink is not None:
        rng = rng_dict['u_pink']
        # We want u_pink to be an array or list
        if isinstance(u_pink, (np.ndarray,list)):
            temp = np.asarray(u_pink)
            if temp.size != nchan:
                _log.error('Number of elements in u_pink not equal to n_out')
                return
        else: # Single value as opposed to an array or list
            u_pink = np.ones(nchan) * u_pink

        # Only do the rest if any values are not 0
        if np.any(u_pink):
            if verbose:
                _log.info('Adding uncorrelated pink noise...')

            for ch in trange(nchan, desc='Uncorr 1/f', leave=False):
                x1 = ch*chsize
                x2 = x1 + chsize

                # Pass through a random seed to pink_noise function
                rseed = rng.integers(0, 2**32-1)
                tt = u_pink[ch] * pink_noise(nstep, pow_spec=p_filter2, rand_seed=rseed)
                tt = tt.reshape([nz, ny_poh, ch_poh])[:,0:ny,0:chsize]
                # print('  Ch{} Pink Noise (input, output): {:.2f}, {:.2f}'
                #     .format(ch, u_pink[ch], np.std(tt)))

                if (same_scan_direction) or (np.mod(ch,2)==0):
                    flip = True if reverse_scan_direction else False
                else:
                    flip = False if reverse_scan_direction else True

                if flip:
                    result[:,:,x1:x2] += tt[:,:,::-1]
                else:
                    result[:,:,x1:x2] += tt

            del tt

    # Add ACN (alternating column noise: independent streams for the
    # even and odd columns of each output)
    if (acn is not None) and (acn>0):
        rng = rng_dict['acn']
        if verbose:
            _log.info('Adding ACN noise...')

        facn = np.fft.rfftfreq(nstep2)
        facn[0] = facn[1] # First element should not be 0
        alpha = -2
        pf_acn = np.sqrt(facn**alpha)
        pf_acn[0] = 0.

        for ch in trange(nchan, desc='ACN', leave=False):
            x1 = ch*chsize
            x2 = x1 + chsize

            # Generate new pink noise for each even and odd vector.
            rseed_a, rseed_b = rng.integers(0, 2**32-1, size=2)
            a = acn * pink_noise(int(nstep/2), pow_spec=pf_acn, rand_seed=rseed_a)
            b = acn * pink_noise(int(nstep/2), pow_spec=pf_acn, rand_seed=rseed_b)
            # print('  Ch{} ACN Noise (input, [outa, outb]): {:.2f}, [{:.2f}, {:.2f}]'
            #     .format(ch, acn, np.std(a), np.std(b)))

            # Reformat into an image: interleave the two streams column-wise
            tt = np.reshape(np.transpose(np.vstack((a, b))),
                            (nz, ny_poh, ch_poh))[:, 0:ny, 0:chsize]

            if (same_scan_direction) or (np.mod(ch,2)==0):
                flip = True if reverse_scan_direction else False
            else:
                flip = False if reverse_scan_direction else True

            if flip:
                result[:,:,x1:x2] += tt[:,:,::-1]
            else:
                result[:,:,x1:x2] += tt

        del tt

    return result
def gen_dark_ramp(dark, out_shape, tf=10.73677, gain=1, ref_info=None,
    avg_ramp=None, include_poisson=True, rand_seed=None, **kwargs):
    """Generate an accumulated dark-current ramp in electrons.

    Assumes a constant dark current rate, either in image form or a single
    value. If a gain is supplied, then the input is assumed to be in DN/sec,
    otherwise e-/sec. Output is always in e-.

    Parameters
    ----------
    dark : ndarray or float
        Dark slope image or constant value. Assumed to be DN/sec.
        If gain=1, then also e-/sec. If this value is intended to be e-/sec,
        then simply set gain=1.
    out_shape : tuple, list, ndarray
        Desired shape of output ramp (nframes, ny, nx). If `dark` is an
        array, then dark.shape == out_shape[1:] == (ny,nx).
    tf : float
        Frame time in seconds.
    gain : float
        Gain of detector in e-/sec. If specified to be other than 1, then
        we assume `dark` to be in units of DN/sec.
    ref_info : None or array like
        Number of reference pixel borders [lower, upper, left, right];
        those pixels are zeroed in the output.
    avg_ramp : ndarray
        Time-dependent flux of average dark ramp (DN).
    include_poisson : bool
        Add Poisson noise at each frame accumulation step?
    rand_seed : None or int
        Seed for the random number generator for repeatable results.
    """
    rng = np.random.default_rng(rand_seed)

    nframes, npix_y, npix_x = out_shape
    if avg_ramp is not None:
        assert len(avg_ramp)>=nframes, "avg_ramp size must be >= to number of requested frames (out_shape[0])"

    # Per-frame count accumulation (e-)
    frame_counts = np.ones([npix_y, npix_x]) * dark * tf * gain

    # Replace any negative pixels with the (non-negative) median
    med_val = max(np.median(frame_counts), 0)
    frame_counts[frame_counts<0] = med_val

    if np.all(frame_counts==0):
        # No dark current anywhere: ramp of zeros
        result = np.zeros(out_shape)
    else:
        # Draw each frame's counts, then integrate up the ramp
        if include_poisson:
            result = rng.poisson(lam=frame_counts, size=out_shape).astype('float')
        else:
            result = np.array([frame_counts for _ in range(nframes)])
        # Cumulative sum performed in place
        result = np.cumsum(result, axis=0, out=result)

    # Modulate "ideal" slope by empirical "average ramp" behavior
    if avg_ramp is not None:
        tvals = np.arange(1, nframes+1) * tf
        ramp_delta = avg_ramp[0:nframes] - np.median(dark)*tvals  # DN
        result += gain * ramp_delta.reshape([-1,1,1])             # e-

    # Reference pixels accumulate no dark current
    if ref_info is not None:
        nlow, nup, nleft, nright = ref_info
        if nlow > 0:
            result[:,:nlow,:] = 0
        if nup > 0:
            result[:,-nup:,:] = 0
        if nleft > 0:
            result[:,:,:nleft] = 0
        if nright > 0:
            result[:,:,-nright:] = 0

    # Units of e-
    return result
def sim_dark_ramp(det, slope_image, ramp_avg_ch=None, ramp_avg_tf=10.73677,
    out_ADU=False, verbose=False, **kwargs):
    """Simulate a dark current ramp.

    Simulate a dark current ramp based on input det class and a super dark
    image. By default, returns ramp in terms of e- using gain information
    provided in `det` input. To return in terms of ADU, set `out_ADU=True`
    (divides by gain).

    Parameters
    ----------
    det : Detector Class
        Desired detector class output.
    slope_image : ndarray
        Input slope image (DN/sec). Can either be full frame or match
        `det` subarray. Returns `det` subarray shape.

    Keyword Args
    ------------
    ramp_avg_ch : ndarray or None
        Time-dependent flux of average dark ramp for each amplifier channel
        for dark current simulations. Shape (nchan, ntimes).
    ramp_avg_tf : float
        Delta time between `ramp_avg_ch` points.
    out_ADU : bool
        Divide by gain to get value in ADU (float).
    include_poisson : bool
        Include Poisson noise from photons?
    verbose : bool
        Print some messages.
    """
    nchan = det.nout
    nx, ny = (det.xpix, det.ypix)
    chsize = det.chsize
    tf = det.time_frame
    gain = det.gain
    ref_info = det.ref_info

    # Do we need to crop out subarray?
    if slope_image.shape[0]==ny:
        y1, y2 = (0, ny)
    else:
        # Will crop a subarray out of slope_image
        y1 = det.y0
        y2 = int(y1 + ny)

    # Number of total frames up the ramp (including drops)
    ma = det.multiaccum
    nd1 = ma.nd1
    nd2 = ma.nd2
    nf = ma.nf
    ngroup = ma.ngroup
    nz = nd1 + ngroup*nf + (ngroup-1)*nd2

    # Interpolate ramp_avg_ch onto tarr grid
    if (ramp_avg_ch is not None):
        tarr = np.arange(1,nz+1) * tf
        if tarr.max() < ramp_avg_tf:
            # Ramp is shorter than the sampling cadence of the average-ramp
            # data, so there is nothing meaningful to interpolate.
            if verbose:
                msg = "Max ramp time {:.1f} is less than ramp_avg_tf. \
                    Not applying ramp_avg_ch.".format(tarr.max())
                # Logger.warn() is a deprecated alias; use warning()
                _log.warning(msg)
            ramp_avg_ch = None
        else:
            # Insert a 0 DN sample at t=0 for each channel. The time axis of
            # ramp_avg_ch is axis=1; inserting along axis=0 would prepend a
            # bogus all-zero *channel* and misalign channel indices, so make
            # sure tvals and the inserted sample both extend the time axis.
            tvals = np.arange(0, ramp_avg_ch.shape[1]+1) * ramp_avg_tf
            ramp_avg_ch = np.insert(ramp_avg_ch, 0, 0, axis=1)

            # Interpolate onto new time grid
            ramp_avg_ch_new = []
            for ramp_avg in ramp_avg_ch:
                avg_interp = np.interp(tarr, tvals, ramp_avg)
                ramp_avg_ch_new.append(avg_interp)
            ramp_avg_ch = np.array(ramp_avg_ch_new)

    if verbose:
        _log.info('Generating dark current ramp...')

    res = np.zeros([nz,ny,nx])
    for ch in np.arange(nchan):
        if nchan==1: # Subarray window case
            if slope_image.shape[1]==nx:
                x1, x2 = (0, nx)
            else:
                # Will crop a subarray out of slope_image
                x1 = det.x0
                x2 = int(x1 + nx)
        else: # STRIPE or FULL frame
            x1 = ch*chsize
            x2 = x1 + chsize

        slope_sub = slope_image[y1:y2,x1:x2]
        avg_ramp = None if ramp_avg_ch is None else ramp_avg_ch[ch]

        # Convert from DN to e- (ref pixels zeroed once at the end instead)
        res[:,:,x1:x2] = gen_dark_ramp(slope_sub, (nz,ny,chsize), gain=gain, tf=tf,
                                       avg_ramp=avg_ramp, ref_info=None, **kwargs)

    if out_ADU:
        res /= gain

    # Set reference pixels' dark current equal to 0
    if ref_info is not None:
        w = ref_info
        if w[0] > 0: # lower
            res[:,:w[0],:] = 0
        if w[1] > 0: # upper
            res[:,-w[1]:,:] = 0
        if w[2] > 0: # left
            res[:,:,:w[2]] = 0
        if w[3] > 0: # right
            res[:,:,-w[3]:] = 0

    return res
def sim_image_ramp(det, im_slope, verbose=False, **kwargs):
    """Simulate an image ramp from a slope image.

    Simulate an image ramp based on input det class and slope image, using
    the `sim_dark_ramp` function under the hood. By default, returns ramp
    in terms of e- using gain information provided in `det` input. To return
    in terms of ADU, set `out_ADU=True` (divides by gain).

    Parameters
    ----------
    det : Detector Class
        Desired detector class output.
    im_slope : ndarray
        Input slope image (e-/sec). *NOTE* - This is different than
        `sim_dark_ramp`, which assumes DN/sec. Can either be full frame or
        match `det` subarray. Returns `det` subarray shape.

    Keyword Args
    ------------
    include_poisson : bool
        Include Poisson noise from photons? Default: True.
    out_ADU : bool
        Divides by gain to get output value in ADU (float).
    verbose : bool
        Print some messages.
    """
    if verbose:
        _log.info('Generating image acquisition ramp...')

    # sim_dark_ramp expects DN/sec, while im_slope is e-/sec
    slope_dn = im_slope / det.gain
    return sim_dark_ramp(det, slope_dn, ramp_avg_ch=None, verbose=False, **kwargs)
def apply_flat(cube, det, imflat_full):
    """Apply flat field.

    Includes pixel-to-pixel QE variations or instrument's optical flatfield
    (e.g., throughput variations across the field).

    Parameters
    ==========
    cube : ndarray
        Simulated ramp data in e-. These should be intrinsic flux values
        with Poisson noise, but prior to read noise, kTC, IPC, etc.
        Size (nz,ny,nx) or (ny,nx). In det coords. Can either be full frame
        or match `det` subarray. Returns `det` subarray shape.
    det : Detector Class
        Desired detector class output.
    imflat_full : ndarray
        Full field image of flat field in det coords.
        Will get trimmed if necessary.
    """
    sh = cube.shape
    if len(sh)==2:
        ny, nx = sh
    else:
        _, ny, nx = sh

    # Trim flat to the detector (sub)array region
    x1, x2 = (det.x0, det.x0 + nx)
    y1, y2 = (det.y0, det.y0 + ny)
    imflat_sub = imflat_full[y1:y2, x1:x2]

    # Explicit shape check rather than a bare try/except, which would also
    # swallow unrelated errors (KeyboardInterrupt, MemoryError, ...).
    if cube.shape[-2:] == imflat_sub.shape:
        # Cube is already trimmed to the correct subarray size
        res = cube * imflat_sub
    else:
        # Cube is full frame; crop to subarray first
        cube_sub = cube[y1:y2, x1:x2] if len(sh)==2 else cube[:, y1:y2, x1:x2]
        res = cube_sub * imflat_sub

    return res
def add_cosmic_rays(data, scenario='SUNMAX', scale=1, tframe=10.73677,
    ref_info=[4,4,4,4], rand_seed=None):
    """Add random cosmic rays to data cube.

    Draws cosmic-ray events from rate/count distributions stored in
    ``sim_params/cosmic_rays.json`` and deposits them (with bilinear
    sub-pixel splitting) into the current and all subsequent frames.

    Parameters
    ==========
    data : ndarray
        2D image or 3D cube (nz,ny,nx) to modify in place.
    scenario : str
        Cosmic ray model: 'SUNMAX', 'SUNMIN', or 'FLARES'.
    scale : float
        Scale factor for event probabilities.
    tframe : float
        Frame time in seconds.
    ref_info : array like
        Number of reference pixels [bottom, top, left, right]; hits are
        only placed within the active area.
    rand_seed : None or int
        Seed for the random number generator for repeatable results.
    """
    # Load CR rates and count distributions from JSON file
    # (json is imported at module level)
    file = 'cosmic_rays.json'
    file_path = os.path.join(conf.PYNRC_PATH, 'sim_params', file)
    with open(file_path, 'r') as fp:
        cr_dict = json.load(fp)

    # Only care about input scenario type
    type_dict = cr_dict[scenario]

    sh = data.shape
    if len(sh)==2:
        ny, nx = sh
        nz = 1
        data = data.reshape([nz,ny,nx])
    else:
        nz, ny, nx = sh

    # Number of reference pixels [bottom, top, left, right]
    rb, rt, rl, rr = ref_info

    # Active detector area (18 um pixels)
    npix = (ny - rb - rt) * (nx - rl - rr)
    area_cm = npix * (18e-4)**2

    # For each ion type, add random events
    ion_keys = type_dict.keys()
    rng = np.random.default_rng(rand_seed)
    for k in ion_keys:
        rate = type_dict[k]['rates']  # events / cm^2 / sec
        # How many events per frame on average?
        nhits = rate * area_cm * tframe * scale

        # Want to sample this distribution for each frame
        counts = np.asarray(type_dict[k]['counts'])
        for ii in range(nz):
            # Assume Poisson statistics on the hits
            nhits_i = rng.poisson(nhits)
            # Do a random sample
            counts_rand = rng.choice(counts, nhits_i)
            # Random position for each hit (active area only)
            xpos_rand = rng.uniform(low=rl, high=nx-rr, size=nhits_i)
            ypos_rand = rng.uniform(low=rb, high=ny-rt, size=nhits_i)

            # Add CR jumps to current frame and all subsequent frames.
            # Separate into integer and fractional pixel positions.
            # NOTE: `np.int` was removed from NumPy (v1.24); use builtin int.
            intx = xpos_rand.astype(int)
            inty = ypos_rand.astype(int)
            fracx = xpos_rand - intx
            fracy = ypos_rand - inty

            # Flip negative shift values
            ind = fracx < 0
            fracx[ind] += 1
            intx[ind] -= 1
            ind = fracy < 0
            fracy[ind] += 1
            inty[ind] -= 1

            # Bilinear interpolation of all sources
            val1 = counts_rand * ((1-fracx)*(1-fracy))
            val2 = counts_rand * ((1-fracx)*fracy)
            val3 = counts_rand * ((1-fracy)*fracx)
            val4 = counts_rand * (fracx*fracy)

            # Add source-by-source in case of overlapped indices
            for i, (iy, ix) in enumerate(zip(inty,intx)):
                if (iy>=0) and (ix>=0):
                    data[ii:, iy, ix] += val1[i]
                if (iy+1<ny):
                    data[ii:, iy+1, ix] += val2[i]
                if (ix+1<nx):
                    data[ii:, iy, ix+1] += val3[i]
                if (iy+1<ny) and (ix+1<nx):
                    data[ii:, iy+1, ix+1] += val4[i]

    return data.reshape(sh)
def xtalk_image(frame, det, coeffs=None):
    """Create image of crosstalk signal.

    Builds the amplifier crosstalk contribution for a single frame.

    Parameters
    ----------
    frame : ndarray
        An image to calculate crosstalk from.
    det : :class:`pynrc.DetectorOps`
        Detector class corresponding to data.
    coeffs : None or Table
        Table of coefficients corresponding to detector crosstalk behavior.
        If None, taken from ``det.xtalk()``.
    """
    res = np.zeros_like(frame)

    # No inter-amplifier crosstalk with a single output channel
    nchans = det.nout
    if nchans <= 1:
        return res

    # Direction of the one-pixel shift for post-pixel coupling,
    # keyed by receiving sub-channel index
    shift_lookup = {"0": 1, "1": -1, "2": 1, "3": -1}

    if coeffs is None:
        coeffs = det.xtalk()

    chsize = det.chsize
    ssd = det.same_scan_direction
    for src in range(nchans):
        c1, c2 = int(src*chsize), int((src+1)*chsize)
        src_im = frame[:, c1:c2]
        for rcv in range(nchans):
            if rcv == src:
                continue
            r1, r2 = int(rcv*chsize), int((rcv+1)*chsize)

            # Mirror the source channel when the two amps do not share
            # a common scan direction (one even, one odd output)
            flip = (not ssd) and (np.mod(src - rcv, 2) != 0)
            src_use = src_im[:, ::-1] if flip else src_im

            # Primary crosstalk coefficient
            key = 'xt' + str(src+1) + str(rcv+1)
            res[:, r1:r2] += src_use * coeffs[key]

            # Post-pixel crosstalk coefficient requires a one-pixel shift
            post = np.roll(src_use * coeffs[key + 'post'],
                           shift_lookup[str(rcv)], axis=1)
            res[:, r1:r2] += post

    return res
def add_xtalk(data, det, coeffs=None):
    """Add amplifier crosstalk to each frame in data cube.

    Parameters
    ----------
    data : ndarray
        2D or 3D data cube. Modified in place.
    det : :class:`pynrc.DetectorOps`
        Detector class corresponding to data.
    coeffs : None or Table
        Table of coefficients corresponding to detector crosstalk behavior.
        If None, taken from ``det.xtalk()``.
    """
    orig_shape = data.shape

    # Promote a single frame to a length-1 cube so the loop below works
    cube = data.reshape([1, *orig_shape]) if len(orig_shape) == 2 else data

    if coeffs is None:
        coeffs = det.xtalk()

    # Accumulate the crosstalk signal frame-by-frame (in place)
    for im in cube:
        im += xtalk_image(im, det, coeffs=coeffs)

    return cube.reshape(orig_shape)
def simulate_detector_ramp(det, cal_obj, im_slope=None, cframe='sci', out_ADU=False,
    include_poisson=True, include_dark=True, include_bias=True,
    include_ktc=True, include_rn=True, apply_ipc=True, apply_ppc=True,
    include_cpink=True, include_upink=True, include_acn=True,
    include_refoffsets=True, include_refinst=True,
    include_colnoise=True, col_noise=None, amp_crosstalk=True,
    add_crs=True, cr_model='SUNMAX', cr_scale=1,
    apply_flats=None, apply_nonlinearity=True, random_nonlin=False,
    latents=None, rand_seed=None,
    return_zero_frame=None, return_full_ramp=False, prog_bar=True, **kwargs):
    """ Return a single simulated ramp

    Parameters
    ==========
    det : Detector Class
        Desired detector class output
    cal_obj: nircam_cal class
        NIRCam calibration class that holds the necessary calibration
        info to simulate a ramp.
    im_slope : ndarray
        Input slope image of observed scene. Can either be full frame
        or match `det` subarray. Returns `det` subarray shape.
    cframe : str
        Coordinate frame of input image, 'sci' or 'det'.
        Output will be in same coordinates.

    Keyword Args
    ============
    return_zero_frame : bool or None
        For DMS data, particularly readout patterns with averaged frames,
        this returns the very first raw read in the ramp.
    return_full_ramp : bool
        By default, we average groups and drop frames as specified in the
        `det` input. If this keyword is set to True, then return all raw
        frames within the ramp. The last set of `nd2` drop frames are omitted.
    out_ADU : bool
        If true, divide by gain and convert to 16-bit UINT.
    include_poisson : bool
        Include photon noise?
    include_dark : bool
        Add dark current?
    include_bias : bool
        Add detector bias?
    include_ktc : bool
        Add kTC noise?
    include_rn : bool
        Add readout noise per frame?
    include_cpink : bool
        Add correlated 1/f noise to all amplifiers?
    include_upink : bool
        Add uncorrelated 1/f noise to each amplifier?
    include_acn : bool
        Add alternating column noise?
    apply_ipc : bool
        Include interpixel capacitance?
    apply_ppc : bool
        Apply post-pixel coupling to linear analog signal?
    include_refoffsets : bool
        Include reference offsets between amplifiers and odd/even columns?
    include_refinst : bool
        Include reference/active pixel instabilities?
    include_colnoise : bool
        Add in column noise per integration?
    col_noise : ndarray or None
        Option to explicitly specify column noise distribution in order
        to shift by one for subsequent integrations
    amp_crosstalk : bool
        Crosstalk between amplifiers?
    add_crs : bool
        Add cosmic ray events? See Robberto et al 2010 (JWST-STScI-001928).
    cr_model: str
        Cosmic ray model to use: 'SUNMAX', 'SUNMIN', or 'FLARES'.
    cr_scale: float
        Scale factor for probabilities.
    apply_nonlinearity : bool
        Apply non-linearity? If False, then warning if out_ADU=True
    random_nonlin : bool
        Add randomness to the linearity coefficients?
    apply_flats: bool
        Apply sub-pixel QE variations (crosshatching and illumination)?
    latents : None or ndarray
        (TODO) Apply persistence from previous integration.
    rand_seed : None or int
        Seed used to derive the per-component random seeds; same seed
        reproduces the same ramp.
    prog_bar : bool
        Show a progress bar for this ramp generation?

    Returns
    =======
    If `return_full_ramp`, the raw frame cube (plus the zero frame when
    `return_zero_frame`); otherwise the DMS-style resampled ramp from
    `ramp_resample`.
    """
    from ..reduce.calib import apply_nonlin

    ################################
    # Dark calibration properties
    dco = cal_obj

    # Super bias and darks
    super_bias = dco.super_bias_deconv # DN
    super_dark = dco.super_dark_deconv # DN/sec

    # IPC/PPC kernel information
    k_ipc = dco.kernel_ipc
    k_ppc = dco.kernel_ppc

    # Noise info
    cds_dict = dco.cds_act_dict
    keys = ['spat_det', 'spat_pink_corr', 'spat_pink_uncorr']
    cds_vals = [np.sqrt(np.mean(cds_dict[k]**2, axis=0)) for k in keys]

    # CDS Noise values
    # rd_noise_cds, c_pink_cds, u_pink_cds = cds_vals
    # Noise per frame (CDS noise divided by sqrt(2))
    rn, cp, up = cds_vals / np.sqrt(2)

    # ACN amplitude (fixed)
    acn = 1

    # kTC Reset Noise
    ktc_noise = dco.ktc_noise

    # Power spectrum for correlated noise
    # freq = dco.pow_spec_dict['freq']
    scales = dco._pow_spec_dict['ps_corr_scale']
    # pcorr_fit = broken_pink_powspec(freq, scales)

    # Reference noise info (ratio of reference to active pixel noise)
    ref_ratio = np.mean(dco.cds_ref_dict['spat_det'] / dco.cds_act_dict['spat_det'])

    ################################
    # Detector output configuration

    # Detector Gain
    gain = det.gain

    # Pixel readout
    nchan = det.nout
    ny, nx = (det.ypix, det.xpix)
    x1, x2 = (det.x0, det.x0 + nx)
    y1, y2 = (det.y0, det.y0 + ny)

    # Crop super bias and super dark for subarray observations
    super_bias = super_bias[y1:y2,x1:x2]
    super_dark = super_dark[y1:y2,x1:x2]

    # Number of total frames up the ramp (including drops)
    ma = det.multiaccum
    nd1 = ma.nd1
    nd2 = ma.nd2
    nf = ma.nf
    ngroup = ma.ngroup
    nz = nd1 + ngroup*nf + (ngroup-1)*nd2

    tframe = det.time_frame

    # Scan direction info
    ssd = det.same_scan_direction
    rsd = det.reverse_scan_direction

    # Number of reference pixels (lower, upper, left, right)
    ref_info = det.ref_info

    ################################
    # Random seed information
    rng = np.random.default_rng(rand_seed)
    # Create random seed values to pass to each function.
    # This will all be determinate as long as `rand_seed` is defined.
    # If rand_seed=None, then everything here will not be repeatable.
    rseed_keys = [
        'sim_dark_ramp', 'sim_image_ramp', 'add_cosmic_rays', 'apply_nonlin',
        'ktc', 'sim_noise_data', 'gen_ramp_biases', 'gen_col_noise'
    ]
    rseed_dict = {}
    for k in rseed_keys:
        rseed_dict[k] = rng.integers(0, 2**32-1)

    ################################
    # Begin...
    ################################
    # 14 steps in the pipeline below (one pbar.update per step)
    if prog_bar:
        pbar = tqdm(total=14, leave=False)

    # Init data cube
    data = np.zeros([nz,ny,nx])

    # Work in detector coordinates
    if (im_slope is not None) and (cframe=='sci'):
        im_slope = sci_to_det(im_slope, det.detid)

    ####################
    # Create a super dark ramp (Units of e-)
    # Average shape of ramp
    if prog_bar:
        pbar.set_description("Dark Current")
    ramp_avg_ch = dco.dark_ramp_dict['ramp_avg_ch']
    # Create dark (adds Poisson noise)
    if include_dark:
        rseed = rseed_dict['sim_dark_ramp']
        data += sim_dark_ramp(det, super_dark, ramp_avg_ch=ramp_avg_ch,
                              include_poisson=include_poisson, rand_seed=rseed, verbose=False)
    if prog_bar:
        pbar.update(1)

    ####################
    # Flat field QE variations (crosshatching)
    # TODO: Add sub-pixel QE variations
    if prog_bar:
        pbar.set_description("Flat fields")
    if im_slope is not None:
        if apply_flats and (dco.pflats is not None):
            im_slope = apply_flat(im_slope, det, dco.pflats)
        if apply_flats and (dco.lflats is not None):
            im_slope = apply_flat(im_slope, det, dco.lflats)
    if prog_bar:
        pbar.update(1)

    ####################
    # Add on-sky source image
    if prog_bar:
        pbar.set_description("Sky Image")
    if im_slope is not None:
        rseed = rseed_dict['sim_image_ramp']
        data += sim_image_ramp(det, im_slope, include_poisson=include_poisson,
                               rand_seed=rseed, verbose=False)
    if prog_bar:
        pbar.update(1)

    ####################
    # Add cosmic rays
    if prog_bar:
        pbar.set_description("Cosmic Rays")
    if add_crs:
        rseed = rseed_dict['add_cosmic_rays']
        data = add_cosmic_rays(data, scenario=cr_model, scale=cr_scale,
                               tframe=tframe, ref_info=ref_info, rand_seed=rseed)
    if prog_bar:
        pbar.update(1)

    ####################
    # Add non-linearity
    if prog_bar:
        pbar.set_description("Non-Linearity")
    # The apply_nonlin function goes from e- to DN
    if apply_nonlinearity:
        rseed = rseed_dict['apply_nonlin']
        data = gain * apply_nonlin(data, det, dco.nonlinear_dict,
                                   randomize=random_nonlin, rand_seed=rseed)
    # elif out_ADU:
    #     _log.warn("Assuming perfectly linear ramp, but convert to 16-bit UINT (out_ADU=True)")
    if prog_bar:
        pbar.update(1)

    ####################
    # TODO: Apply persistence/latent image
    # Latent signals are anomalous signals
    # Before or after non-linearity adjustments?
    if prog_bar:
        pbar.set_description("Persistence")
    if latents is not None:
        pass
    if prog_bar:
        pbar.update(1)

    ####################
    # Apply IPC
    # TODO: Before or after non-linearity??
    if prog_bar:
        pbar.set_description("Include IPC")
    if apply_ipc:
        data = add_ipc(data, kernel=k_ipc)
    if prog_bar:
        pbar.update(1)

    ####################
    # Add kTC noise:
    if prog_bar:
        pbar.set_description("kTC Noise")
    if include_ktc:
        # Single reset-level offset per pixel, constant up the ramp
        rng2 = np.random.default_rng(rseed_dict['ktc'])
        ktc_offset = gain * rng2.normal(scale=ktc_noise, size=(ny,nx))
        data += ktc_offset
    if prog_bar:
        pbar.update(1)

    ####################
    # Add super bias
    if prog_bar:
        pbar.set_description("Super Bias")
    if include_bias:
        data += gain * super_bias
    if prog_bar:
        pbar.update(1)

    ####################
    # Apply PPC (is this best location for this to occur?)
    if prog_bar:
        pbar.set_description("Include PPC")
    if apply_ppc:
        data = add_ppc(data, nchans=nchan, kernel=k_ppc, in_place=True,
                       same_scan_direction=ssd, reverse_scan_direction=rsd)
    if prog_bar:
        pbar.update(1)

    ####################
    # Add amplifier channel crosstalk
    if prog_bar:
        pbar.set_description("Amplifier Crosstalk")
    if amp_crosstalk:
        data = add_xtalk(data, det, coeffs=None)
    if prog_bar:
        pbar.update(1)

    ####################
    # Add read and 1/f noise
    if prog_bar:
        pbar.set_description("Detector & ASIC Noise")
    if nchan==1:
        rn, up = (rn[0], up[0])
    rn  = None if (not include_rn)    else rn
    up  = None if (not include_upink) else up
    cp  = None if (not include_cpink) else cp*1.2
    acn = None if (not include_acn)   else acn
    rseed = rseed_dict['sim_noise_data']
    data += gain * sim_noise_data(det, rd_noise=rn, u_pink=up, c_pink=cp, acn=acn,
                                  corr_scales=scales, ref_ratio=ref_ratio,
                                  rand_seed=rseed, verbose=False)
    if prog_bar:
        pbar.update(1)

    ####################
    # Add reference offsets
    if prog_bar:
        pbar.set_description("Ref Pixel Offsets")
    if include_refoffsets:
        rseed = rseed_dict['gen_ramp_biases']
        data += gain * gen_ramp_biases(dco.ref_pixel_dict, nchan=nchan,
                                       include_refinst=include_refinst,
                                       data_shape=data.shape, ref_border=ref_info,
                                       rand_seed=rseed)
    if prog_bar:
        pbar.update(1)

    ####################
    # Add column noise
    if prog_bar:
        pbar.set_description("Column Noise")
    # Passing col_noise allows for shifting of noise
    # by one col ramp-to-ramp in higher level function
    if include_colnoise and (col_noise is None):
        rseed = rseed_dict['gen_col_noise']
        col_noise = gain * gen_col_noise(dco.column_variations, dco.column_prob_bad,
                                         nz=nz, nx=nx, rand_seed=rseed)
    elif (include_colnoise==False):
        col_noise = 0
    # Add to data
    data += col_noise
    if prog_bar:
        pbar.update(1)

    # Convert to DN (16-bit int)
    if out_ADU:
        data /= gain
        data[data < 0] = 0
        data[data >= 2**16] = 2**16 - 1
        data = data.astype('uint16')

    if prog_bar:
        pbar.close()

    # return_zero_frame not set, True if not RAPID (what about BRIGHT1??)
    if return_zero_frame is None:
        return_zero_frame = False if 'RAPID' in det.multiaccum.read_mode else True

    # Return to sci coordinates
    if cframe=='sci':
        data = det_to_sci(data, det.detid)

    if return_full_ramp:
        if return_zero_frame:
            return data, data[0].copy()
        else:
            return data
    else:
        return ramp_resample(data, det, return_zero_frame=return_zero_frame)
def make_ramp_poisson(im_slope, det, out_ADU=True, zero_data=False,
    include_poisson=True, rand_seed=None):
    """Create a ramp with only photon noise. Useful for quick simulations.

    Parameters
    ----------
    im_slope : ndarray
        Slope image (detector coordinates) in e-/sec.
        The input array is not modified (an internal copy is used).
    det : Detector class
        Detector information class.
    out_ADU : bool
        Divide by gain and convert to 16-bit UINT?
    zero_data : bool
        Also return the so-called "zero frame" (first raw frame)?
    include_poisson : bool
        Add Poisson noise at each frame accumulation step?
    rand_seed : None or int
        Seed for the random number generator for repeatable results.
    """
    rng = np.random.default_rng(rand_seed)

    ma = det.multiaccum
    nd1 = ma.nd1
    nd2 = ma.nd2
    nf = ma.nf
    ngroup = ma.ngroup
    t_frame = det.time_frame

    # Number of total frames up the ramp (including drops)
    naxis3 = nd1 + ngroup*nf + (ngroup-1)*nd2

    # Work on a copy so the caller's slope image is not mutated
    # (the original zeroed ref pixels / negatives in the caller's array)
    im_slope = im_slope.copy()

    # Set reference pixels' slopes equal to 0
    w = det.ref_info
    if w[0] > 0: # lower
        im_slope[:w[0],:] = 0
    if w[1] > 0: # upper
        im_slope[-w[1]:,:] = 0
    if w[2] > 0: # left
        im_slope[:,:w[2]] = 0
    if w[3] > 0: # right
        im_slope[:,-w[3]:] = 0

    # Remove any negative values
    im_slope[im_slope<0] = 0

    # Count accumulation for a single frame
    frame = im_slope * t_frame

    # Add Poisson noise at each frame step
    sh0, sh1 = im_slope.shape
    new_shape = (naxis3, sh0, sh1)
    if include_poisson:
        ramp = rng.poisson(lam=frame, size=new_shape).astype(np.float64)
    else:
        ramp = np.array([frame for i in range(naxis3)])
    # Perform cumulative sum up the ramp
    data = np.cumsum(ramp, axis=0)

    # Convert to ADU (16-bit UINT)
    if out_ADU:
        gain = det.gain
        data /= gain
        data[data < 0] = 0
        data[data >= 2**16] = 2**16 - 1
        data = data.astype('uint16')

    return ramp_resample(data, det, return_zero_frame=zero_data)