"""pyNRC utility functions"""
import os, re
# Import libraries
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
on_rtd = os.environ.get('READTHEDOCS') == 'True'
# Update matplotlib settings
rcvals = {'xtick.minor.visible': True, 'ytick.minor.visible': True,
'xtick.direction': 'in', 'ytick.direction': 'in',
'xtick.top': True, 'ytick.right': True, 'font.family': ['serif'],
'xtick.major.size': 6, 'ytick.major.size': 6,
'xtick.minor.size': 3, 'ytick.minor.size': 3,
'image.interpolation': 'nearest', 'image.origin': 'lower',
'figure.figsize': [8,6], 'mathtext.fontset':'cm'}#,
#'text.usetex': True, 'text.latex.preamble': ['\usepackage{gensymb}']}
if not on_rtd:
matplotlib.rcParams.update(rcvals)
cmap_pri, cmap_alt = ('viridis', 'gist_heat')
matplotlib.rcParams['image.cmap'] = cmap_pri if cmap_pri in plt.colormaps() else cmap_alt
import datetime, time
import sys, platform
import multiprocessing as mp
import traceback
from astropy.io import fits, ascii
from astropy.table import Table
from astropy.time import Time
# from astropy import units
#from scipy.optimize import least_squares#, leastsq
#from scipy.ndimage import fourier_shift
from scipy.interpolate import griddata, RegularGridInterpolator, interp1d
from numpy.polynomial import legendre
from . import conf
from .logging_utils import setup_logging
from webbpsf_ext.bandpasses import nircam_com_th, nircam_com_nd
from .maths import robust
from .maths.fast_poly import *
from .maths.image_manip import *
from .maths.coords import *
# from .maths.image_manip import frebin, fshift, pad_or_cut_to_size
# from .maths.image_manip import hist_indices, binned_statistic
# from .maths.coords import dist_image, xy_to_rtheta, rtheta_to_xy, xy_rot
# from .maths.coords import det_to_sci, sci_to_det, plotAxes
###########################################################################
# Logging info
###########################################################################
import logging
_log = logging.getLogger('pynrc')
###########################################################################
# WebbPSF
###########################################################################
try:
import webbpsf_ext
except ImportError:
raise ImportError('webbpsf_ext is not installed. pyNRC depends on its inclusion.')
# Some useful functions for displaying and measuring PSFs
import webbpsf, poppy
from poppy import (radial_profile, measure_radial, measure_fwhm, measure_ee)
from poppy import (measure_sharpness, measure_centroid) #, measure_strehl)
import pysynphot as S
# Extend default wavelength range to 5.6 um
S.refs.set_default_waveset(minwave=500, maxwave=56000, num=10000.0, delta=None, log=False)
# JWST 25m^2 collecting area
# Flux loss from masks and occulters are taken into account in WebbPSF
# S.refs.setref(area = 25.4e4) # cm^2
S.refs.setref(area = 25.78e4) # cm^2 according to jwst_pupil_RevW_npix1024.fits.gz
# The following won't work on readthedocs compilation
if not on_rtd:
# Grab WebbPSF assumed pixel scales
log_prev = conf.logging_level
setup_logging('WARN', verbose=False)
nc_temp = webbpsf_ext.NIRCam_ext()
setup_logging(log_prev, verbose=False)
pixscale_SW = nc_temp._pixelscale_short
pixscale_LW = nc_temp._pixelscale_long
del nc_temp
_jbt_exists = True
try:
from jwst_backgrounds import jbt
except ImportError:
_log.info(" jwst_backgrounds is not installed and will not be used for bg estimates.")
_jbt_exists = False
###########################################################################
# pysiaf
###########################################################################
import pysiaf
from pysiaf import JWST_PRD_VERSION, rotations, Siaf
# Create this once since it takes time to call multiple times
siaf_nrc = Siaf('NIRCam')
siaf_nrc.generate_toc()
#__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
#__location__ += '/'
__epsilon = np.finfo(float).eps
###########################################################################
#
# Pysynphot Bandpasses
#
###########################################################################
from webbpsf_ext.bandpasses import bp_igood, bp_wise, bp_2mass, bp_gaia
from webbpsf_ext.bandpasses import nircam_filter as read_filter
###########################################################################
#
# Sensitivities and Saturation Limits
#
###########################################################################
from webbpsf_ext.bandpasses import nircam_grism_res as grism_res
from webbpsf_ext.bandpasses import nircam_grism_wref as grism_wref
from webbpsf_ext.maths import radial_std
[docs]def channel_select(bp):
"""Select wavelength channel
Based on input bandpass, return the pixel scale, dark current, and
excess read noise parameters. These values are typical for either
a SW or LW NIRCam detector.
Parameters
----------
bp : :mod:`pysynphot.obsbandpass`
NIRCam filter bandpass.
"""
if bp.avgwave()/1e4 < 2.3:
pix_scale = pixscale_SW # pixel scale (arcsec/pixel)
idark = 0.003 # dark current (e/sec)
pex = (1.0,5.0)
else:
pix_scale = pixscale_LW
idark = 0.03
pex = (1.5,10.0)
return (pix_scale, idark, pex)
[docs]def get_detname(det_id):
"""Return NRC[A-B][1-5] for valid detector/SCA IDs"""
det_dict = {481:'A1', 482:'A2', 483:'A3', 484:'A4', 485:'A5',
486:'B1', 487:'B2', 488:'B3', 489:'B4', 490:'B5'}
scaids = det_dict.keys()
detids = det_dict.values()
detnames = ['NRC' + idval for idval in detids]
# If already valid, then return
if det_id in detnames:
return det_id
elif det_id in scaids:
detname = 'NRC' + det_dict[det_id]
elif det_id in detids:
detname = 'NRC' + det_id
else:
detname = det_id
# If NRCALONG or or NRCBLONG, change 'LONG' to '5'
if 'LONG' in detname:
detname = detname[0:4] + '5'
if detname not in detnames:
raise ValueError("Invalid detector: {} \n\tValid names are: {}" \
.format(detname, ', '.join(detnames)))
return detname
[docs]def var_ex_model(ng, nf, params):
""" Variance Excess Model
Measured pixel variance shows a slight excess above the measured values.
The input `params` describes this excess variance. This funciton can be
used to fit the excess variance for a variety of different readout patterns.
"""
return 12. * (ng - 1.)/(ng + 1.) * params[0]**2 - params[1] / nf**0.5
[docs]def pix_noise(ngroup=2, nf=1, nd2=0, tf=10.73677, rn=15.0, ktc=29.0, p_excess=(0,0),
fsrc=0.0, idark=0.003, fzodi=0, fbg=0, ideal_Poisson=False,
ff_noise=False, **kwargs):
"""Noise per pixel
Theoretical noise calculation of a generalized MULTIACCUM ramp in terms of e-/sec.
Includes flat field errors from JWST-CALC-003894.
Parameters
----------
n : int
Number of groups in integration rampl
m : int
Number of frames in each groupl
s : int
Number of dropped frames in each groupl
tf : float
Frame time
rn : float
Read Noise per pixel (e-).
ktc : float
kTC noise (in e-). Only valid for single frame (n=1)l
p_excess : array-like
An array or list of two elements that holds the parameters
describing the excess variance observed in effective noise plots.
By default these are both 0. For NIRCam detectors, recommended
values are [1.0,5.0] for SW and [1.5,10.0] for LW.
fsrc : float
Flux of source in e-/sec/pix.
idark : float
Dark current in e-/sec/pix.
fzodi : float
Zodiacal light emission in e-/sec/pix.
fbg : float
Any additional background (telescope emission or scattered light?)
ideal_Poisson : bool
If set to True, use total signal for noise estimate,
otherwise MULTIACCUM equation is used.
ff_noise : bool
Include flat field errors in calculation? From JWST-CALC-003894.
Default=False.
Notes
-----
Various parameters can either be single values or numpy arrays.
If multiple inputs are arrays, make sure their array sizes match.
Variables that need to have the same array shapes (or a single value):
- n, m, s, & tf
- rn, idark, ktc, fsrc, fzodi, & fbg
Array broadcasting also works.
Example
-------
>>> n = np.arange(50)+1 # An array of different ngroups to test out
>>> # Create 2D Gaussian PSF with FWHM = 3 pix
>>> npix = 20 # Number of pixels in x and y direction
>>> fwhm = 3.0
>>> x = np.arange(0, npix, 1, dtype=float)
>>> y = x[:,np.newaxis]
>>> x0 = y0 = npix // 2 # Center position
>>> fsrc = np.exp(-4*np.log(2.) * ((x-x0)**2 + (y-y0)**2) / fwhm**2)
>>> fsrc /= fsrc.max()
>>> fsrc *= 10 # 10 counts/sec in peak pixel
>>> fsrc = fsrc.reshape(npix,npix,1) # Necessary for broadcasting
>>> # Represents pixel array w/ slightly different RN/pix
>>> rn = 15 + np.random.normal(loc=0, scale=0.5, size=(1,npix,npix))
>>> # Results is a 50x(20x20) showing the noise in e-/sec/pix at each group
>>> noise = pix_noise(ngroup=n, rn=rn, fsrc=fsrc)
"""
# Convert everything to arrays
n = np.array(ngroup)
m = np.array(nf)
s = np.array(nd2)
tf = np.array(tf)
# Total flux (e-/sec/pix)
ftot = fsrc + idark + fzodi + fbg
# Special case if n=1
# To be inserted at the end
if (n==1).any():
# Variance after averaging m frames
var = ktc**2 + (rn**2 + ftot*tf) / m
noise = np.sqrt(var)
noise /= tf # In terms of e-/sec
if (n==1).all(): return noise
noise_n1 = noise
ind_n1 = (n==1)
temp = np.array(rn+ktc+ftot)
temp_bool = np.zeros(temp.shape, dtype=bool)
ind_n1_all = (temp_bool | ind_n1)
# Group time
tg = tf * (m + s)
# Effective integration time
tint = tg * (n - 1)
# Read noise, group time, and frame time variances
# This is the MULTIACCUM eq from Rauscher et al. (2007).
# This equation assumes that the slope-fitting routine uses
# incorrect covariance matrix that doesn't take into account
# the correlated Poisson noise up the ramp.
var_rn = rn**2 * 12. * (n - 1.) / (m * n * (n + 1.))
var_gp = ftot * tint * 6. * (n**2. + 1.) / (5 * n * (n + 1.))
var_fm = ftot * tf * 2. * (m**2. - 1.) * (n - 1.) / (m * n * (n + 1.))
# Functional form for excess variance above theoretical
# Empirically measured formulation
# var_ex = 12. * (n - 1.)/(n + 1.) * p_excess[0]**2 - p_excess[1] / m**0.5
var_ex = var_ex_model(n, m, p_excess)
# Variance of total signal
var_poisson = (ftot * tint) if ideal_Poisson else (var_gp - var_fm)
# Total variance
var = var_rn + var_poisson + var_ex
sig = np.sqrt(var)
# Noise in e-/sec
noise = sig / tint
# Make sure to copy over ngroup=1 cases
if (n==1).any():
noise[ind_n1_all] = noise_n1[ind_n1_all]
#print(ind_n1_all.shape,noise.shape,noise_n1.shape)
# Include flat field noise
# JWST-CALC-003894
if ff_noise:
noise_ff = 1E-4 # Uncertainty in the flat field
factor = 1 + noise_ff*np.sqrt(ftot)
noise *= factor
return noise
###########################################################################
#
# Pysynphot Spectrum Wrappers
#
###########################################################################
from webbpsf_ext.spectra import BOSZ_spectrum, stellar_spectrum, source_spectrum
from webbpsf_ext.spectra import planets_sb12, sp_accr, jupiter_spec, companion_spec
from webbpsf_ext.spectra import linder_table, linder_filter, cond_table, cond_filter
from webbpsf_ext.spectra import bin_spectrum, mag_to_counts
[docs]def bin_spectrum(sp, wave, waveunits='um'):
"""Rebin spectrum
Rebin a :mod:`pysynphot.spectrum` to a different wavelength grid.
This function first converts the input spectrum to units
of counts then combines the photon flux onto the
specified wavelength grid.
Output spectrum units are the same as the input spectrum.
Parameters
-----------
sp : :mod:`pysynphot.spectrum`
Spectrum to rebin.
wave : array_like
Wavelength grid to rebin onto.
waveunits : str
Units of wave input. Must be recognizeable by Pysynphot.
Returns
-------
:mod:`pysynphot.spectrum`
Rebinned spectrum in same units as input spectrum.
"""
waveunits0 = sp.waveunits
fluxunits0 = sp.fluxunits
# Convert wavelength of input spectrum to desired output units
sp.convert(waveunits)
# We also want input to be in terms of counts to conserve flux
sp.convert('flam')
edges = S.binning.calculate_bin_edges(wave)
ind = (sp.wave >= edges[0]) & (sp.wave <= edges[-1])
binflux = binned_statistic(sp.wave[ind], sp.flux[ind], np.mean, bins=edges)
# Interpolate over NaNs
ind_nan = np.isnan(binflux)
finterp = interp1d(wave[~ind_nan], binflux[~ind_nan], kind='cubic')
binflux[ind_nan] = finterp(wave[ind_nan])
sp2 = S.ArraySpectrum(wave, binflux, waveunits=waveunits, fluxunits='flam')
sp2.convert(waveunits0)
sp2.convert(fluxunits0)
# Put back units of original input spectrum
sp.convert(waveunits0)
sp.convert(fluxunits0)
return sp2
[docs]def zodi_spec(zfact=None, ra=None, dec=None, thisday=None, **kwargs):
"""Zodiacal light spectrum.
New: Use `ra`, `dec`, and `thisday` keywords to call `jwst_backgrounds`
to obtain more accurate predictions of the background.
Creates a spectrum of the zodiacal light emission in order to estimate the
in-band sky background flux. This is primarily the addition of two blackbodies
at T=5300K (solar scattered light) and T=282K (thermal dust emission)
that have been scaled to match literature flux values.
In reality, the intensity of the zodiacal dust emission varies as a
function of viewing position. In this case, we have added the option
to scale the zodiacal level (or each component individually) by some
user-defined factor 'zfact'. The user can set zfact as a scalar in order
to scale the entire spectrum. If defined as a list, tuple, or np array,
then the each component gets scaled where T=5300K corresponds to the first
elements and T=282K is the second element of the array.
The `zfact` parameter has no effect if `jwst_backgrounds` is called.
Representative values for zfact:
* 0.0 - No zodiacal emission
* 1.0 - Minimum zodiacal emission from JWST-CALC-003894
* 1.2 - Required NIRCam performance
* 2.5 - Average (default)
* 5.0 - High
* 10.0 - Maximum
Parameters
----------
zfact : float
Factor to scale Zodiacal spectrum (default 2.5).
ra : float
Right ascension in decimal degrees
dec : float
Declination in decimal degrees
thisday: int
Calendar day to use for background calculation. If not given, will
use the average of visible calendar days.
Returns
-------
:mod:`pysynphot.spectrum`
Output is a Pysynphot spectrum with default units of flam (erg/s/cm^2/A/sr).
Note: Pysynphot doesn't recognize that it's per steradian, but we must keep
that in mind when integrating the flux per pixel.
Notes
-----
Added the ability to query the Euclid background model using
:func:`zodi_euclid` for a specific location and observing time.
The two blackbodies will be scaled to the 1.0 and 5.5 um emission.
This functionality is deprecated in favor of jwst_backgrounds.
Keyword Args
------------
locstr :
Object name or RA/DEC (decimal degrees or sexigesimal).
Queries the `IPAC Euclid Background Model
<http://irsa.ipac.caltech.edu/applications/BackgroundModel/>`_
year : int
Year of observation.
day : float
Day of observation.
"""
if (ra is not None) and (dec is not None):
if _jbt_exists == False:
_log.warning("`jwst_backgrounds` not installed. `ra`, `dec`, and `thisday` parameters will not work.")
else:
# Wavelength for "bathtub plot" (not used here)
wave_bath = 2.5
try:
bkg = jbt.background(ra, dec, wave_bath)
except:
_log.error('Cannot reach JWST Background servers. Reverting to `zfact` input.')
else:
# Get wavelength and flux values
wvals = bkg.bkg_data['wave_array'] # Wavelength (um)
farr = bkg.bkg_data['total_bg'] # Total background (MJy/sr)
if thisday is None:
# Use average of visible calendar days
ftot = farr.mean(axis=0)
else:
calendar = bkg.bkg_data['calendar']
if thisday in calendar:
ind = np.where(calendar==thisday)[0][0]
ftot = farr[ind]
else:
_log.warning("The input calendar day {}".format(thisday)+" is not available. \
Choosing closest visible day.")
diff = np.abs(calendar-thisday)
ind = np.argmin(diff)
ftot = farr[ind]
sp = S.ArraySpectrum(wave=wvals*1e4, flux=ftot*1e6, fluxunits='Jy')
sp.convert('flam')
sp.name = 'Total Background'
return sp
if zfact is None:
zfact = 2.5
#_log.debug('zfact:{0:.1f}'.format(zfact))
if isinstance(zfact, (list, tuple, np.ndarray)):
f1, f2 = zfact
else:
f1 = f2 = zfact
# These values have been scaled to match JWST-CALC-003894 values
# in order to work with Pysynphot's blackbody function.
# Pysynphot's BB function is normalized to 1Rsun at 1kpc by default.
f1 *= 4.0e7
f2 *= 2.0e13
bb1 = f1 * S.BlackBody(5300.0)
bb2 = f2 * S.BlackBody(282.0)
# Query Euclid Background Model
locstr = kwargs.get('locstr')
year = kwargs.get('year')
day = kwargs.get('day')
if (locstr is not None) and (year is not None) and (day is not None):
# Wavelengths in um and values in MJy
waves = np.array([1.0,5.5])
vals = zodi_euclid(locstr, year, day, waves, **kwargs)
bb1.convert('Jy')
bb2.convert('Jy')
# MJy at wavelength locations
f_bb1 = bb1.sample(waves*1e4) / 1e6
f_bb2 = bb2.sample(waves*1e4) / 1e6
bb1 *= (vals[0]-f_bb2[0])/f_bb1[0]
bb2 *= (vals[1]-f_bb1[1])/f_bb2[1]
sp_zodi = bb1 + bb2
sp_zodi.convert('flam')
sp_zodi.name = 'Zodiacal Light'
return sp_zodi
[docs]def zodi_euclid(locstr, year, day, wavelengths=[1,5.5], ido_viewin=0, **kwargs):
"""IPAC Euclid Background Model
Queries the `IPAC Euclid Background Model
<http://irsa.ipac.caltech.edu/applications/BackgroundModel/>`_
in order to get date and position-specific zodiacal dust emission.
The program relies on ``urllib3`` to download the page in XML format.
However, the website only allows single wavelength queries, so
this program implements a multithreaded procedure to query
multiple wavelengths simultaneously. However, due to the nature
of the library, only so many requests are allowed to go out at a time,
so this process can take some time to complete.
Testing shows about 500 wavelengths in 10 seconds as a rough ballpark.
Recommended to grab only a few wavelengths for normalization purposes.
Parameters
----------
locstr : str
This input field must contain either coordinates (as string),
or an object name resolveable via NED or SIMBAD.
year: string
Year. Limited to 2018 to 2029 for L2 position.
day : string
Day of year (1-366). Limited to 2018 Day 274 to 2029 Day 120
for L2 position and ido_viewin=0.
wavelength : array-like
Wavelength in microns (0.5-1000).
ido_viewin : 0 or 1
If set to 0, returns zodiacal emission at specific location for input time.
If set to 1, then gives the median value for times of the year that the object
is in a typical spacecraft viewing zone. Currently this is set to solar
elongations between 85 and 120 degrees.
References
----------
See the `Euclid Help Website
<http://irsa.ipac.caltech.edu/applications/BackgroundModel/docs/dustProgramInterface.html>`_
for more details.
"""
# from urllib2 import urlopen
import urllib3
import xmltodict
from multiprocessing.pool import ThreadPool
def fetch_url(url):
"""
TODO: Add error handling.
"""
# response = urlopen(url)
# response = response.read()
http = urllib3.PoolManager()
response = http.request('GET', url)
d = xmltodict.parse(response.data, xml_attribs=True)
fl_str = d['results']['result']['statistics']['zody']
return float(fl_str.split(' ')[0])
#locstr="17:26:44 -73:19:56"
#locstr = locstr.replace(' ', '+')
#year=2019
#day=1
#obslocin=0
#ido_viewin=1
#wavelengths=None
req_list = []
for w in wavelengths:
url = 'http://irsa.ipac.caltech.edu/cgi-bin/BackgroundModel/nph-bgmodel?'
req = "{}&locstr={}&wavelength={:.2f}&year={}&day={}&obslocin=0&ido_viewin={}"\
.format(url, locstr, w, year, day, ido_viewin)
req_list.append(req)
nthread = np.min([50,len(wavelengths)])
pool = ThreadPool(nthread)
results = pool.imap(fetch_url, req_list)
res = []
for r in results: res.append(r)
pool.close()
return np.array(res)
# def _zodi_spec_old(level=2):
# """
# Create a spectrum of the zodiacal light emission in order to estimate the
# in-band sky background flux. This is simply the addition of two blackbodies
# at T=5800K (solar scattered light) and T=300K (thermal dust emission)
# that have been scaled to match the literature flux values.
#
# In reality, the intensity of the zodiacal dust emission varies as a
# function of viewing position. In this case, we have added different levels
# intensity similiar to the results given by old NIRCam ETC. These have not
# been validated in any way and should be used with caution, but at least
# give an order of magnitude of the zodiacal light background flux.
#
# There are four different levels that can be passed through the level
# parameter: 0=None, 1=Low, 2=Avg, 3=High
#
# For instance set sp_zodi = zodi_spec(3) for a highish sky flux.
# Default is 2
# """
#
# bb1 = S.BlackBody(5800.); bb2 = S.BlackBody(300.)
# sp_zodi = (1.7e7*bb1 + 2.3e13*bb2) * 3.73
# sp_zodi.convert('flam')
#
# # This is how some case statements are done in Python
# # Select the level of zodiacal light emission
# # 0=None, 1=Low, 2=Avg, 3=High
# switcher = {0:0.0, 1:0.5, 2:1.0, 3:1.8}
# factor = switcher.get(level, None)
#
# if factor is None:
# _log.warning('The input parameter level=%s is not valid. Setting zodiacal light to 0.' % level)
# _log.warning('Valid values inlclude: %s' % switcher.keys())
# factor = 0
#
# sp_zodi *= factor
# sp_zodi.name = 'Zodiacal Light'
#
# return sp_zodi
[docs]def grism_background_image(filter, pupil='GRISM0', module='A', sp_bg=None,
include_com=True, **kwargs):
"""Create full grism background image"""
# Option for GRISMR/GRISMC
if 'GRISMR' in pupil:
pupil = 'GRISM0'
elif 'GRISMC' in pupil:
pupil = 'GRISM90'
upper = 9.6 if include_com else 31.2
g_bg = grism_background(filter, pupil, module, sp_bg, upper=upper, **kwargs)
final_image = np.zeros([2048,2048])
if 'GRISM0' in pupil:
final_image = final_image + g_bg.reshape([1,-1])
else:
final_image = final_image + g_bg.reshape([-1,1])
# Add COM background
if include_com:
final_image += grism_background_com(filter, pupil, module, sp_bg, **kwargs)
return final_image
[docs]def grism_background(filter, pupil='GRISM0', module='A', sp_bg=None,
orders=[1,2], wref=None, upper=9.6, **kwargs):
"""
Returns a 1D array of grism Zodiacal/thermal background
emission model, including roll-off from pick-off mirror (POM)
edges. By default, this includes light dispersed by the
1st and 2nd grism orders (m=1 and m=2).
For column dipsersion, we ignore the upper region occupied by
the coronagraphic mask region by default. The preferred way to
include this region is to add the dispersed COM image from the
`grism_background_com` function to create the full 2048x2048
image. Or, more simply (but less accurate) is to set an `upper`
value of 31.2, which is the approximately distance (in arcsec)
from the top of the detector to the top of the coronagraphic
field of view.
Parameters
==========
filter : str
Name of filter (Long Wave only).
pupil : str
Either 'GRISM0' ('GRISMR') or 'GRISM90' ('GRISMC').
module : str
NIRCam 'A' or 'B' module.
sp_bg : :mod:`pysynphot.spectrum`
Spectrum of Zodiacal background emission, which gets
multiplied by bandpass throughput to determine final
wavelength-dependent flux that is then dispersed.
orders : array-like
What spectral orders to include? Valid orders are 1 and 2.
wref : float or None
Option to set the undeviated wavelength, otherwise this will
search a lookup table depending on the grism.
upper : float
Set the maximum bounds for out-of-field flux to be dispersed
onto the detector. By default, this value is 9.6", corresponding
to the bottom of the coronagraphic mask. Use `grism_background_com`
to then include image of dispersed COM mask.
If you want something simpler, increase this value to 31.2" to
assume the coronagraphic FoV is free of any holder blockages or
substrate and occulting masks.
Keyword Args
============
zfact : float
Factor to scale Zodiacal spectrum (default 2.5).
ra : float
Right ascension in decimal degrees
dec : float
Declination in decimal degrees
thisday: int
Calendar day to use for background calculation. If not given, will
use the average of visible calendar days.
"""
# Option for GRISMR/GRISMC
if 'GRISMR' in pupil:
pupil = 'GRISM0'
elif 'GRISMC' in pupil:
pupil = 'GRISM90'
# Pixel scale
pix_scale, _, _ = channel_select(read_filter(filter))
# Undeviated wavelength
if wref is None:
wref = grism_wref(pupil, module)
# Background spectrum
if sp_bg is None:
sp_bg = zodi_spec(**kwargs)
# Total number of "virtual" pixels spanned by pick-off mirror
border = np.array([8.4, 8.0]) if ('GRISM0' in pupil) else np.array([12.6, upper])
extra_pix = (border / pix_scale + 0.5).astype('int')
extra_pix[extra_pix<=0] = 1 # Ensure there's at least 1 extra pixel
npix_tot = 2048 + extra_pix.sum()
flux_all = np.zeros(npix_tot)
for grism_order in orders:
# Get filter throughput and create bandpass
bp = read_filter(filter, pupil=pupil, module=module,
grism_order=grism_order, **kwargs)
# Get wavelength dispersion solution
res, dw = grism_res(pupil, module, grism_order) # Resolution and dispersion
# Observation spectrum converted to count rate
obs_bg = S.Observation(sp_bg, bp, bp.wave)
obs_bg.convert('counts')
# Total background flux per pixel (not dispersed)
area_scale = (pix_scale/206265.0)**2
fbg_tot = obs_bg.countrate() * area_scale
# Total counts/sec within each wavelength bin
binwave = obs_bg.binwave/1e4
binflux = obs_bg.binflux*area_scale
# Interpolation function
fint = interp1d(binwave, binflux, kind='cubic')
# Wavelengths at each pixel to interpolate
wave_vals = np.arange(binwave.min(), binwave.max(), dw)
# Get flux values and preserve total flux
flux_vals = fint(wave_vals)
flux_vals = fbg_tot * flux_vals / flux_vals.sum()
# # Wavelengths at each pixel to interpolate
# wave_vals = np.arange(bp.wave.min()/1e4, bp.wave.max()/1e4, dw)
# # Rebin onto desired wavelength grid
# sp_new = bin_spectrum(sp_bg, wave_vals, waveunits='um')
# obs_bg = S.Observation(sp_new, bp, binset=sp_new.wave)
# # Get flux values per pixel
# obs_bg.convert('counts')
# flux_vals = obs_bg.binflux * (pix_scale/206265.0)**2
# Index of reference wavelength
iref = int((wref - wave_vals[0]) / (wave_vals[1] - wave_vals[0]))
# Determine the array indices that contribute for each pixel
# Use indexing rather than array shifting for speed
# This depends on the size of the POM relative to detector
offset = -1*int(wref*res/2 + 0.5) if grism_order==2 else 0
i1_arr = np.arange(iref,iref-npix_tot,-1)[::-1] + offset
i2_arr = np.arange(iref,iref+npix_tot,+1) + offset
i1_arr[i1_arr<0] = 0
i1_arr[i1_arr>len(wave_vals)] = len(wave_vals)
i2_arr[i2_arr<0] = 0
i2_arr[i2_arr>len(wave_vals)] = len(wave_vals)
flux_all += np.array([flux_vals[i1:i2].sum() for i1,i2 in zip(i1_arr,i2_arr)])
# Crop only detector pixels
flux_all = flux_all[extra_pix[0]:-extra_pix[1]]
# Module B GRISM0/R disperses in opposite direction ('sci' coords)
if ('GRISM0' in pupil) and (module=='B'):
flux_all = flux_all[::-1]
# Return single
return flux_all
[docs]def grism_background_com(filter, pupil='GRISM90', module='A', sp_bg=None,
wref=None, **kwargs):
# Option for GRISMR/GRISMC
if 'GRISMR' in pupil:
pupil = 'GRISM0'
elif 'GRISMC' in pupil:
pupil = 'GRISM90'
if 'GRISM0' in pupil:
_log.info('COM feature not present for row grisms.')
return 0
# Only see COM for 1st order
# Minimum wavelength is 2.4um, which means 2nd order is 2400 pixels away.
grism_order = 1
# Get filter throughput and create bandpass
bp = read_filter(filter, pupil=pupil, module=module, grism_order=grism_order,
coron_substrate=True, **kwargs)
# Pixel scale
pix_scale, _, _ = channel_select(read_filter(filter))
# Get wavelength dispersion solution
res, dw = grism_res(pupil, module, grism_order)
# Undeviated wavelength
wref = grism_wref(pupil, module) if wref is None else wref
# Background spectrum
sp_bg = zodi_spec(**kwargs) if sp_bg is None else sp_bg
# Coronagraphic mask image
im_com = build_mask_detid(module+'5', filter=filter)
# Crop to mask holder
# Remove anything that is 0 or max
im_collapse = im_com.sum(axis=1)
ind_cut = (im_collapse == im_collapse.max()) | (im_collapse == 0)
im_com = im_com[~ind_cut]
ny_com, nx_com = im_com.shape
# Observation spectrum converted to count rate
obs_bg = S.Observation(sp_bg, bp, bp.wave)
obs_bg.convert('counts')
# Total background flux per pixel (not dispersed)
area_scale = (pix_scale/206265.0)**2
fbg_tot = obs_bg.countrate() * area_scale
# Total counts/sec within each wavelength bin
binwave = obs_bg.binwave/1e4
binflux = obs_bg.binflux*area_scale
# Interpolation function
fint = interp1d(binwave, binflux, kind='cubic')
# Wavelengths at each pixel to interpolate
wave_vals = np.arange(binwave.min(), binwave.max(), dw)
# Get flux values and preserve total flux
flux_vals = fint(wave_vals)
flux_vals = fbg_tot * flux_vals / flux_vals.sum()
# Index of reference wavelength in spectrum
iref = int((wref - wave_vals[0]) / (wave_vals[1] - wave_vals[0]))
# Pixel position of COM image lower and upper bounds
upper = 9.6
ipix_ref = 2048 + int(upper/pix_scale + 0.5)
ipix_lower = ipix_ref - iref
ipix_upper = ipix_lower + ny_com + len(flux_vals)
# print('COM', ipix_lower, ipix_upper)
# Only include if pixel positions overlap detector frame
if (ipix_upper>0) and (ipix_lower<2048):
# Shift and add images
im_shift = np.zeros([ny_com+len(flux_vals), nx_com])
# print(len(flux_vals))
for i, f in enumerate(flux_vals):
im_shift[i:i+ny_com,:] += im_com*f
# Position at appropriate location within detector frame
# First, either pad the lower, or crop to set bottom of detector
if ipix_lower>=0 and ipix_lower<2048:
im_shift = np.pad(im_shift, ((ipix_lower,0),(0,0)))
elif ipix_lower<0:
im_shift = im_shift[-ipix_lower:,:]
# Expand or contract to final full detector size
if im_shift.shape[0]<2048:
im_shift = np.pad(im_shift, ((0,2048-im_shift.shape[0]),(0,0)))
else:
im_shift = im_shift[0:2048,:]
res = im_shift
else:
res = 0
return res
[docs]def make_grism_slope(nrc, src_tbl, tel_pointing, expnum, add_offset=None, **kwargs):
""" Create slope image
"""
# Set SIAF aperture info
siaf_ap_obs = tel_pointing.siaf_ap_obs
ap_siaf = siaf_ap_obs
ap_obs = ap_siaf.AperName
# Convert expnum to int if input as string
expnum = int(expnum) if isinstance(expnum, str) else expnum
# Grab poining info for specific exposure number
ind = np.where(tel_pointing.exp_nums == expnum)[0][0]
# Include a offset shift to better reposition spectrum?
add_offset = np.array([0,0]) if add_offset is None else np.asarray(add_offset)
idl_off = np.array([(tel_pointing.position_offsets_act[ind])]) + add_offset
ra_deg = src_tbl['ra'].to('deg').value
dec_deg = src_tbl['dec'].to('deg').value
mags = src_tbl[nrc.filter].data
try:
teff = src_tbl['Teff']
sptype = None
except:
teff = None
sptype = src_tbl['SpType']
# Convert all RA/Dec values to V3/V3
v2_obj, v3_obj = tel_pointing.radec_to_frame((ra_deg, dec_deg), frame_out='tel', idl_offsets=idl_off)
# Get pixel locations
xpix, ypix = ap_siaf.tel_to_sci(v2_obj, v3_obj)
# Pickoff mirror information
x1, x2, y1, y2 = pickoff_xy(ap_obs)
# Mask out all sources that are outside pick-off mirror
mask_pom = ((xpix>x1) & (xpix<x2-1)) & ((ypix>y1) & (ypix<y2-1))
# Mask out all sources that will not contribute to final slope image
wspec, imspec_temp = nrc.calc_psf_from_coeff(return_oversample=False, return_hdul=False)
ypsf, xpsf = imspec_temp.shape
xmin, ymin = -1*np.array([xpsf,ypsf]).astype('int') / 2 - 1
xmax = int(ap_siaf.XSciSize + xpsf / 2 + 1)
ymax = int(ap_siaf.YSciSize + ypsf / 2 + 1)
xmask = (xpix>=xmin) & (xpix<=xmax)
ymask = (ypix>=ymin) & (ypix<=ymax)
# Final mask
mask = mask_pom & xmask & ymask
# Select final positions and spectral type information
xpix = xpix[mask]
ypix = ypix[mask]
mags_field = mags[mask]
teff = teff[mask] if teff is not None else None
sptype = sptype[mask] if sptype is not None else None
# src_flux = mag_to_counts(mags_field, nrc.bandpass, **kwargs)
_log.info(np.array([xpix, ypix, mags_field]).T)
# Get undeviated wavelength
wref = grism_wref(nrc.pupil_mask, nrc.module)
nx, ny = (nrc.Detector.xpix, nrc.Detector.ypix)
im_slope = np.zeros([ny,nx])
# Build final image
wspec_all = []
for i in range(xpix.size):
# Get stellar spectrum
teff_i = teff[i] if teff is not None else None
sptp_i = sptype[i] if sptype is not None else 'G2V'
sp = stellar_spectrum(sptp_i, mags_field[i], 'vegamag', nrc.bandpass,
Teff=teff_i, metallicity=0, log_g=4.5)
# Create spectral image
xr, yr = xpix[i], ypix[i]
wspec, imspec = place_grism_spec(nrc, sp, xr, yr, wref=wref, return_oversample=False)
im_slope += imspec
wspec_all.append(wspec)
del imspec
wspec_all = np.asarray(wspec_all)
return wspec_all, im_slope
[docs]def place_grism_spec(nrc, sp, xpix, ypix, wref=None, return_oversample=False):
""" Create spectral image and place ref wavelenght at (x,y) location
Given a NIRCam instrument object and input spectrum, create a dispersed
PSF and place the undeviated reference wavelength at the specified
(xpix,ypix) coordinates (assuming 'sci' coords).
Returned values will be a tuple of (wspec, imspec)
"""
nx, ny = (nrc.Detector.xpix, nrc.Detector.ypix)
oversample = nrc.oversample
nx_over = nx * oversample
ny_over = ny * oversample
pupil_mask = nrc.pupil_mask
if pupil_mask is None:
_log.warn('place_grism_spec: NIRCam pupil mask set to None. Should be GRISMR or GRISMC.')
pupil_mask = 'NONE'
# Determine reference wavelength
if wref is None:
if 'GRISMC' in pupil_mask:
pupil = 'GRISMC'
elif 'GRISM' in pupil_mask:
pupil = 'GRISMR'
else: # generic grism
pupil = 'GRISM'
wref = grism_wref(pupil, nrc.module)
# Create image
wspec, imspec = nrc.calc_psf_from_coeff(sp=sp, return_hdul=False, return_oversample=True)
# Place undeviated wavelength at (xpix,ypix) location
xr, yr = (np.array([xpix, ypix]) - 0.5) * oversample
yshift = yr - ny_over/2
# Empirically determine shift value in dispersion direction
wnew_temp = pad_or_cut_to_size(wspec, nx_over)
# Index of reference wavelength associated with ref pixel
imask = (wnew_temp>wref-0.01) & (wnew_temp<wref+0.01)
ix_ref = np.interp(wref, wnew_temp[imask], np.arange(nx_over)[imask])
xshift = xr - ix_ref
# Shift and crop to output size
imspec = pad_or_cut_to_size(imspec, (ny_over,nx_over), offset_vals=(yshift,xshift), fill_val=np.nan)
wspec = pad_or_cut_to_size(wspec, nx_over, offset_vals=xshift, fill_val=np.nan)
# Remove NaNs in image
ind_nan = np.isnan(imspec)
imspec[ind_nan] = np.min(imspec[~ind_nan])
# Fill NaNs in wavelength solution with linear extrapolation
ind_nan = np.isnan(wspec)
arr = np.arange(nx_over)
cf = jl_poly_fit(arr[~ind_nan], wspec[~ind_nan])
wspec[ind_nan] = jl_poly(arr[ind_nan], cf)
if return_oversample:
return wspec, imspec
else:
wspec = frebin(wspec, scale=1/oversample, total=False)
imspec = frebin(imspec, scale=1/oversample)
return wspec, imspec
[docs]def place_grismr_tso(waves, imarr, siaf_ap, wref=None, im_coords='sci'):
"""
Shift image such that undeviated wavelength sits at the
SIAF aperture reference location.
Return image in sience coords.
Parameters
==========
waves : ndarray
Wavelength solution of input image
imarr : ndarray
Input dispersed grism PSF (can be multiple PSFs)
siaf_ap : pysiaf aperture
Grism-specific SIAF aperture class to determine final
subarray size and reference point
"""
from .maths.coords import det_to_sci
if len(imarr.shape) > 2:
nz, ny_in, nx_in = imarr.shape
else:
nz = 1
ny_in, nx_in = imarr.shape
imarr = imarr.reshape([nz,ny_in,nx_in])
# Convert to sci coordinates
if im_coords=='det':
det_name = siaf_ap.AperName[3:5]
imarr = det_to_sci(imarr, det_name)
# Determine reference wavelength
if wref is None:
if 'GRISMC' in siaf_ap.AperName:
pupil = 'GRISMC'
elif 'GRISM' in siaf_ap.AperName:
pupil = 'GRISMR'
else: # generic grism
pupil = 'GRISM'
module = 'A' if 'NRCA' in siaf_ap.AperName else 'B'
wref = grism_wref(pupil, module)
# Get reference coordinates
yref, xref = (siaf_ap.YSciRef, siaf_ap.XSciRef)
# Final image size
ny_out, nx_out = (siaf_ap.YSciSize, siaf_ap.XSciSize)
# Empirically determine shift value in dispersion direction
wnew_temp = pad_or_cut_to_size(waves, nx_out)
# Index of reference wavelength associated with ref pixel
ind = (wnew_temp>wref-0.01) & (wnew_temp<wref+0.01)
xnew_temp = np.interp(wref, wnew_temp[ind], np.arange(nx_out)[ind])
xoff = xref - xnew_temp
# Move to correct position in y
yoff = yref - (int(ny_out/2) - 1)
# if np.mod(ny_in,2)==0: # If even, shift by half a pixel?
# yoff = yoff + 0.5
imarr = pad_or_cut_to_size(imarr, (ny_out,nx_out), offset_vals=(yoff,xoff), fill_val=np.nan)
waves = pad_or_cut_to_size(waves, nx_out, offset_vals=xoff, fill_val=np.nan)
# Remove NaNs
ind_nan = np.isnan(imarr)
imarr[ind_nan] = np.min(imarr[~ind_nan])
# Remove NaNs
# Fill in with wavelength solution (linear extrapolation)
ind_nan = np.isnan(waves)
# waves[ind_nan] = 0
arr = np.arange(nx_out)
cf = jl_poly_fit(arr[~ind_nan], waves[~ind_nan])
waves[ind_nan] = jl_poly(arr[ind_nan], cf)
return waves, imarr
###########################################################################
#
# Pick-off images for a given module
#
###########################################################################
[docs]def pickoff_xy(ap_obs_name):
"""
Return pickoff mirror FoV x/y limits in terms of science pixel coordinates
ap_obs : Aperture to create observation (e.g., 'NRCA5_FULL')
"""
ap_siaf = siaf_nrc[ap_obs_name]
module = ap_obs_name[3:4]
# Determine pick-off mirror FoV from
ap1 = siaf_nrc['NRC{}5_GRISMC_WFSS'.format(module)]
ap2 = siaf_nrc['NRC{}5_GRISMR_WFSS'.format(module)]
ap3 = siaf_nrc['NRCA5_FULL_MASK335R']
# V2/V3 coordinates of pick-off FoV
v2_1, v3_1 = ap1.corners('tel', False)
v2_2, v3_2 = ap2.corners('tel', False)
v2_3, v3_3 = ap3.corners('tel', False)
if module == 'B':
v2_3 *= -1
v2_all = np.array([v2_1, v2_2, v2_3]).flatten()
v3_all = np.array([v3_1, v3_2, v3_3]).flatten()
# Convert to science pixel positions
x_new, y_new = ap_siaf.tel_to_sci(v2_all, v3_all)
# sci pixel values are use are X.5
x1, x2 = np.array([x_new.min(), x_new.max()]).astype(np.int) + 0.5
y1, y2 = np.array([y_new.min(), y_new.max()]).astype(np.int) + 0.5
return (x1, x2, y1, y2)
[docs]def pickoff_image(ap_obs, v2_obj, v3_obj, flux_obj, oversample=1):
"""
Create an unconvolved image of filled pixel values that have
been shifted via bilinear interpolation. The image will then
be convolved with a PSF to create the a focal plane image that
is the size of the NIRCam pick-off mirror. This image should
then be cropped to generate the final detector image.
Returns the tuple (xsci, ysci, image), where xsci and ysci are
the science coordinates associated with the image.
Parameters
==========
ap_obs : str
Name of aperture in which the observation is taking place.
Necessary to determine pixel locations for stars.
v2_obj : ndarray
List of V2 coordiantes of stellar sources
v3_obj : ndarray
List of V3 coordinates of stellar sources
flux_obj : ndarray
List of fluxes (e-/sec) for each source
Keyword Args
============
oversample : int
If set, the returns an oversampled version of the image to
convolve with PSFs. If set to one, then detector pixels.
"""
from scipy.interpolate import interp2d
# xpix and ypix locations in science orientation
ap_siaf = siaf_nrc[ap_obs]
xpix, ypix = ap_siaf.tel_to_sci(v2_obj, v3_obj)
x1, x2, y1, y2 = pickoff_xy(ap_obs)
# Mask all sources that are outside pick-off mirror
mask = ((xpix>x1) & (xpix<x2-1)) & ((ypix>y1) & (ypix<y2-1))
xpix = xpix[mask]
ypix = ypix[mask]
src_flux = flux_obj[mask]
# Create oversized and oversampled image
ys = int((y2 - y1) * oversample)
xs = int((x2 - x1) * oversample)
oversized_image = np.zeros([ys,xs])
# X and Y detector pixel values
dstep = 1/oversample
xsci = np.arange(x1, x2, dstep)
ysci = np.arange(y1, y2, dstep)
# Zero-based (x,y) locations for oversized images
xvals_os = (xpix - x1) * oversample
yvals_os = (ypix - y1) * oversample
# separate into an integers and fractions
intx = xvals_os.astype(np.int)
inty = yvals_os.astype(np.int)
fracx = xvals_os - intx
fracy = yvals_os - inty
# flip negative shift values
ind = fracx < 0
fracx[ind] += 1
intx[ind] -= 1
ind = fracy<0
fracy[ind] += 1
inty[ind] -= 1
# Bilinear interpolation of all sources
val1 = src_flux * ((1-fracx)*(1-fracy))
val2 = src_flux * ((1-fracx)*fracy)
val3 = src_flux * ((1-fracy)*fracx)
val4 = src_flux * (fracx*fracy)
# Add star-by-star in case of overlapped indices
for i, (iy, ix) in enumerate(zip(inty,intx)):
oversized_image[iy, ix] += val1[i]
oversized_image[iy+1, ix] += val2[i]
oversized_image[iy, ix+1] += val3[i]
oversized_image[iy+1, ix+1] += val4[i]
#print("NStars: {}".format(len(intx)))
return xsci, ysci, oversized_image
[docs]def gen_unconvolved_point_source_image(nrc, tel_pointing, ra_deg, dec_deg, mags,
expnum=1, osamp=1, siaf_ap_obs=None, **kwargs):
""" Create an unconvolved image with sub-pixel shifts
"""
from .obs_nircam import attenuate_with_coron_mask, gen_coron_mask
# Observation aperture
siaf_ap_obs = nrc.siaf_ap if siaf_ap_obs is None else siaf_ap_obs
ap_obs_name = siaf_ap_obs.AperName
# Get all source fluxes
# mags = tbl[nrc.filter].data
flux_obj = mag_to_counts(mags, nrc.bandpass, **kwargs)
if isinstance(expnum, str):
expnum = int(expnum)
ind = np.where(tel_pointing.exp_nums == expnum)[0][0]
# Convert RA, Dec coordiantes into V2/V3 (arcsec)
# ra_deg, dec_deg = (tbl['ra'], tbl['dec'])
idl_off = [tel_pointing.position_offsets_act[ind]]
v2_obj, v3_obj = tel_pointing.radec_to_frame((ra_deg, dec_deg), frame_out='tel', idl_offsets=idl_off)
# Create initial POM image, then contract to reasonable size
xsci, ysci, im_pom = pickoff_image(ap_obs_name, v2_obj, v3_obj, flux_obj, oversample=osamp)
# Crop based on subarray window size
# Maximum required size depends on PSF and detector readout size
# Min and max sci coordinates to keep
xmin = ymin = int(-nrc.fov_pix/2 - 1)
xmax = int(siaf_ap_obs.XSciSize + nrc.fov_pix/2 + 1)
ymax = int(siaf_ap_obs.YSciSize + nrc.fov_pix/2 + 1)
xmask = (xsci>=xmin) & (xsci<xmax)
ymask = (ysci>=ymin) & (ysci<ymax)
# Keep only regions that contribute to final convolved image
xsci = xsci[xmask]
ysci = ysci[ymask]
im_sci = im_pom[ymask][:,xmask]
# Attenuate image by coronagraphic mask features (ND squaures and COM holder)
if nrc.is_coron and (not nrc.ND_acq):
try:
cmask = nrc.mask_images['OVERSAMP']
except:
mask_dict = gen_coron_mask(nrc)
cmask = mask_dict['OVERSAMP']
if cmask is not None:
# Make mask image same on-sky size as im_sci
ny_over, nx_over = np.array(im_sci.shape) * nrc.oversample / osamp
x0, y0 = (np.min(xsci), np.min(ysci))
x1 = int(np.abs(x0*nrc.oversample))
y1 = int(np.abs(y0*nrc.oversample))
x2, y2 = (x1 + cmask.shape[1], y1 + cmask.shape[0])
pad_vals = ((y1,int(ny_over-y2)), (x1,int(nx_over-x2)))
cmask_oversized = np.pad(cmask, pad_vals, mode='edge')
# Rebin to im_sci sampling
cmask_oversized = frebin(cmask_oversized, dimensions=im_sci.shape, total=False)
# Perform attenuation
im_sci = attenuate_with_coron_mask(nrc, im_sci, cmask_oversized)
# Make science image HDUList from
hdul_sci_image = fits.HDUList([fits.PrimaryHDU(im_sci)])
hdul_sci_image[0].header['PIXELSCL'] = nrc.pixelscale / osamp
hdul_sci_image[0].header['OSAMP'] = osamp
hdul_sci_image[0].header['INSTRUME'] = nrc.name
hdul_sci_image[0].header['APERNAME'] = ap_obs_name
# Get X and Y indices corresponding to aperture reference
xind_ref = np.argmin(np.abs(xsci - siaf_ap_obs.XSciRef))
yind_ref = np.argmin(np.abs(ysci - siaf_ap_obs.YSciRef))
hdul_sci_image[0].header['XIND_REF'] = (xind_ref, "x index of aperture reference")
hdul_sci_image[0].header['YIND_REF'] = (yind_ref, "y index of aperture reference")
hdul_sci_image[0].header['XSCI0'] = (np.min(xsci), "xsci value at (x,y)=(0,0) corner")
hdul_sci_image[0].header['YSCI0'] = (np.min(ysci), "ysci value at (x,y)=(0,0) corner")
hdul_sci_image[0].header['CFRAME'] = 'sci'
# print(im_pom.shape, im_sci.shape)
return hdul_sci_image
###########################################################################
#
# Coronagraphic Disk Imaging Routines
#
###########################################################################
[docs]def nproc_use_convolve(fov_pix, oversample, npsf=None):
"""
Attempt to estimate a reasonable number of processes to use for multiple
simultaneous convolve_fft calculations.
Here we attempt to estimate how many such calculations can happen in
parallel without swapping to disk, with a mixture of empiricism and conservatism.
One really does not want to end up swapping to disk with huge arrays.
NOTE: Requires psutil package. Otherwise defaults to mp.cpu_count() / 2
Parameters
-----------
fov_pix : int
Square size in detector-sampled pixels of final PSF image.
oversample : int
The optical system that we will be calculating for.
npsf : int
Number of PSFs. Sets maximum # of processes.
"""
try:
import psutil
except ImportError:
nproc = int(mp.cpu_count() // 2)
if nproc < 1: nproc = 1
_log.info("No psutil package available, cannot estimate optimal nprocesses.")
_log.info("Returning nproc=ncpu/2={}.".format(nproc))
return nproc
mem = psutil.virtual_memory()
avail_GB = mem.available / (1024**3) - 1.0 # Leave 1 GB
fov_pix_over = fov_pix * oversample
# Memory formulas are based on fits to memory usage stats for:
# fov_arr = np.array([16,32,128,160,256,320,512,640,1024,2048])
# os_arr = np.array([1,2,4,8])
# In MBytes
mem_total = 300*(fov_pix_over)**2 * 8 / (1024**2)
# Convert to GB
mem_total /= 1024
# How many processors to split into?
nproc = avail_GB // mem_total
nproc = np.min([nproc, mp.cpu_count(), poppy.conf.n_processes])
if npsf is not None:
nproc = np.min([nproc, npsf])
# Resource optimization:
# Split iterations evenly over processors to free up minimally used processors.
# For example, if there are 5 processes only doing 1 iteration, but a single
# processor doing 2 iterations, those 5 processors (and their memory) will not
# get freed until the final processor is finished. So, to minimize the number
# of idle resources, take the total iterations and divide by two (round up),
# and that should be the final number of processors to use.
np_max = np.ceil(npsf / nproc)
nproc = int(np.ceil(npsf / np_max))
if nproc < 1: nproc = 1
return int(nproc)
###########################################################################
#
# Coronagraphic Mask Transmission
#
###########################################################################
[docs]def offset_bar(filt, mask):
"""Bar mask offset locations
Get the appropriate offset in the x-position to place a source on a bar mask.
Each bar is 20" long with edges and centers corresponding to::
SWB: [1.03, 2.10, 3.10] (um) => [-10, 0, +10] (asec)
LWB: [2.30, 4.60, 6.90] (um) => [+10, 0, -10] (asec)
"""
if (mask is not None) and ('WB' in mask):
# What is the effective wavelength of the filter?
#bp = pynrc.read_filter(filter)
#w0 = bp.avgwave() / 1e4
w0 = np.float(filt[1:-1])/100
# Choose wavelength from dictionary
wdict = {'F182M': 1.84, 'F187N': 1.88, 'F210M': 2.09, 'F212N': 2.12,
'F250M': 2.50, 'F300M': 2.99, 'F335M': 3.35, 'F360M': 3.62,
'F410M': 4.09, 'F430M': 4.28, 'F460M': 4.63, 'F480M': 4.79,
'F200W': 2.229, 'F277W': 3.144, 'F356W': 3.971, 'F444W': 4.992}
w = wdict.get(filt, w0)
# Get appropriate x-offset
#xoff_asec = np.interp(w,wpos,xpos)
if 'SWB' in mask:
if filt[-1]=="W": xoff_asec = 6.83 * (w - 2.196)
else: xoff_asec = 7.14 * (w - 2.100)
elif 'LWB' in mask:
if filt[-1]=="W": xoff_asec = -3.16 * (w - 4.747)
else: xoff_asec = -3.26 * (w - 4.600)
#print(w, xoff_asec)
yoff_asec = 0.0
r, theta = xy_to_rtheta(xoff_asec, yoff_asec)
else:
r, theta = (0.0, 0.0)
# Want th_bar to be -90 so that r matches webbpsf
if theta>0:
r = -1 * r
theta = -1 * theta
#print(r, theta)
return r, theta
[docs]def coron_trans(name, module='A', pixelscale=None, npix=None, oversample=1,
nd_squares=True, shift_x=None, shift_y=None, filter=None):
"""
Build a transmission image of a coronagraphic mask spanning
the 20" coronagraphic FoV.
oversample is used only if pixelscale is set to None.
Returns the intensity transmission (square of the amplitude transmission).
"""
from webbpsf.optics import NIRCam_BandLimitedCoron
shifts = {'shift_x': shift_x, 'shift_y': shift_y}
bar_offset = None
if name=='MASK210R':
pixscale = pixscale_SW
channel = 'short'
filter = 'F210M' if filter is None else filter
elif name=='MASK335R':
pixscale = pixscale_LW
channel = 'long'
filter = 'F335M' if filter is None else filter
elif name=='MASK430R':
pixscale = pixscale_LW
channel = 'long'
filter = 'F430M' if filter is None else filter
elif name=='MASKSWB':
pixscale = pixscale_SW
channel = 'short'
filter = 'F210M' if filter is None else filter
bar_offset = 0
elif name=='MASKLWB':
pixscale = pixscale_LW
channel = 'long'
filter = 'F430M' if filter is None else filter
bar_offset = 0
if pixelscale is None:
pixelscale = pixscale / oversample
if npix is None:
npix = 320 if channel=='long' else 640
npix = int(npix * oversample + 0.5)
elif npix is None:
# default to 20" if pixelscale is set but no npix
npix = int(20 / pixelscale + 0.5)
mask = NIRCam_BandLimitedCoron(name=name, module=module, bar_offset=bar_offset, auto_offset=None,
nd_squares=nd_squares, **shifts)
# Create wavefront to pass through mask and obtain transmission image
bandpass = read_filter(filter)
wavelength = bandpass.avgwave() / 1e10
wave = poppy.Wavefront(wavelength=wavelength, npix=npix, pixelscale=pixelscale)
# Square the amplitude transmission to get intensity transmission
im = mask.get_transmission(wave)**2
return im
[docs]def build_mask(module='A', pixscale=None, filter=None, nd_squares=True):
"""Create coronagraphic mask image
Return a truncated image of the full coronagraphic mask layout
for a given module. Assumes each mask is exactly 20" across.
+V3 is up, and +V2 is to the left.
"""
if module=='A':
names = ['MASK210R', 'MASK335R', 'MASK430R', 'MASKSWB', 'MASKLWB']
elif module=='B':
names = ['MASKSWB', 'MASKLWB', 'MASK430R', 'MASK335R', 'MASK210R']
if pixscale is None:
pixscale=pixscale_LW
npix = int(20 / pixscale + 0.5)
allims = []
for name in names:
res = coron_trans(name, module=module, pixelscale=pixscale, npix=npix, nd_squares=nd_squares)
allims.append(res)
im_out = np.concatenate(allims, axis=1)
# Multiply COM throughputs sampled at filter wavelength
if filter is not None:
bandpass = read_filter(filter)
w_um = bandpass.avgwave() / 1e4
com_th = nircam_com_th(wave_out=w_um)
com_nd = 10**(-1*nircam_com_nd(wave_out=w_um))
ind_nd = (im_out<0.0011) & (im_out>0.0009)
im_out[ind_nd] = com_nd
im_out *= com_th
return im_out
[docs]def build_mask_detid(detid, oversample=1, ref_mask=None, pupil=None, filter=None,
nd_squares=True, mask_holder=True):
"""Create mask image for a given detector
Return a full coronagraphic mask image as seen by a given SCA.
+V3 is up, and +V2 is to the left.
Parameters
----------
detid : str
Name of detector, 'A1', A2', ... 'A5' (or 'ALONG'), etc.
oversample : float
How much to oversample output mask relative to detector sampling.
ref_mask : str or None
Reference mask for placement of coronagraphic mask elements.
If None, then defaults are chosen for each detector.
pupil : str or None
Which Lyot pupil stop is being used? This affects holder placement.
If None, then defaults based on ref_mask.
"""
names = ['A1', 'A2', 'A3', 'A4', 'A5',
'B1', 'B2', 'B3', 'B4', 'B5']
# In case input is 'NRC??'
if 'NRC' in detid:
detid = detid[3:]
# Convert ALONG to A5 name
module = detid[0]
detid = '{}5'.format(module) if 'LONG' in detid else detid
# Make sure we have a valid name
if detid not in names:
raise ValueError("Invalid detid: {0} \n Valid names are: {1}" \
.format(detid, ', '.join(names)))
pixscale = pixscale_LW if '5' in detid else pixscale_SW
pixscale_over = pixscale / oversample
# Build the full mask
xpix = ypix = 2048
xpix_over = int(xpix * oversample)
ypix_over = int(ypix * oversample)
cmask = np.ones([ypix_over, xpix_over], dtype='float64')
# These detectors don't see any of the mask structure
if detid in ['A1', 'A3', 'B2', 'B4']:
return cmask
if detid=='A2':
cnames = ['MASK210R', 'MASK335R', 'MASK430R']
ref_mask = 'MASK210R' if ref_mask is None else ref_mask
elif detid=='A4':
cnames = ['MASK430R', 'MASKSWB', 'MASKLWB']
ref_mask = 'MASKSWB' if ref_mask is None else ref_mask
elif detid=='A5':
cnames = ['MASK210R', 'MASK335R', 'MASK430R', 'MASKSWB', 'MASKLWB']
ref_mask = 'MASK430R' if ref_mask is None else ref_mask
elif detid=='B1':
cnames = ['MASK430R', 'MASK335R', 'MASK210R']
ref_mask = 'MASK210R' if ref_mask is None else ref_mask
elif detid=='B3':
cnames = ['MASKSWB', 'MASKLWB', 'MASK430R']
ref_mask = 'MASKSWB' if ref_mask is None else ref_mask
elif detid=='B5':
cnames = ['MASKSWB', 'MASKLWB', 'MASK430R', 'MASK335R', 'MASK210R']
ref_mask = 'MASK430R' if ref_mask is None else ref_mask
# Generate sub-images for each aperture
# npix = int(ypix / len(cnames))
npix = int(20.5 / pixscale_over + 0.5)
npix_large = int(26 / pixscale_over + 0.5)
allims = []
for cname in cnames:
res = coron_trans(cname, module=module, pixelscale=pixscale_over, npix=npix_large,
filter=filter, nd_squares=nd_squares)
allims.append(res)
if pupil is None:
pupil = 'WEDGELYOT' if ('WB' in ref_mask) else 'CIRCLYOT'
# For each sub-image, expand and move to correct location
channel = 'LW' if '5' in detid else 'SW'
for i, name in enumerate(cnames):
cdict = coron_ap_locs(module, channel, name, pupil=pupil, full=False)
# Crop off large size
im_crop = pad_or_cut_to_size(allims[i], (npix, npix_large))
# Expand and offset
xsci, ysci = cdict['cen_sci']
xoff = xsci*oversample - ypix_over/2
yoff = ysci*oversample - xpix_over/2
im_expand = pad_or_cut_to_size(im_crop+1000, (ypix_over, xpix_over), offset_vals=(yoff,xoff))
ind_good = ((cmask<100) & (im_expand>100)) | ((cmask==1001) & (im_expand>100))
cmask[ind_good] = im_expand[ind_good]
# Remove offsets
cmask[cmask>100] = cmask[cmask>100] - 1000
# Multiply COM throughputs sampled at filter wavelength
if filter is not None:
bandpass = read_filter(filter)
w_um = bandpass.avgwave() / 1e4
com_th = nircam_com_th(wave_out=w_um)
com_nd = 10**(-1*nircam_com_nd(wave_out=w_um))
ind_nd = (cmask<0.0011) & (cmask>0.0009)
cmask[ind_nd] = com_nd
cmask *= com_th
# Place cmask in detector coords
cmask = sci_to_det(cmask, detid)
############################################
# Place blocked region from coronagraph holder
# Also ensure region outside of COM has throughput=1
if mask_holder:
if detid=='A2':
if 'CIRCLYOT' in pupil:
i1, i2 = [int(920*oversample), int(390*oversample)]
cmask[0:i1,0:i2] = 0
cmask[i1:,0:i2] = 1
i1 = int(220*oversample)
cmask[0:i1,:] = 0
i2 = int(974*oversample)
cmask[i2:,:] = 1
else:
i1, i2 = [int(935*oversample), int(393*oversample)]
cmask[0:i1,0:i2] = 0
cmask[i1:, 0:i2] = 1
i1 = int(235*oversample)
cmask[0:i1,:] = 0
i2 = int(985*oversample)
cmask[i2:,:] = 1
elif detid=='A4':
if 'CIRCLYOT' in pupil:
i1, i2 = [int(920*oversample), int(1463*oversample)]
cmask[0:i1,i2:] = 0
cmask[i1:, i2:] = 1
i1 = int(220*oversample)
cmask[0:i1,:] = 0
i2 = int(974*oversample)
cmask[i2:,:] = 1
else:
i1, i2 = [int(935*oversample), int(1465*oversample)]
cmask[0:i1,i2:] = 0
cmask[i1:, i2:] = 1
i1 = int(235*oversample)
cmask[0:i1,:] = 0
i2 = int(985*oversample)
cmask[i2:,:] = 1
elif detid=='A5':
if 'CIRCLYOT' in pupil:
i1, i2 = [int(1480*oversample), int(270*oversample)]
cmask[i1:,0:i2] = 0
cmask[0:i1,0:i2] = 1
i1, i2 = [int(1480*oversample), int(1880*oversample)]
cmask[i1:,i2:] = 0
cmask[0:i1,i2:] = 1
i1 = int(1825*oversample)
cmask[i1:,:] = 0
i2 = int(1452*oversample)
cmask[0:i2,:] = 1
else:
i1, i2 = [int(1485*oversample), int(275*oversample)]
cmask[i1:,0:i2] = 0
cmask[0:i1,0:i2] = 1
i1, i2 = [int(1485*oversample), int(1883*oversample)]
cmask[i1:,i2:] = 0
cmask[0:i1,i2:] = 1
i1 = int(1830*oversample)
cmask[i1:,:] = 0
i2 = int(1462*oversample)
cmask[0:i2,:] = 1
elif detid=='B1':
if 'CIRCLYOT' in pupil:
i1, i2 = [int(910*oversample), int(1615*oversample)]
cmask[0:i1,i2:] = 0
cmask[i1:,i2:] = 1
i1 = int(210*oversample)
cmask[0:i1,:] = 0
i2 = int(956*oversample)
cmask[i2:,:] = 1
else:
i1, i2 = [int(905*oversample), int(1609*oversample)]
cmask[0:i1,i2:] = 0
cmask[i1:,i2:] = 1
i1 = int(205*oversample)
cmask[0:i1,:] = 0
i2 = int(951*oversample)
cmask[i2:,:] = 1
elif detid=='B3':
if 'CIRCLYOT' in pupil:
i1, i2 = [int(920*oversample), int(551*oversample)]
cmask[0:i1,0:i2] = 0
cmask[i1:,0:i2] = 1
i1 = int(210*oversample)
cmask[0:i1,:] = 0
i2 = int(966*oversample)
cmask[i2:,:] = 1
else:
i1, i2 = [int(920*oversample), int(548*oversample)]
cmask[0:i1,0:i2] = 0
cmask[i1:,0:i2] = 1
i1 = int(210*oversample)
cmask[0:i1,:] = 0
i2 = int(963*oversample)
cmask[i2:,:] = 1
elif detid=='B5':
if 'CIRCLYOT' in pupil:
i1, i2 = [int(555*oversample), int(207*oversample)]
cmask[0:i1,0:i2] = 0
cmask[i1:, 0:i2] = 1
i1, i2 = [int(545*oversample), int(1815*oversample)]
cmask[0:i1,i2:] = 0
cmask[i1:, i2:] = 1
i1 = int(215*oversample)
cmask[0:i1,:] = 0
i2 = int(578*oversample)
cmask[i2:,:] = 1
else:
i1, i2 = [int(555*oversample), int(211*oversample)]
cmask[0:i1,0:i2] = 0
cmask[i1:, 0:i2] = 1
i1, i2 = [int(545*oversample), int(1819*oversample)]
cmask[0:i1,i2:] = 0
cmask[i1:, i2:] = 1
i1 = int(215*oversample)
cmask[0:i1,:] = 0
i2 = int(578*oversample)
cmask[i2:,:] = 1
############################################
# Fix SW/LW wedge abuttment
if detid=='A4':
if 'CIRCLYOT' in pupil:
x0 = 819
x1 = 809
x2 = x1 + 10
else:
x0 = 821
x1 = 812
x2 = x1 + 9
y1, y2 = (400, 650)
ix0 = int(x0*oversample)
iy1, iy2 = int(y1*oversample), int(y2*oversample)
ix1, ix2 = int(x1*oversample), int(x2*oversample)
cmask[iy1:iy2,ix1:ix2] = cmask[iy1:iy2,ix0].reshape([-1,1])
elif detid=='A5':
if 'CIRCLYOT' in pupil:
x0 = 587
x1 = x0 + 1
x2 = x1 + 5
else:
x0 = 592
x1 = x0 + 1
x2 = x1 + 5
y1, y2 = (1600, 1750)
ix0 = int(x0*oversample)
iy1, iy2 = int(y1*oversample), int(y2*oversample)
ix1, ix2 = int(x1*oversample), int(x2*oversample)
cmask[iy1:iy2,ix1:ix2] = cmask[iy1:iy2,ix0].reshape([-1,1])
elif detid=='B3':
if 'CIRCLYOT' in pupil:
x0 = 1210
x1 = 1196
x2 = x1 + 14
else:
x0 = 1204
x1 = 1192
x2 = x1 + 12
y1, y2 = (350, 650)
ix0 = int(x0*oversample)
iy1, iy2 = int(y1*oversample), int(y2*oversample)
ix1, ix2 = int(x1*oversample), int(x2*oversample)
cmask[iy1:iy2,ix1:ix2] = cmask[iy1:iy2,ix0].reshape([-1,1])
elif detid=='B5':
if 'CIRCLYOT' in pupil:
x0 = 531
x1 = 525
x2 = x1 + 6
else:
x0 = 535
x1 = 529
x2 = x1 + 6
y1, y2 = (300, 420)
ix0 = int(x0*oversample)
iy1, iy2 = int(y1*oversample), int(y2*oversample)
ix1, ix2 = int(x1*oversample), int(x2*oversample)
cmask[iy1:iy2,ix1:ix2] = cmask[iy1:iy2,ix0].reshape([-1,1])
# Convert back to 'sci' orientation
cmask = det_to_sci(cmask, detid)
return cmask
[docs]def coron_ap_locs(module, channel, mask, pupil=None, full=False):
"""Coronagraph mask aperture locations and sizes
Returns a dictionary of the detector aperture sizes
and locations. Attributes 'cen' and 'loc' are in terms
of (x,y) detector pixels. 'cen_sci' is sci coords location.
"""
if channel=='long':
channel = 'LW'
elif channel=='short':
channel = 'SW'
if pupil is None:
pupil = 'WEDGELYOT' if 'WB' in mask else 'CIRCLYOT'
if module=='A':
if channel=='SW':
if '210R' in mask:
cdict_rnd = {'det':'A2', 'cen':(712,525), 'size':640}
cdict_bar = {'det':'A2', 'cen':(716,536), 'size':640}
elif '335R' in mask:
cdict_rnd = {'det':'A2', 'cen':(1368,525), 'size':640}
cdict_bar = {'det':'A2', 'cen':(1372,536), 'size':640}
elif '430R' in mask:
cdict_rnd = {'det':'A2', 'cen':(2025,525), 'size':640}
cdict_bar = {'det':'A2', 'cen':(2029,536), 'size':640}
elif 'SWB' in mask:
cdict_rnd = {'det':'A4', 'cen':(487,523), 'size':640}
cdict_bar = {'det':'A4', 'cen':(490,536), 'size':640}
elif 'LWB' in mask:
cdict_rnd = {'det':'A4', 'cen':(1141,523), 'size':640}
cdict_bar = {'det':'A4', 'cen':(1143,536), 'size':640}
else:
raise ValueError('Mask {} not recognized for {} channel'\
.format(mask, channel))
elif channel=='LW':
if '210R' in mask:
cdict_rnd = {'det':'A5', 'cen':(1720, 1672), 'size':320}
cdict_bar = {'det':'A5', 'cen':(1725, 1682), 'size':320}
elif '335R' in mask:
cdict_rnd = {'det':'A5', 'cen':(1397,1672), 'size':320}
cdict_bar = {'det':'A5', 'cen':(1402,1682), 'size':320}
elif '430R' in mask:
cdict_rnd = {'det':'A5', 'cen':(1074,1672), 'size':320}
cdict_bar = {'det':'A5', 'cen':(1078,1682), 'size':320}
elif 'SWB' in mask:
cdict_rnd = {'det':'A5', 'cen':(752,1672), 'size':320}
cdict_bar = {'det':'A5', 'cen':(757,1682), 'size':320}
elif 'LWB' in mask:
cdict_rnd = {'det':'A5', 'cen':(430,1672), 'size':320}
cdict_bar = {'det':'A5', 'cen':(435,1682), 'size':320}
else:
raise ValueError('Mask {} not recognized for {} channel'\
.format(mask, channel))
else:
raise ValueError('Channel {} not recognized'.format(channel))
elif module=='B':
if channel=='SW':
if '210R' in mask:
cdict_rnd = {'det':'B1', 'cen':(1293,513), 'size':640}
cdict_bar = {'det':'B1', 'cen':(1287,508), 'size':640}
elif '335R' in mask:
cdict_rnd = {'det':'B1', 'cen':(637,513), 'size':640}
cdict_bar = {'det':'B1', 'cen':(632,508), 'size':640}
elif '430R' in mask:
cdict_rnd = {'det':'B1', 'cen':(-20,513), 'size':640}
cdict_bar = {'det':'B1', 'cen':(-25,508), 'size':640}
elif 'SWB' in mask:
cdict_rnd = {'det':'B3', 'cen':(874,519), 'size':640}
cdict_bar = {'det':'B3', 'cen':(870,516), 'size':640}
elif 'LWB' in mask:
cdict_rnd = {'det':'B3', 'cen':(1532,519), 'size':640}
cdict_bar = {'det':'B3', 'cen':(1526,516), 'size':640}
else:
raise ValueError('Mask {} not recognized for {} channel'\
.format(mask, channel))
elif channel=='LW':
if '210R' in mask:
cdict_rnd = {'det':'B5', 'cen':(1656,360), 'size':320}
cdict_bar = {'det':'B5', 'cen':(1660,360), 'size':320}
elif '335R' in mask:
cdict_rnd = {'det':'B5', 'cen':(1334,360), 'size':320}
cdict_bar = {'det':'B5', 'cen':(1338,360), 'size':320}
elif '430R' in mask:
cdict_rnd = {'det':'B5', 'cen':(1012,360), 'size':320}
cdict_bar = {'det':'B5', 'cen':(1015,360), 'size':320}
elif 'SWB' in mask:
cdict_rnd = {'det':'B5', 'cen':(366,360), 'size':320}
cdict_bar = {'det':'B5', 'cen':(370,360), 'size':320}
elif 'LWB' in mask:
cdict_rnd = {'det':'B5', 'cen':(689,360), 'size':320}
cdict_bar = {'det':'B5', 'cen':(693,360), 'size':320}
else:
raise ValueError('Mask {} not recognized for {} channel'\
.format(mask, channel))
else:
raise ValueError('Channel {} not recognized'.format(channel))
else:
raise ValueError('Module {} not recognized'.format(module))
# Choose whether to use round or bar Lyot mask
cdict = cdict_rnd if 'CIRC' in pupil else cdict_bar
x0, y0 = np.array(cdict['cen']) - cdict['size']/2
cdict['loc'] = (int(x0), int(y0))
# Add in 'sci' coordinates (V2/V3 orientation)
# X is flipped for A5, Y is flipped for all others
cen = cdict['cen']
if cdict['det'] == 'A5':
cdict['cen_sci'] = (2048-cen[0], cen[1])
else:
cdict['cen_sci'] = (cen[0], 2048-cen[1])
if full:
cdict['size'] = 2048
cdict['loc'] = (0,0)
return cdict
[docs]def coron_detector(mask, module, channel=None):
"""
Return detector name for a given coronagraphic mask, module,
and channel.
"""
# Grab default channel
if channel is None:
if ('210R' in mask) or ('SW' in mask):
channel = 'SW'
else:
channel = 'LW'
# If LW, always A5 or B5
# If SW, bar masks are A4/B3, round masks A2/B1; M430R is invalid
if channel=='LW':
detname = module + '5'
elif (channel=='SW') and ('430R' in mask):
raise AttributeError("MASK430R not valid for SW channel")
else:
if module=='A':
detname = 'A2' if mask[-1]=='R' else 'A4'
else:
detname = 'B1' if mask[-1]=='R' else 'B3'
return detname