import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import os, re
from astropy.io import fits, ascii
from astropy.table import Table
import astropy.units as u
from scipy.interpolate import griddata, RegularGridInterpolator, interp1d
from . import conf
from .utils import S
from .bandpasses import miri_filter, nircam_filter
from .maths import jl_poly, jl_poly_fit, binned_statistic
from .robust import medabsdev
import logging
_log = logging.getLogger('webbpsf_ext')
from . import __path__
_spec_dir = __path__[0] + '/spectral_data/'
[docs]def BOSZ_filename(Teff, metallicity, log_g, res, carbon=0, alpha=0):
""" Generate filename for BOSZ spectrum. """
teff_str = f't{Teff:04.0f}'
logg_str = 'g{:02.0f}'.format(int(log_g*10))
metal_str = 'mp{:02.0f}'.format(int(abs(metallicity*10)+0.5))
# Metallicity [M/H]
if metallicity<0:
metal_str = metal_str.replace('p', 'm')
# Carbon abundance [C/M]
carb_str = 'cp{:02.0f}'.format(int(abs(carbon*10)+0.5))
if carbon<0:
carb_str = carb_str.replace('p', 'm')
# alpha-element value [alpha/H]
alpha_str = 'op{:02.0f}'.format(int(abs(alpha*10)+0.5))
if alpha<0:
alpha_str = alpha_str.replace('p', 'm')
# Resolution
rstr = 'b{}'.format(res)
# Final file name
fname = f'a{metal_str}{carb_str}{alpha_str}{teff_str}{logg_str}v20modrt0{rstr}rs.fits'
return fname
[docs]def download_BOSZ_spectrum(Teff, metallicity, log_g, res, carbon=0, alpha=0):
import requests
res_dir = os.path.join(_spec_dir, 'bosz_grids', 'R{}'.format(res))
# Create resolution directory if it doesn't exists
if not os.path.isdir(res_dir):
os.makedirs(res_dir)
# Generate URL directory that file is saved in
url_base = 'http://archive.stsci.edu/missions/hlsp/bosz/fits/'
res_str = f'insbroad_{res:06.0f}'
metal_str = f'metal_{metallicity:+.2f}'
carbon_str = f'carbon_{carbon:+.2f}'
alpha_str = f'alpha_{alpha:+.2f}'
url_dir = os.path.join(url_base,res_str,metal_str,carbon_str,alpha_str)
# Generate file name
fname = BOSZ_filename(Teff, metallicity, log_g, res, carbon=carbon, alpha=alpha)
# Final URL
url_final = os.path.join(url_dir, fname)
# Make request
_log.info(f'Downloading file: {fname}')
req = requests.get(url_final, allow_redirects=True)
# Raise exception if file not found or other HTTP error
if req.status_code != requests.codes.ok:
req.raise_for_status()
# Save file to directory
outpath = os.path.join(res_dir, fname)
_log.info(f'Saving file to: {outpath}')
open(outpath, 'wb').write(req.content)
[docs]def BOSZ_spectrum(Teff, metallicity, log_g, res=2000, interpolate=True,
carbon=0, alpha=0, **kwargs):
"""BOSZ stellar atmospheres (Bohlin et al 2017).
Read in a spectrum from the BOSZ stellar atmosphere models database.
Returns a Pysynphot spectral object. Wavelength values range between
1000 Angstroms to 32 microns. Teff range from 3500K to 36000K.
This function interpolates the model grid by reading in those models
closest in temperature, metallicity, and log g to the desired parameters,
then takes the weighted average of these models based on their relative
offsets. Can also just read in the closest model by setting interpolate=False.
Different spectral resolutions can also be specified.
Parameters
----------
Teff : float
Effective temperature ranging from 3500K to 30000K.
metallicity : float
Metallicity [Fe/H] value ranging from -2.5 to 0.5.
log_g : float
Surface gravity (log g) from 0 to 5.
Keyword Args
------------
carbon : float
Carbon abundance [C/M]. Must be either [-0.75,-0.5,-0.25, 0, 0.25, 0.5].
alpha : float
alpha-element value [alpha/H]. Must be either [-0.25, 0, 0.25, 0.5]
res : str
Spectral resolution to use (instrument broadening). Valid points:
[200, 500, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 300000]
interpolate : bool
Interpolate spectrum using a weighted average of grid points
surrounding the desired input parameters.
References
----------
https://archive.stsci.edu/prepds/bosz/
"""
model_dir = _spec_dir + 'bosz_grids/'
res_dir = model_dir + 'R{}/'.format(res)
if not os.path.isdir(model_dir):
raise IOError('BOSZ model directory does not exist: {}'.format(model_dir))
if not os.path.isdir(res_dir):
os.makedirs(res_dir)
# raise IOError('Resolution directory does not exist: {}'.format(res_dir))
# Grid of computed temperature steps
teff_grid = list(range(3500,12000,250)) \
+ list(range(12000,20000,500)) \
+ list(range(20000,36000,1000))
teff_grid = np.array(teff_grid)
# Grid of log g steps for desired Teff
lg_max = 5
lg_step = 0.5
if Teff <= 6000: lg_min = 0
elif Teff <= 8000: lg_min = 1
elif Teff <= 12000: lg_min = 2
elif Teff <= 30000: lg_min = 3
elif Teff <= 35000: lg_min = 3.5
else: raise ValueError('Teff must be less than or equal to 35000K.')
if log_g<lg_min:
raise ValueError('log_g must be >={}'.format(lg_min))
if log_g>lg_max:
raise ValueError('log_g must be <={}'.format(lg_max))
# Grid of log g values
logg_grid = np.arange(lg_min, lg_max+lg_step, lg_step)
# Grid of metallicity values
metal_grid = np.arange(-2.5,0.75,0.25)
# First, choose the two grid points closest in Teff
teff_diff = np.abs(teff_grid - Teff)
ind_sort = np.argsort(teff_diff)
if teff_diff[ind_sort[0]]==0: # Exact
teff_best = np.array([teff_grid[ind_sort[0]]])
else: # Want to interpolate
teff_best = teff_grid[ind_sort[0:2]]
# Choose the two best log g values
logg_diff = np.abs(logg_grid - log_g)
ind_sort = np.argsort(logg_diff)
if logg_diff[ind_sort[0]]==0: # Exact
logg_best = np.array([logg_grid[ind_sort[0]]])
else: # Want to interpolate
logg_best = logg_grid[ind_sort[0:2]]
# Choose the two best metallicity values
metal_diff = np.abs(metal_grid - metallicity)
ind_sort = np.argsort(metal_diff)
if metal_diff[ind_sort[0]]==0: # Exact
metal_best = np.array([metal_grid[ind_sort[0]]])
else: # Want to interpolate
metal_best = metal_grid[ind_sort[0:2]]
# Build final file names
fnames = []
# Build lists of properties to pass to download function if needed
teff_all = []
logg_all = []
metal_all = []
for t in teff_best:
for l in logg_best:
for m in metal_best:
fname = BOSZ_filename(t, m, l, res, carbon=carbon, alpha=alpha)
fnames.append(fname)
teff_all.append(t)
logg_all.append(l)
metal_all.append(m)
teff_all = np.array(teff_all)
logg_all = np.array(logg_all)
metal_all = np.array(metal_all)
# Weight by relative distance from desired value
weights = []
teff_diff = np.abs(teff_best - Teff)
logg_diff = np.abs(logg_best - log_g)
metal_diff = np.abs(metal_best - metallicity)
for t in teff_diff:
wt = 1 if len(teff_diff)==1 else t / np.sum(teff_diff)
for l in logg_diff:
wl = 1 if len(logg_diff)==1 else l / np.sum(logg_diff)
for m in metal_diff:
wm = 1 if len(metal_diff)==1 else m / np.sum(metal_diff)
weights.append(wt*wl*wm)
weights = np.array(weights)
weights = weights / np.sum(weights)
if interpolate:
wave_all = []
flux_all = []
for i, f in enumerate(fnames):
# Download files that don't currently exist
if not os.path.isfile(res_dir+f):
download_BOSZ_spectrum(teff_all[i], metal_all[i], logg_all[i], res,
carbon=carbon, alpha=alpha)
d = fits.getdata(res_dir+f, 1)
wave_all.append(d['Wavelength'])
flux_all.append(d['SpecificIntensity'] * weights[i])
wfin = wave_all[0]
ffin = np.pi * np.array(flux_all).sum(axis=0) # erg/s/cm^2/A
else:
ind = np.where(weights==weights.max())[0][0]
f = fnames[ind]
Teff = teff_all[ind]
log_g = logg_all[ind]
metallicity = metal_all[ind]
# Download files if doesn't exist
if not os.path.isfile(res_dir+f):
download_BOSZ_spectrum(Teff, metallicity, log_g, res,
carbon=carbon, alpha=alpha)
d = fits.getdata(res_dir+f, 1)
wfin = d['Wavelength']
ffin = np.pi * d['SpecificIntensity'] # erg/s/cm^2/A
name = 'BOSZ(Teff={},z={},logG={})'.format(Teff, metallicity, log_g)
sp = S.ArraySpectrum(wfin[:-1], ffin[:-1], 'angstrom', 'flam', name=name)
return sp
[docs]def stellar_spectrum(sptype, *renorm_args, **kwargs):
"""Stellar spectrum
Similar to specFromSpectralType() in WebbPSF/Poppy, this function uses
a dictionary of fiducial values to determine an appropriate spectral model.
If the input spectral type is not found, this function interpolates the
effective temperature, metallicity, and log g values .
You can also specify renormalization arguments to pass to ``sp.renorm()``.
The order (after ``sptype``) should be (``value, units, bandpass``):
>>> sp = stellar_spectrum('G2V', 10, 'vegamag', bp)
Flat spectrum (in photlam) are also allowed via the 'flat' string.
Use ``catname='bosz'`` for BOSZ stellar atmosphere (ATLAS9) (default)
Use ``catname='ck04models'`` keyword for ck04 models
Use ``catname='phoenix'`` keyword for Phoenix models
Keywords exist to directly specify Teff, metallicity, an log_g rather
than a spectral type.
Parameters
----------
sptype : str
Spectral type, such as 'A0V' or 'K2III'.
renorm_args : tuple
Renormalization arguments to pass to ``sp.renorm()``.
The order (after ``sptype``) should be (``value, units, bandpass``)
Bandpass should be a :mod:`pysynphot.obsbandpass` type.
Keyword Args
------------
catname : str
Catalog name, including 'bosz', 'ck04models', and 'phoenix'.
Default is 'bosz', which comes from :func:`BOSZ_spectrum`.
Teff : float
Effective temperature ranging from 3500K to 30000K.
metallicity : float
Metallicity [Fe/H] value ranging from -2.5 to 0.5.
log_g : float
Surface gravity (log g) from 0 to 5.
res : str
BOSZ spectral resolution to use (200 or 2000 or 20000).
Default: 2000.
interpolate : bool
Interpolate BOSZ spectrum using a weighted average of grid points
surrounding the desired input parameters. Default is True.
Default: True
"""
def call_bosz(v0,v1,v2,**kwargs):
if v0 > 35000:
v0 = 35000
_log.warn("BOSZ models stop at 35000K. Setting Teff=35000.")
if v0 < 3500:
v0 = 3500
_log.warn("BOSZ models start at 3500K. Setting Teff=3500.")
return BOSZ_spectrum(v0, v1, v2, **kwargs)
Teff = kwargs.pop('Teff', None)
metallicity = kwargs.pop('metallicity', None)
log_g = kwargs.pop('log_g', None)
catname = kwargs.get('catname', 'bosz')
lookuptable = {
# https://www.pas.rochester.edu/~emamajek/EEM_dwarf_UBVIJHK_colors_Teff.txt
"O0V": (50000, 0.0, 4.0), # Bracketing for interpolation
"O3V": (46000, 0.0, 4.0),
"O5V": (43000, 0.0, 4.5),
"O7V": (36500, 0.0, 4.0),
"O9V": (32500, 0.0, 4.0),
"B0V": (31500, 0.0, 4.0),
"B1V": (26000, 0.0, 4.0),
"B3V": (17000, 0.0, 4.0),
"B5V": (15700, 0.0, 4.0),
"B8V": (12500, 0.0, 4.0),
"A0V": (9700, 0.0, 4.0),
"A1V": (9200, 0.0, 4.0),
"A3V": (8550, 0.0, 4.0),
"A5V": (8080, 0.0, 4.0),
"F0V": (7220, 0.0, 4.0),
"F2V": (6810, 0.0, 4.0),
"F5V": (6510, 0.0, 4.0),
"F8V": (6170, 0.0, 4.5),
"G0V": (5920, 0.0, 4.5),
"G2V": (5770, 0.0, 4.5),
"G5V": (5660, 0.0, 4.5),
"G8V": (5490, 0.0, 4.5),
"K0V": (5280, 0.0, 4.5),
"K2V": (5040, 0.0, 4.5),
"K5V": (4410, 0.0, 4.5),
"K7V": (4070, 0.0, 4.5),
"M0V": (3870, 0.0, 4.5),
"M2V": (3550, 0.0, 4.5),
"M5V": (3030, 0.0, 5.0),
"M9V": (2400, 0.0, 5.0), # Bracketing for interpolation
"O0IV": (50000, 0.0, 3.8), # Bracketing for interpolation
"B0IV": (30000, 0.0, 3.8),
"B8IV": (12000, 0.0, 3.8),
"A0IV": (9500, 0.0, 3.8),
"A5IV": (8250, 0.0, 3.8),
"F0IV": (7250, 0.0, 3.8),
"F8IV": (6250, 0.0, 4.3),
"G0IV": (6000, 0.0, 4.3),
"G8IV": (5500, 0.0, 4.3),
"K0IV": (5250, 0.0, 4.3),
"K7IV": (4000, 0.0, 4.3),
"M0IV": (3750, 0.0, 4.3),
"M9IV": (3000, 0.0, 4.7), # Bracketing for interpolation
"O0III": (55000, 0.0, 3.5), # Bracketing for interpolation
"B0III": (29000, 0.0, 3.5),
"B5III": (15000, 0.0, 3.5),
"G0III": (5750, 0.0, 3.0),
"G5III": (5250, 0.0, 2.5),
"K0III": (4750, 0.0, 2.0),
"K5III": (4000, 0.0, 1.5),
"M0III": (3750, 0.0, 1.5),
"M6III": (3000, 0.0, 1.0), # Bracketing for interpolation
"O0I": (45000, 0.0, 5.0), # Bracketing for interpolation
"O6I": (39000, 0.0, 4.5),
"O8I": (34000, 0.0, 4.0),
"B0I": (26000, 0.0, 3.0),
"B5I": (14000, 0.0, 3.0),
"A0I": (9750, 0.0, 2.0),
"A5I": (8500, 0.0, 2.0),
"F0I": (7750, 0.0, 2.0),
"F5I": (7000, 0.0, 1.5),
"G0I": (5500, 0.0, 1.5),
"G5I": (4750, 0.0, 1.0),
"K0I": (4500, 0.0, 1.0),
"K5I": (3750, 0.0, 0.5),
"M0I": (3750, 0.0, 0.0),
"M2I": (3500, 0.0, 0.0),
"M5I": (3000, 0.0, 0.0), # Bracketing for interpolation
}
def sort_sptype(typestr):
letter = typestr[0]
lettervals = {'O': 0, 'B': 1, 'A': 2, 'F': 3, 'G': 4, 'K': 5, 'M': 6}
value = lettervals[letter] * 1.0
value += (int(typestr[1]) * 0.1)
if "III" in typestr:
value += 30
elif "I" in typestr:
value += 10
elif "V" in typestr:
value += 50
return value
# Generate list of spectral types
sptype_list = list(lookuptable.keys())
# Test if the user wants a flat spectrum (in photlam)
# Check if Teff, metallicity, and log_g are specified
if (Teff is not None) and (metallicity is not None) and (log_g is not None):
v0, v1, v2 = (Teff, metallicity, log_g)
if 'bosz' in catname.lower():
sp = call_bosz(v0,v1,v2,**kwargs)
else:
if ('ck04models' in catname.lower()) and (v0<3500):
_log.warn("ck04 models stop at 3500K. Setting Teff=3500.")
v0 = 3500
sp = S.Icat(catname, v0, v1, v2)
sp.name = '({:.0f},{:0.1f},{:0.1f})'.format(v0,v1,v2)
elif 'flat' in sptype.lower():
# waveset = S.refs._default_waveset
# sp = S.ArraySpectrum(waveset, 0*waveset + 10.)
sp = S.FlatSpectrum(10, fluxunits='photlam')
sp.name = 'Flat spectrum in photlam'
elif sptype in sptype_list:
v0, v1, v2 = lookuptable[sptype]
if 'bosz' in catname.lower():
sp = call_bosz(v0,v1,v2,**kwargs)
else:
if ('ck04models' in catname.lower()) and (v0<3500):
_log.warn("ck04 models start at 3500K. Setting Teff=3500.")
v0 = 3500
sp = S.Icat(catname, v0, v1, v2)
sp.name = sptype
else: # Interpolate values for undefined sptype
# Sort the list and return their rank values
sptype_list.sort(key=sort_sptype)
rank_list = np.array([sort_sptype(st) for st in sptype_list])
# Find the rank of the input spec type
rank = sort_sptype(sptype)
# Grab values from tuples and interpolate based on rank
tup_list0 = np.array([lookuptable[st][0] for st in sptype_list])
tup_list1 = np.array([lookuptable[st][1] for st in sptype_list])
tup_list2 = np.array([lookuptable[st][2] for st in sptype_list])
v0 = np.interp(rank, rank_list, tup_list0)
v1 = np.interp(rank, rank_list, tup_list1)
v2 = np.interp(rank, rank_list, tup_list2)
if 'bosz' in catname.lower():
sp = call_bosz(v0,v1,v2,**kwargs)
else:
if ('ck04models' in catname.lower()) and (v0<3500):
_log.warn("ck04 models stop at 3500K. Setting Teff=3500.")
v0 = 3500
sp = S.Icat(catname, v0, v1, v2)
sp.name = sptype
#print(int(v0),v1,v2)
# Renormalize if renorm_args exist
if len(renorm_args) > 0:
sp_norm = sp.renorm(*renorm_args)
sp_norm.name = sp.name
sp = sp_norm
return sp
# Class for creating an input source spectrum
[docs]class source_spectrum(object):
"""Model source spectrum
The class ingests spectral information of a given target
and generates :mod:`pysynphot.spectrum` model fit to the
known photometric SED. Two model routines can fit. The
first is a very simple scale factor that is applied to the
input spectrum, while the second takes the input spectrum
and adds an IR excess modeled as a modified blackbody function.
Parameters
----------
name : string
Source name.
sptype : string
Assumed stellar spectral type. Not relevant if Teff, metallicity,
and log_g are specified.
mag_val : float
Magnitude of input bandpass for initial scaling of spectrum.
bp : :mod:`pysynphot.obsbandpass`
Bandpass to apply initial mag_val scaling.
votable_file: string
VOTable name that holds the source's photometry. The user can
find the relevant data at http://vizier.u-strasbg.fr/vizier/sed/
and click download data.
Keyword Args
------------
Teff : float
Effective temperature ranging from 3500K to 30000K.
metallicity : float
Metallicity [Fe/H] value ranging from -2.5 to 0.5.
log_g : float
Surface gravity (log g) from 0 to 5.
catname : str
Catalog name, including 'bosz', 'ck04models', and 'phoenix'.
Default is 'bosz', which comes from :func:`BOSZ_spectrum`.
res : str
Spectral resolution to use (200 or 2000 or 20000).
interpolate : bool
Interpolate spectrum using a weighted average of grid points
surrounding the desired input parameters.
Example
-------
Generate a source spectrum and fit photometric data
>>> import webbpsf_ext
>>> from webbpsf_ext.spectra import source_spectrum
>>>
>>> name = 'HR8799'
>>> vot = 'votables/{}.vot'.format(name)
>>> bp_k = webbpsf_ext.bp_2mass('k')
>>>
>>> # Read in stellar spectrum model and normalize to Ks = 5.24
>>> src = source_spectrum(name, 'F0V', 5.24, bp_k, vot,
>>> Teff=7430, metallicity=-0.47, log_g=4.35)
>>> # Fit model to photometry from 0.1 - 30 micons
>>> # Saves pysynphot spectral object at src.sp_model
>>> src.fit_SED(wlim=[0.1,30])
>>> sp_sci = src.sp_model
"""
[docs] def __init__(self, name, sptype, mag_val, bp, votable_file,
Teff=None, metallicity=None, log_g=None, Av=None, **kwargs):
self.name = name
# Setup initial spectrum
kwargs['Teff'] = Teff
kwargs['metallicity'] = metallicity
kwargs['log_g'] = log_g
self.sp0 = stellar_spectrum(sptype, mag_val, 'vegamag', bp, **kwargs)
# Read in a low res version for photometry matching
kwargs['res'] = 200
self.sp_lowres = stellar_spectrum(sptype, mag_val, 'vegamag', bp, **kwargs)
if Av is not None:
Rv = 4
self.sp0 = self.sp0 * S.Extinction(Av/Rv,name='mwrv4')
self.sp_lowres = self.sp_lowres * S.Extinction(Av/Rv,name='mwrv4')
self.sp0 = self.sp0.renorm(mag_val, 'vegamag', bp)
self.sp_lowres = self.sp_lowres.renorm(mag_val, 'vegamag', bp)
self.sp0.name = sptype
self.sp_lowres.name = sptype
# Init model to None
self.sp_model = None
# Readin photometry
self.votable_file = votable_file
self._gen_table()
self._combine_fluxes()
def _gen_table(self):
"""Read VOTable and convert to astropy table"""
# Import source SED from VOTable
from astropy.io.votable import parse_single_table
table = parse_single_table(self.votable_file)
# Convert to astropy table
tbl = table.to_table()
freq = tbl['sed_freq'] * 1e9 # Hz
wave_m = 2.99792458E+08 / freq
wave_A = 1e10 * wave_m
# Add wavelength column
col = tbl.Column(wave_A, 'sed_wave')
col.unit = 'Angstrom'
tbl.add_column(col)
# Sort flux monotomically with wavelength
tbl.sort(['sed_wave', 'sed_flux'])
self.table = tbl
def _combine_fluxes(self):
"""Average duplicate data points
Creates average of duplicate point stored in self.sp_phot.
"""
table = self.table
wave = table['sed_wave']
flux = table["sed_flux"]
eflux = table["sed_eflux"]
# Average duplicate data points
uwave, ucnt = np.unique(wave, return_counts=True)
uflux = []
uflux_e = []
for i, w in enumerate(uwave):
ind = (wave==w)
flx = np.median(flux[ind]) if ucnt[i]>1 else flux[ind][0]
uflux.append(flx)
eflx = medabsdev(flux[ind]) if ucnt[i]>1 else eflux[ind][0]
uflux_e.append(eflx)
uflux = np.array(uflux)
uflux_e = np.array(uflux_e)
# Photometric data points
sp_phot = S.ArraySpectrum(uwave, uflux,
waveunits=wave.unit.name,
fluxunits=flux.unit.name)
sp_phot.convert('Angstrom')
sp_phot.convert('Flam')
sp_phot_e = S.ArraySpectrum(uwave, uflux_e,
waveunits=wave.unit.name,
fluxunits=eflux.unit.name)
sp_phot_e.convert('Angstrom')
sp_phot_e.convert('Flam')
self.sp_phot = sp_phot
self.sp_phot_e = sp_phot_e
[docs] def bb_jy(self, wave, T):
"""Blackbody function (Jy)
For a given wavelength set (in um) and a Temperature (K),
return the blackbody curve in units of Jy.
Parameters
----------
wave : array_like
Wavelength array in microns
T : float
Temperature of blackbody (K)
"""
# Physical Constants
#H = 6.62620000E-27 # Planck's constant in cgs units
HS = 6.62620000E-34 # Planck's constant in standard units
C = 2.99792458E+08 # speed of light in standard units
K = 1.38064852E-23 # Boltzmann constant in standard units
# Blackbody coefficients (SI units)
C1 = 2.0 * HS * C # Power * unit area / steradian
C2 = HS * C / K
w_m = wave * 1e-6
exponent = C2 / (w_m * T)
expfactor = np.exp(exponent)
return 1.0E+26 * C1 * (w_m**-3.0) / (expfactor - 1.0)
[docs] def model_scale(self, x, sp=None):
"""Simple model to scale stellar spectrum"""
sp = self.sp_lowres if sp is None else sp
return x[0] * sp
[docs] def model_IRexcess(self, x, sp=None):
"""Model for stellar spectrum with IR excess
Model of a stellar spectrum plus IR excess, where the
excess is a modified blackbody. The final model follows
the form:
.. math::
x_0 BB(\lambda, x_1) \lambda^{x_2}
"""
sp = self.sp_lowres if sp is None else sp
bb_flux = x[0] * self.bb_jy(sp.wave/1e4, x[1]) * (sp.wave/1e4)**x[2] / 1e17
sp_bb = S.ArraySpectrum(sp.wave, bb_flux, fluxunits='Jy')
sp_bb.convert('Flam')
return sp + sp_bb
[docs] def func_resid(self, x, IR_excess=False, wlim=[0.1, 30], use_err=True):
"""Calculate model residuals
Parameters
----------
x : array_like
Model parameters for either `model_scale` or `model_IRexcess`.
See these two functions for more details.
IR_excess: bool
Include IR excess in model fit? This is a simple modified blackbody.
wlim : array_like
Min and max limits for wavelengths to consider (microns).
use_err : bool
Should we use the uncertainties in the SED photometry for weighting?
"""
# Star model and photometric data
sp_star = self.sp_lowres
sp_phot = self.sp_phot
sp_phot_e = self.sp_phot_e
# Which model are we using?
func_model = self.model_IRexcess if IR_excess else self.model_scale
sp_model = func_model(x, sp_star)
wvals = sp_phot.wave
wmin, wmax = np.array(wlim)*1e4
ind = (wvals >= wmin) & (wvals <= wmax)
wvals = wvals[ind]
yvals = sp_phot.flux[ind]
evals = sp_phot_e.flux[ind]
# Instead of interpolating on a high-resolution grid,
# we should really rebin onto a more coarse grid.
mod_interp = np.interp(wvals, sp_star.wave, sp_model.flux)
# Normalize values so the residuals aren't super small/large
norm = np.mean(yvals)
resid = (mod_interp - yvals)
if use_err: resid /= evals
# Return non-NaN normalized values
return resid[~np.isnan(resid)] / norm
[docs] def fit_SED(self, x0=None, robust=True, use_err=True, IR_excess=False,
wlim=[0.3,10], verbose=True):
"""Fit a model function to photometry
Use :func:`scipy.optimize.least_squares` to find the best fit
model to the observed photometric data. If no parameters passed,
then defaults are set.
Keyword Args
------------
x0 : array_like
Initial guess of independent variables.
robust : bool
Perform an outlier-resistant fit.
use_err : bool
Should we use the uncertainties in the SED photometry for weighting?
IR_excess: bool
Include IR excess in model fit? This is a simple modified blackbody.
wlim : array_like
Min and max limits for wavelengths to consider (microns).
verbose : bool
Print out best-fit model parameters. Defalt is True.
"""
from scipy.optimize import least_squares
# Default initial starting parameters
if x0 is None:
x0 = [1.0, 2000.0, 0.5] if IR_excess else [1.0]
# Robust fit?
loss = 'soft_l1' if robust else 'linear'
# Perform least-squares fit
kwargs={'IR_excess':IR_excess, 'wlim':wlim, 'use_err':use_err}
res = least_squares(self.func_resid, x0, bounds=(0,np.inf), loss=loss,
kwargs=kwargs)
out = res.x
if verbose: print(out)
# Which model are we using?
func_model = self.model_IRexcess if IR_excess else self.model_scale
# Create final model spectrum
sp_model = func_model(out, self.sp0)
sp_model.name = self.name
self.sp_model = sp_model
def plot_SED(self, ax=None, return_figax=False, xr=[0.3,30], yr=None,
units='Jy', **kwargs):
sp0 = self.sp0
sp_phot = self.sp_phot
sp_phot_e = self.sp_phot_e
sp_model = self.sp_model
# Convert to Jy and save original units
sp0_units = sp0.fluxunits.name
sp_phot_units = sp_phot.fluxunits.name
# nuFnu or lamFlam?
if (units=='nufnu') or (units=='lamflam'):
units = 'flam'
lfl = True
else:
lfl = False
sp0.convert(units)
sp_phot.convert(units)
if ax is None:
fig, ax = plt.subplots(1,1, figsize=(8,5))
w = sp0.wave / 1e4
f = sp0.flux
if lfl:
f = f * sp0.wave
if xr is not None:
ind = (w>=xr[0]) & (w<=xr[1])
w, f = (w[ind], f[ind])
ax.loglog(w, f, lw=1, label='Photosphere', **kwargs)
w = sp_phot.wave / 1e4
f = sp_phot.flux
f_err = sp_phot_e.flux
if lfl:
f = f * sp_phot.wave
f_err = f_err * sp_phot.wave
if xr is not None:
ind = (w>=xr[0]) & (w<=xr[1])
w, f, f_err = (w[ind], f[ind], f_err[ind])
ax.errorbar(w, f, yerr=f_err, marker='.', ls='none', label='Photometry')
if sp_model is not None:
sp_model_units = sp_model.fluxunits.name
sp_model.convert(units)
w = sp_model.wave / 1e4
f = sp_model.flux
if lfl:
f = f * sp_model.wave
if xr is not None:
ind = (w>=xr[0]) & (w<=xr[1])
w, f = (w[ind], f[ind])
ax.plot(w, f, lw=1, label='Model Fit')
sp_model.convert(sp_model_units)
# Labels for various units
ulabels = {'photlam': u'photons s$^{-1}$ cm$^{-2}$ A$^{-1}$',
'photnu' : u'photons s$^{-1}$ cm$^{-2}$ Hz$^{-1}$',
'flam' : u'erg s$^{-1}$ cm$^{-2}$ A$^{-1}$',
'fnu' : u'erg s$^{-1}$ cm$^{-2}$ Hz$^{-1}$',
'counts' : u'photons s$^{-1}$',
}
if lfl: # Special case nuFnu or lamFlam
yunits = u'erg s$^{-1}$ cm$^{-2}$'
else:
yunits = ulabels.get(units, units)
ax.set_xlabel('Wavelength (microns)')
ax.set_ylabel('Flux ({})'.format(yunits))
ax.set_title(self.name)
if xr is not None:
ax.set_xlim(xr)
if yr is not None:
ax.set_ylim(yr)
# Better formatting of ticks marks
from matplotlib.ticker import LogLocator, AutoLocator, NullLocator
from matplotlib.ticker import FuncFormatter, NullFormatter
formatter = FuncFormatter(lambda y, _: '{:.16g}'.format(y))
xr = ax.get_xlim()
if xr[1] < 10*xr[0]:
ax.xaxis.set_major_locator(AutoLocator())
ax.xaxis.set_minor_locator(NullLocator())
else:
ax.xaxis.set_major_locator(LogLocator())
ax.xaxis.set_major_formatter(formatter)
yr = ax.get_ylim()
if yr[1] < 10*yr[0]:
ax.yaxis.set_major_locator(AutoLocator())
ax.yaxis.set_minor_formatter(NullFormatter())
ax.yaxis.get_major_locator().set_params(nbins=10, steps=[1,10])
else:
ax.yaxis.set_major_locator(LogLocator())
ax.yaxis.set_major_formatter(formatter)
ax.legend()
# Convert back to original units
sp0.convert(sp0_units)
sp_phot.convert(sp_phot_units)
if ax is None:
fig.tight_layout()
if return_figax: return (fig,ax)
# Class for reading in planet spectra
[docs]class planets_sb12(object):
"""Exoplanet spectrum from Spiegel & Burrows (2012)
This contains 1680 files, one for each of 4 atmosphere types, each of
15 masses, and each of 28 ages. Wavelength range of 0.8 - 15.0 um at
moderate resolution (R ~ 204).
The flux in the source files are at 10 pc. If the distance is specified,
then the flux will be scaled accordingly. This is also true if the distance
is changed by the user. All other properties (atmo, mass, age, entropy) are
not adjustable once loaded.
Parameters
----------
atmo: str
A string consisting of one of four atmosphere types:
- 'hy1s' = hybrid clouds, solar abundances
- 'hy3s' = hybrid clouds, 3x solar abundances
- 'cf1s' = cloud-free, solar abundances
- 'cf3s' = cloud-free, 3x solar abundances
mass: float
A number 1 to 15 Jupiter masses (increments of 1MJup).
age: float
Age in millions of years (1-1000)
entropy: float
Initial entropy (8.0-13.0) in increments of 0.25
distance: float
Assumed distance in pc (default is 10pc)
accr : bool
Include accretion (default: False)?
mmdot : float
From Zhu et al. (2015), the Mjup^2/yr value.
If set to None then calculated from age and mass.
mdot : float
Or use mdot (Mjup/yr) instead of mmdot.
accr_rin : float
Inner radius of accretion disk (units of RJup; default: 2)
truncated: bool
Full disk or truncated (ie., MRI; default: False)?
base_dir: str, None
Location of atmospheric model sub-directories.
"""
# Define default self.base_dir
_base_dir = _spec_dir + 'spiegel/'
[docs] def __init__(self, atmo='hy1s', mass=1, age=100, entropy=10.0, distance=10,
accr=False, mmdot=None, mdot=None, accr_rin=2.0, truncated=False,
base_dir=None, **kwargs):
self._atmo = atmo
self._mass = mass
self._age = age
self._entropy = entropy
# Directory of atmospheric files
if base_dir is not None:
self._base_dir = base_dir
# Download and extract tar.gz file if directory does not exist
if not os.path.isdir(self.sub_dir):
tar_filename = f'SB.{self.atmo}.tar.gz'
self._download_tar(extract=True, remove=True, tar_filename=tar_filename)
# Find and read appropriate file
self._get_file()
self._read_file()
self.distance = distance
self.accr = accr
if not accr:
self.mmdot = 0
elif mmdot is not None:
self.mmdot = mmdot
elif mdot is not None:
self.mmdot = self.mass * mdot # MJup^2/yr
else:
mdot = self.mass / (1e6 * self.age) # Assumed MJup/yr
self.mmdot = self.mass * mdot # MJup^2/yr
self.rin = accr_rin
self.truncated = truncated
def _download_tar(self, extract=True, remove=True, tar_filename=None):
"""Download and extract tar file"""
import requests
if tar_filename is None:
tar_filename = f'SB.{self.atmo}.tar.gz'
tar_path = os.path.join(self._base_dir, tar_filename)
# check if tar file already exists
if not os.path.exists(tar_path):
# URL location
url_base = 'http://mips.as.arizona.edu/~jleisenring/spiegel/'
url_path = os.path.join(url_base, tar_filename)
# Make request
_log.info(f'Downloading file: {tar_filename}')
req = requests.get(url_path, allow_redirects=True)
# Raise exception if file not found or other HTTP error
if req.status_code != requests.codes.ok:
req.raise_for_status()
# Save file to directory
_log.info(f'Saving file to: {tar_path}')
open(tar_path, 'wb').write(req.content)
if extract:
self._extract_dir(tar_path=tar_path, remove=remove)
def _extract_dir(self, tar_path=None, remove=False):
"""Check if directory exists or is still .tar.gz"""
import tarfile
if tar_path is None:
tar_path = os.path.join(self._base_dir, f'SB.{self.atmo}.tar.gz')
if not os.path.isdir(self.sub_dir):
tar = tarfile.open(tar_path, "r:gz")
tar.extractall(self._base_dir)
tar.close()
# Remove tar file
if remove:
try:
os.remove(tar_path)
except FileNotFoundError:
pass
def _get_file(self):
"""Find the file closest to the input parameters"""
files = []; masses = []; ages = []
for file in os.listdir(self.sub_dir):
files.append(file)
fsplit = re.split('[_\.]',file)
ind_mass = fsplit.index('mass') + 1
ind_age = fsplit.index('age') + 1
masses.append(int(fsplit[ind_mass]))
ages.append(int(fsplit[ind_age]))
files = np.array(files)
ages = np.array(ages)
masses = np.array(masses)
# Find those indices closest in mass
mdiff = np.abs(masses - self.mass)
ind_mass = mdiff == np.min(mdiff)
# Of those masses, find the closest age
adiff = np.abs(ages - self.age)
ind_age = adiff[ind_mass] == np.min(adiff[ind_mass])
# Get the final file name
self.file = ((files[ind_mass])[ind_age])[0]
# Update attributes
self._mass = masses[ind_mass][0]
self._age = ages[ind_mass][ind_age][0]
def _read_file(self):
"""Read in the file data"""
# Read in the file's content row-by-row (saved as a string)
file_path = os.path.join(self.sub_dir, self.file)
with open(file_path) as f:
content = f.readlines()
content = [x.strip('\n') for x in content]
# Parse the strings into an array
# Row #, Value
# 1 col 1: age (Myr);
# cols 2-601: wavelength (in microns, in range 0.8-15.0)
# 2-end col 1: initial S;
# cols 2-601: F_nu (in mJy for a source at 10 pc)
ncol = len(content[0].split())
nrow = len(content)
arr = np.zeros([nrow,ncol])
for i,row in enumerate(content):
arr[i,:] = np.array(row.split(), dtype='float64')
# Find the closest entropy and save
entropy = arr[1:,0]
diff = np.abs(self.entropy - entropy)
ind = diff == np.min(diff)
# Update entropy attribute
self._entropy = entropy[ind][0]
# Get Fluxes
self._flux = arr[1:,1:][ind,:].flatten()
self._fluxunits = 'mJy'
# Save the wavelength information
self._wave = arr[0,1:]
self._waveunits = 'um'
# Distance (10 pc)
self._distance = 10.0
@property
def sub_dir(self):
return os.path.join(self._base_dir, f'SB.{self.atmo}')
@property
def mdot(self):
"""Accretion rate in MJup/yr"""
return self.mmdot / self.mass
@property
def wave(self):
"""Wavelength of spectrum"""
return self._wave
@property
def waveunits(self):
"""Wavelength units"""
return self._waveunits
@property
def flux(self):
"""Spectral flux"""
return self._flux
@property
def fluxunits(self):
"""Flux units"""
return self._fluxunits
@property
def distance(self):
"""Assumed distance to source (pc)"""
return self._distance
@distance.setter
def distance(self, value):
self._flux = self._flux * (self._distance/value)**2
self._distance = value
@property
def atmo(self):
"""Atmosphere type
"""
return self._atmo
@property
def mass(self):
"""Mass of planet (MJup)"""
return self._mass
@property
def age(self):
"""Age in millions of years"""
return self._age
@property
def entropy(self):
"""Initial entropy (8.0-13.0)"""
return self._entropy
[docs] def export_pysynphot(self, waveout='angstrom', fluxout='flam'):
"""Output to :mod:`pysynphot.spectrum` object
Export object settings to a :mod:`pysynphot.spectrum`.
Parameters
----------
waveout : str
Wavelength units for output
fluxout : str
Flux units for output
"""
w = self.wave; f = self.flux
name = (re.split('[\.]', self.file))[0]#[5:]
sp = S.ArraySpectrum(w, f, name=name, waveunits=self.waveunits, fluxunits=self.fluxunits)
sp.convert(waveout)
sp.convert(fluxout)
if self.accr and (self.mmdot>0):
sp_mdot = sp_accr(self.mmdot, rin=self.rin,
dist=self.distance, truncated=self.truncated,
waveout=waveout, fluxout=fluxout)
# Interpolate accretion spectrum at each wavelength
# and create new composite spectrum
fnew = np.interp(sp.wave, sp_mdot.wave, sp_mdot.flux)
sp_new = S.ArraySpectrum(sp.wave, sp.flux+fnew,
waveunits=waveout, fluxunits=fluxout)
return sp_new
else:
return sp
[docs]def sp_accr(mmdot, rin=2, dist=10, truncated=False,
waveout='angstrom', fluxout='flam', base_dir=None):
"""Exoplanet accretion flux values (Zhu et al., 2015).
Calculated the wavelength-dependent flux of an exoplanet accretion disk/shock
from Zhu et al. (2015).
Note
----
This function only uses the table of photometric values to calculate
photometric brightness from a source, so not very useful for simulating
spectral observations.
Parameters
----------
mmdot : float
Product of the exoplanet mass and mass accretion rate (MJup^2/yr).
Values range from 1e-7 to 1e-2.
rin : float
Inner radius of accretion disk (units of RJup; default: 2).
dist : float
Distance to object (pc).
truncated: bool
If True, then the values are for a disk with Rout=50 RJup,
otherwise, values were calculated for a full disk (Rout=1000 RJup).
Accretion from a "tuncated disk" is due mainly to MRI.
Luminosities for full and truncated disks are very similar.
waveout : str
Wavelength units for output
fluxout : str
Flux units for output
base_dir: str, None
Location of accretion model sub-directories.
"""
base_dir = _spec_dir if base_dir is None else base_dir
fname = base_dir + 'zhu15_accr.txt'
names = ('MMdot', 'Rin', 'Tmax', 'J', 'H', 'K', 'L', 'M', 'N', 'J2', 'H2', 'K2', 'L2', 'M2', 'N2')
tbl = ascii.read(fname, guess=True, names=names)
# Inner radius values and Mdot values
rin_vals = np.unique(tbl['Rin'])
mdot_vals = np.unique(tbl['MMdot'])
nmdot = len(mdot_vals)
assert (rin >=rin_vals.min()) & (rin <=rin_vals.max()), "rin is out of range"
assert (mmdot>=mdot_vals.min()) & (mmdot<=mdot_vals.max()), "mmdot is out of range"
if truncated:
mag_names = ('J2', 'H2', 'K2', 'L2', 'M2', 'N2')
else:
mag_names = ('J', 'H', 'K', 'L', 'M', 'N')
wcen = np.array([ 1.2, 1.6, 2.2, 3.8, 4.8, 10.0])
zpt = np.array([1600, 1020, 657, 252, 163, 39.8])
mag_arr = np.zeros([6,nmdot])
for i, mv in enumerate(mdot_vals):
for j, mag in enumerate(mag_names):
tbl_sub = tbl[tbl['MMdot']==mv]
rinvals = tbl_sub['Rin']
magvals = tbl_sub[mag]
mag_arr[j,i] = np.interp(rin, rinvals, magvals)
mag_vals = np.zeros(6)
for j in range(6):
xi = 10**(mmdot)
xp = 10**(mdot_vals)
yp = 10**(mag_arr[j])
mag_vals[j] = np.log10(np.interp(xi, xp, yp))
mag_vals += 5*np.log10(dist/10)
flux_Jy = 10**(-mag_vals/2.5) * zpt
sp = S.ArraySpectrum(wcen*1e4, flux_Jy, fluxunits='Jy')
sp.convert(waveout)
sp.convert(fluxout)
return sp
[docs]def jupiter_spec(dist=10, waveout='angstrom', fluxout='flam', base_dir=None):
"""Jupiter as an Exoplanet
Read in theoretical Jupiter spectrum from Irwin et al. 2014 and output
as a :mod:`pysynphot.spectrum`.
Parameters
===========
dist : float
Distance to Jupiter (pc).
waveout : str
Wavelength units for output.
fluxout : str
Flux units for output.
base_dir: str, None
Location of tabulated file irwin_2014_ref_spectra.txt.
"""
base_dir = _spec_dir + 'solar_system/' if base_dir is None else base_dir
fname = base_dir + 'irwin_2014_ref_spectra.txt'
# Column 1: Wavelength (in microns)
# Column 2: 100*Ap/Astar (Earth-Sun Primary Transit)
# Column 3: 100*Ap/Astar (Earth-Mdwarf Primary Transit)
# Column 4: 100*Ap/Astar (Jupiter-Sun Primary Transit)
# Column 5: Fp/Astar (Earth-Sun Secondary Eclipse)
# Column 6: Disc-averaged radiance of Earth (W cm-2 sr-1 micron-1)
# Column 7: Fp/Astar (Jupiter-Sun Secondary Eclipse)
# Column 8: Disc-averaged radiance of Jupiter (W cm-2 sr-1 micron-1)
# Column 9: Solar spectral irradiance spectrum (W micron-1)
# (Solar Radius = 695500.0 km)
# Column 10: Mdwarf spectral irradiance spectrum (W micron-1)
# (Mdwarf Radius = 97995.0 km)
data = ascii.read(fname, data_start=14)
wspec = data['col1'] * 1e4 # Angstrom
fspec = data['col8'] * 1e3 # erg s-1 cm^-2 A^-1 sr^-1
# Steradians to square arcsec
sr_to_asec2 = (3600*180/np.pi)**2
fspec /= sr_to_asec2 # *** / arcsec^2
# Angular size of Jupiter at some distance
RJup_km = 71492.0
au_to_km = 149597870.7
# Angular size (arcsec) of Jupiter radius
RJup_asec = RJup_km / au_to_km / dist
area = np.pi * RJup_asec**2
# flux in f_lambda
fspec *= area # erg s-1 cm^-2 A^-1
sp = S.ArraySpectrum(wspec, fspec, fluxunits='flam')
sp.convert(waveout)
sp.convert(fluxout)
return sp
[docs]def linder_table(file=None, **kwargs):
"""Load Linder Model Table
Function to read in isochrone models from Linder et al. 2019.
Returns an astropy Table.
Parameters
----------
file : string
Location and name of Linder et al file.
Default is ``BEX_evol_mags_-3_MH_0.00.dat``.
"""
# Default file to read and load
if file is None:
indir = _spec_dir + 'linder/isochrones/'
file = indir + 'BEX_evol_mags_-3_MH_0.00.dat'
with open(file) as f:
content = f.readlines()
content = [x.strip('\n') for x in content]
cnames = content[2].split(',')
cnames = [name.split(':')[1] for name in cnames]
ncol = len(cnames)
content_arr = []
for line in content[4:]:
arr = np.array(line.split()).astype(np.float)
if len(arr)>0:
content_arr.append(arr)
content_arr = np.array(content_arr)
# Convert to Astropy Table
tbl = Table(rows=content_arr, names=cnames)
return tbl
[docs]def linder_filter(table, filt, age, dist=10, cond_file=None, **kwargs):
"""Linder Mags vs Mass Arrays
Given a Linder table, filter name, and age (Myr), return arrays of MJup
and Vega mags. If distance (pc) is provided, then return the apparent
magnitude, otherwise absolute magnitude at 10pc.
This function takes the isochrones tables from Linder et al 2019 and
creates a irregular contour grid of filter magnitude and log(age)
where the z-axis is log(mass). This is mapped onto a regular grid
that is interpolated within the data boundaries and linearly
extrapolated outside of the region of available data.
Parameters
==========
table : astropy table
Astropy table output from `linder_table`.
filt : string
Name of NIRCam filter.
age : float
Age in Myr of planet.
dist : float
Distance in pc. Default is 10pc (abs mag).
"""
try:
x = table[filt]
except KeyError:
# In case specific filter doesn't exist, interpolate
x = []
cnames = [
'SPHEREY', 'NACOJ', 'NACOH', 'NACOKs', 'NACOLp', 'NACOMp',
'F115W', 'F150W', 'F200W', 'F277W', 'F356W', 'F444W', 'F560W',
'F770W', 'F1000W', 'F1280W', 'F1500W', 'F1800W', 'F2100W', 'F2550W',
]
wvals = np.array([
1.04, 1.27, 1.66, 2.20, 3.80, 4.80,
1.15, 1.50, 2.00, 2.76, 3.57, 4.41, 5.60,
7.67, 9.97, 12.84, 15.08, 17.99, 20.85, 25.27,
])
# Sort by wavelength
isort = np.argsort(wvals)
cnames = list(np.array(cnames)[isort])
wvals = wvals[isort]
# Turn table data into array and interpolate at filter wavelength
tbl_arr = np.array([table[cn].data for cn in cnames]).transpose()
try:
bp = nircam_filter(filt)
except:
bp = miri_filter(filt)
wint = bp.avgwave() / 1e4
x = np.array([np.interp(wint, wvals, row) for row in tbl_arr])
y = table['log(Age/yr)'].data
z = table['Mass/Mearth'].data
zlog = np.log10(z)
#######################################################
# Grab COND model data to fill in higher masses
base_dir = _spec_dir + 'cond_models/'
if cond_file is None:
cond_file = base_dir + 'model.AMES-Cond-2000.M-0.0.JWST.Vega'
npsave_file = cond_file + '.{}.npy'.format(filt)
try:
mag2, age2, mass2_mjup = np.load(npsave_file)
except:
d_tbl2 = cond_table(file=cond_file) # Dictionary of ages
mass2_mjup = []
mag2 = []
age2 = []
for k in d_tbl2.keys():
tbl2 = d_tbl2[k]
mass2_mjup = mass2_mjup + list(tbl2['MJup'].data)
try:
mag2 = mag2 + list(tbl2[filt+'a'].data) # NIRCam
except KeyError:
filt_alt = {'F1065C':'F1000W', 'F1140C':'F1130W', 'F1550C':'F1500W', 'F2300C':'F2100W'}
fcol = filt_alt.get(filt, filt)
mag2 = mag2 + list(tbl2[fcol].data) # MIRI
age2 = age2 + list(np.ones(len(tbl2))*k)
mass2_mjup = np.array(mass2_mjup)
mag2 = np.array(mag2)
age2 = np.array(age2)
mag_age_mass = np.array([mag2,age2,mass2_mjup])
np.save(npsave_file, mag_age_mass)
# Irregular grid
x2 = mag2
y2 = np.log10(age2 * 1e6)
z2 = mass2_mjup * 318 # Convert to Earth masses
zlog2 = np.log10(z2)
#######################################################
xlim = np.array([x2.min(),x.max()+5]) # magntidue limits
ylim = np.array([6,10]) # 10^6 to 10^10 yrs
dx = (xlim[1] - xlim[0]) / 200
dy = (ylim[1] - ylim[0]) / 200
xgrid = np.arange(xlim[0], xlim[1]+dx, dx)
ygrid = np.arange(ylim[0], ylim[1]+dy, dy)
X, Y = np.meshgrid(xgrid, ygrid)
zgrid = griddata((x,y), zlog, (X, Y), method='cubic')
zgrid_cond = griddata((x2,y2), zlog2, (X, Y), method='cubic')
# There will be NaN's along the border that need to be replaced
ind_nan = np.isnan(zgrid)
# First replace with COND grid
zgrid[ind_nan] = zgrid_cond[ind_nan]
ind_nan = np.isnan(zgrid)
# Remove rows/cols with NaN's
xgrid2, ygrid2, zgrid2 = _trim_nan_array(xgrid, ygrid, zgrid)
# Create regular grid interpolator function for extrapolation at NaN's
func = RegularGridInterpolator((ygrid2,xgrid2), zgrid2, method='linear',
bounds_error=False, fill_value=None)
# Fix NaN's in zgrid and rebuild func
pts = np.array([Y[ind_nan], X[ind_nan]]).transpose()
zgrid[ind_nan] = func(pts)
func = RegularGridInterpolator((ygrid,xgrid), zgrid, method='linear',
bounds_error=False, fill_value=None)
# Get mass limits for series of magnitudes at a given age
age_log = np.log10(age*1e6)
mag_abs_arr = xgrid
pts = np.array([(age_log,xval) for xval in mag_abs_arr])
mass_arr = 10**func(pts) / 318.0 # Convert to MJup
# TODO: Rewrite this function to better extrapolate to lower and higher masses
# For now, fit low order polynomial
isort = np.argsort(mag_abs_arr)
mag_abs_arr = mag_abs_arr[isort]
mass_arr = mass_arr[isort]
ind_fit = mag_abs_arr<x.max()
lxmap = [mag_abs_arr.min(), mag_abs_arr.max()]
xfit = np.append(mag_abs_arr[ind_fit], mag_abs_arr[-1])
yfit = np.log10(np.append(mass_arr[ind_fit], mass_arr[-1]))
cf = jl_poly_fit(xfit, yfit, deg=4, use_legendre=False, lxmap=lxmap)
mass_arr = 10**jl_poly(mag_abs_arr,cf)
mag_app_arr = mag_abs_arr + 5*np.log10(dist/10.0)
# Sort by mass
isort = np.argsort(mass_arr)
mass_arr = mass_arr[isort]
mag_app_arr = mag_app_arr[isort]
return mass_arr, mag_app_arr
[docs]def cond_table(age=None, file=None, **kwargs):
"""Load COND Model Table
Function to read in the COND model tables, which have been formatted
in a very specific way. Has the option to return a dictionary of
astropy Tables, where each dictionary element corresponds to
the specific ages within the COND table. Or, if the age keyword is
specified, then this function only returns a single astropy table.
Parameters
----------
age : float
Age in Myr. If set to None, then an array of ages from the file
is used to generate dictionary. If set, chooses the closest age
supplied in table.
file : string
Location and name of COND file. See isochrones stored at
https://phoenix.ens-lyon.fr/Grids/.
Default is model.AMES-Cond-2000.M-0.0.JWST.Vega
"""
def make_table(*args):
i1, i2 = (ind1[i]+4, ind2[i])
rows = []
for line in content[i1:i2]:
if (line=='') or ('---' in line):
continue
else:
vals = np.array(line.split(), dtype='float64')
rows.append(tuple(vals))
tbl = Table(rows=rows, names=cnames)
# Convert to Jupiter masses
newcol = tbl['M/Ms'] * 1047.348644
newcol.name = 'MJup'
tbl.add_column(newcol, index=1)
tbl['MJup'].format = '.2f'
return tbl
# Default file to read and load
if file is None:
base_dir = _spec_dir + 'cond_models/'
file = base_dir + 'model.AMES-Cond-2000.M-0.0.JWST.Vega'
with open(file) as f:
content = f.readlines()
content = [x.strip('\n') for x in content]
# Column names
cnames = content[5].split()
cnames = ['M/Ms', 'Teff'] + cnames[1:]
ncol = len(cnames)
# Create a series of tables for each time
times_gyr = []
ind1 = []
for i, line in enumerate(content):
if 't (Gyr)' in line:
times_gyr.append(line.split()[-1])
ind1.append(i)
ntimes = len(times_gyr)
# Create start and stop indices for each age value
ind2 = ind1[1:] + [len(content)]
ind1 = np.array(ind1)
ind2 = np.array(ind2)-1
# Everything is Gyr, but prefer Myr
ages_gyr = np.array(times_gyr, dtype='float64')
ages_myr = np.array(ages_gyr * 1000, dtype='int')
#times = ['{:.0f}'.format(a) for a in ages_myr]
# Return all tables if no age specified
if age is None:
tables = {}
for i in range(ntimes):
tbl = make_table(i, ind1, ind2, content)
tables[ages_myr[i]] = tbl
return tables
else:
# This is faster if we only want one table
ages_diff = np.abs(ages_myr - age)
i = np.where(ages_diff==ages_diff.min())[0][0]
tbl = make_table(i, ind1, ind2, content)
return tbl
[docs]def cond_filter(table, filt, module='A', dist=None, **kwargs):
"""
Given a COND table and NIRCam filter, return arrays of MJup and Vega mags.
If distance (pc) is provided, then return the apparent magnitude,
otherwise absolute magnitude at 10pc. Input table has already been filtered
by age.
"""
# Table Data
try:
fcol = filt + module.lower()
mag_data = table[fcol].data
except KeyError:
# MIRI coronagraphic filters are incorrect in the AMES-COND files.
# It assumes extra attenuation from the central mask, which gives
# the incorrect flux for off-axis points sources. Instead, use some
# alternate bandpasses
filt_alt = {'F1065C':'F1000W', 'F1140C':'F1130W', 'F1550C':'F1500W', 'F2300C':'F2100W'}
fcol = filt_alt.get(filt, filt)
mag_data = table[fcol].data
mcol = 'MJup'
mass_data = table[mcol].data
# Data to interpolate onto
mass_arr = list(np.arange(0.1,1,0.1)) + list(np.arange(1,10)) \
+ list(np.arange(10,200,10)) + list(np.arange(200,1400,100))
mass_arr = np.array(mass_arr)
# Interpolate
mag_arr = np.interp(mass_arr, mass_data, mag_data)
# Extrapolate
cf = jl_poly_fit(np.log(mass_data), mag_data)
ind_out = (mass_arr < mass_data.min()) | (mass_arr > mass_data.max())
mag_arr[ind_out] = jl_poly(np.log(mass_arr), cf)[ind_out]
# Distance modulus for apparent magnitude
if dist is not None:
mag_arr = mag_arr + 5*np.log10(dist/10)
return mass_arr, mag_arr
def _trim_nan_array(xgrid, ygrid, zgrid):
"""NaN Trimming of Array Image
For an image with a rotated border of NaN's,
remove rows/cols with NaN's while trying to
preserve the maximum footprint of real data.
"""
xgrid2, ygrid2, zgrid2 = xgrid, ygrid, zgrid
# Create a mask of NaN'ed values
nan_mask = np.isnan(zgrid2)
nrows, ncols = nan_mask.shape
# Determine number of NaN's along each row and col
num_nans_cols = nan_mask.sum(axis=0)
num_nans_rows = nan_mask.sum(axis=1)
# First, crop all rows/cols that are only NaN's
xind_good = np.where(num_nans_cols < nrows)[0]
yind_good = np.where(num_nans_rows < ncols)[0]
# get border limits
x1, x2 = (xind_good.min(), xind_good.max()+1)
y1, y2 = (yind_good.min(), yind_good.max()+1)
# Trim of NaN borders
xgrid2 = xgrid2[x1:x2]
ygrid2 = ygrid2[y1:y2]
zgrid2 = zgrid2[y1:y2,x1:x2]
# Find a optimal rectangule subsection free of NaN's
# Iterative cropping
ndiff = 5
while np.isnan(zgrid2.sum()):
# Make sure ndiff is not negative
if ndiff<0:
break
npix = zgrid2.size
# Create a mask of NaN'ed values
nan_mask = np.isnan(zgrid2)
nrows, ncols = nan_mask.shape
# Determine number of NaN's along each row and col
num_nans_cols = nan_mask.sum(axis=0)
num_nans_rows = nan_mask.sum(axis=1)
# Look for any appreciable diff row-to-row/col-to-col
col_diff = num_nans_cols - np.roll(num_nans_cols,-1)
row_diff = num_nans_rows - np.roll(num_nans_rows,-1)
# For edge wrapping, just use last minus previous
col_diff[-1] = col_diff[-2]
row_diff[-1] = row_diff[-2]
# Keep rows/cols composed mostly of real data
# and where number of NaN's don't change dramatically
xind_good = np.where( ( np.abs(col_diff) <= ndiff ) &
( num_nans_cols < 0.5*nrows ) )[0]
yind_good = np.where( ( np.abs(row_diff) <= ndiff ) &
( num_nans_rows < 0.5*ncols ) )[0]
# get border limits
x1, x2 = (xind_good.min(), xind_good.max()+1)
y1, y2 = (yind_good.min(), yind_good.max()+1)
# Trim of NaN borders
xgrid2 = xgrid2[x1:x2]
ygrid2 = ygrid2[y1:y2]
zgrid2 = zgrid2[y1:y2,x1:x2]
# Check for convergence
# If we've converged, reduce
if npix==zgrid2.size:
ndiff -= 1
# Last ditch effort in case there are still NaNs
# If so, remove rows/cols 1 by 1 until no NaNs
while np.isnan(zgrid2.sum()):
xgrid2 = xgrid2[1:-1]
ygrid2 = ygrid2[1:-1]
zgrid2 = zgrid2[1:-1,1:-1]
return xgrid2, ygrid2, zgrid2
[docs]def companion_spec(bandpass, model='SB12', atmo='hy3s', mass=10, age=100, entropy=10,
dist=10, accr=False, mmdot=None, mdot=None, accr_rin=2, truncated=False,
sptype=None, renorm_args=None, Av=0, **kwargs):
""" Determine flux (ph/sec) of a companion
Add exoplanet information that will be used to generate a point
source image using a spectrum from Spiegel & Burrows (2012).
Coordinate convention is for +N up and +E to left.
Parameters
----------
bandpass : :mod:`pysynphot.obsbandpass`
A Pysynphot bandpass object.
model : str
Exoplanet model to use ('sb12', 'bex', 'cond') or
stellar spectrum model ('bosz', 'ck04models', 'phoenix').
atmo : str
A string consisting of one of four atmosphere types:
['hy1s', 'hy3s', 'cf1s', 'cf3s'].
mass: int
Number 1 to 15 Jupiter masses.
age: float
Age in millions of years (1-1000).
entropy: float
Initial entropy (8.0-13.0) in increments of 0.25
sptype : str
Instead of using a exoplanet spectrum, specify a stellar type.
renorm_args : dict
Pysynphot renormalization arguments in case you want
very specific luminosity in some bandpass.
Includes (value, units, bandpass).
dist : float
Distance in pc.
Av : float
Extinction magnitude (assumes Rv=4.0) of the companion
(e.g., due to being embedded in a disk).
accr : bool
Include accretion? default: False
mmdot : float
From Zhu et al. (2015), the Mjup^2/yr value.
If set to None then calculated from age and mass.
mdot : float
Or use mdot (Mjup/yr) instead of mmdot.
accr_rin : float
Inner radius of accretion disk (units of RJup; default: 2)
truncated: bool
Full disk or truncated (ie., MRI; default: False).
"""
# Spiegel & Burrows model already have a class
# For BEX and COND models, make
if model.lower() in ['sb12', 'bex', 'cond']:
calc_accr = False if model.lower() in ['bex', 'cond'] else accr
pl = {
'atmo': atmo, 'mass': mass, 'age': age,
'entropy': entropy, 'distance': dist,
'accr': calc_accr, 'mmdot': mmdot, 'mdot': mdot,
'accr_rin': accr_rin, 'truncated': truncated
}
planet = planets_sb12(**pl)
sp = planet.export_pysynphot()
# Check spectral overlap
sp_overlap = S.observation.check_overlap(bandpass, sp)
# Ensure there is a data point at the edge of the input bandpass
if sp_overlap != 'full':
w_end = np.max(bandpass.wave)
f_end = sp.sample(w_end)
w_new = np.append(sp.wave, w_end)
f_new = np.append(sp.flux, f_end)
sp = S.ArraySpectrum(w_new, f_new, waveunits=sp.waveunits, fluxunits=sp.fluxunits)
del_mag = 0
# Add accretion mag offsets for BEX and COND models
if (model.lower() in ['bex', 'cond']) and (accr==True):
if sp_overlap != 'full':
_log.warn(f"Overlap between spectrum and bandpass: {sp_overlap}.")
_log.warn("Accretion calculation may be unreliable.")
pl = {
'atmo': atmo, 'mass': mass, 'age': age,
'entropy': entropy, 'distance': dist,
'accr': True, 'mmdot': mmdot, 'mdot': mdot,
'accr_rin': accr_rin, 'truncated': truncated
}
planet = planets_sb12(**pl)
# Get spectrum from accretion component
sp_mdot = sp_accr(planet.mmdot, rin=planet.rin,
dist=planet.distance, truncated=planet.truncated,
waveout=sp.waveunits, fluxout=sp.fluxunits)
# Interpolate accretion spectrum at each wavelength
fnew = np.interp(sp.wave, sp_mdot.wave, sp_mdot.flux)
sp_new = S.ArraySpectrum(sp.wave, fnew, waveunits=sp.waveunits,
fluxunits=sp.fluxunits)
obs_accr = S.Observation(sp_new, bandpass, binset=bandpass.wave)
del_mag -= obs_accr.effstim('vegamag')
# Make new spectrum
sp = planet.export_pysynphot()
# Add extinction from the disk
if Av>0:
Rv = 4.0
sp_ext = sp * S.Extinction(Av/Rv, name='mwrv4')
if model.lower() in ['bex', 'cond']:
if sp_overlap != 'full':
_log.warn(f"Overlap between spectrum and bandpass: {sp_overlap}.")
_log.warn("Extinction calculation may be unreliable.")
obs = S.Observation(sp, bandpass, binset=bandpass.wave)
obs_ext = S.Observation(sp_ext, bandpass, binset=bandpass.wave)
del_mag += obs_ext.effstim('vegamag') - obs.effstim('vegamag')
sp = sp_ext
# For BEX and COND models, set up renorm_args
# unless renorm_args is already set
if (renorm_args is not None) and (len(renorm_args) > 0):
pass
elif model.lower()=='bex':
table = linder_table()
mass_arr, mag_arr = linder_filter(table, bandpass.name, age, dist=dist)
mag = np.interp(mass, mass_arr, mag_arr)
mag += del_mag # Apply extinction and/or accretion offsets
renorm_args = (mag, 'vegamag', bandpass)
elif model.lower()=='cond':
table = cond_table(age)
mass_arr, mag_arr = cond_filter(table, bandpass.name, dist=dist)
mag = np.interp(mass, mass_arr, mag_arr)
mag += del_mag # Apply extinction and/or accretion offsets
renorm_args = (mag, 'vegamag', bandpass)
# Renormalize to some specified flux in a given bandpass
if (renorm_args is not None) and (len(renorm_args) > 0):
sp_norm = sp.renorm(*renorm_args, force=True)
sp = sp_norm
elif sp_overlap != 'full':
_log.warn(f"Overlap between spectrum and bandpass: {sp_overlap}.")
_log.warn("Recommend supplying renorm_args input.")
elif model.lower() in ['bosz', 'ck04models', 'phoenix']:
pl = {'sptype': sptype, 'Av': Av, 'renorm_args': renorm_args}
sp = stellar_spectrum(sptype)
if Av>0:
Rv = 4.0
sp *= S.Extinction(Av/Rv, name='mwrv4')
if (renorm_args is not None) and (len(renorm_args) > 0):
sp_norm = sp.renorm(*renorm_args, force=True)
sp = sp_norm
name = kwargs.get('name')
if name is not None:
sp.name = name
return sp
[docs]def bin_spectrum(sp, wave, waveunits='um'):
"""Rebin spectrum
Rebin a :mod:`pysynphot.spectrum` to a different wavelength grid.
This function first converts the input spectrum to units
of counts then combines the photon flux onto the
specified wavelength grid.
Output spectrum units are the same as the input spectrum.
Parameters
-----------
sp : :mod:`pysynphot.spectrum`
Spectrum to rebin.
wave : array_like
Wavelength grid to rebin onto.
waveunits : str
Units of wave input. Must be recognizeable by Pysynphot.
Returns
-------
:mod:`pysynphot.spectrum`
Rebinned spectrum in same units as input spectrum.
"""
waveunits0 = sp.waveunits
fluxunits0 = sp.fluxunits
# Convert wavelength of input spectrum to desired output units
sp.convert(waveunits)
# We also want input to be in terms of counts to conserve flux
sp.convert('flam')
edges = S.binning.calculate_bin_edges(wave)
ind = (sp.wave >= edges[0]) & (sp.wave <= edges[-1])
binflux = binned_statistic(sp.wave[ind], sp.flux[ind], np.mean, bins=edges)
# Interpolate over NaNs
ind_nan = np.isnan(binflux)
finterp = interp1d(wave[~ind_nan], binflux[~ind_nan], kind='cubic')
binflux[ind_nan] = finterp(wave[ind_nan])
sp2 = S.ArraySpectrum(wave, binflux, waveunits=waveunits, fluxunits='flam')
sp2.convert(waveunits0)
sp2.convert(fluxunits0)
# Put back units of original input spectrum
sp.convert(waveunits0)
sp.convert(fluxunits0)
return sp2
[docs]def mag_to_counts(src_mag, bandpass, sp_type='G0V', mag_units='vegamag', **kwargs):
"""
Convert stellar magnitudes in some bandpass to corresponding flux values (e-/sec)
"""
# Get flux of a 0 magnitude star (zero-point flux)
sp = stellar_spectrum(sp_type, 0, mag_units, bandpass)
obs = S.Observation(sp, bandpass, binset=bandpass.wave)
zp_counts = obs.effstim('counts') # Counts of a 0 mag star
# Flux of each star e-/sec
src_flux = np.array(zp_counts * 10**(-src_mag / 2.5))
return src_flux