Source code for webbpsf_ext.spectra

from pathlib import Path
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

import os, re

from astropy.io import fits, ascii
from astropy.table import Table
import astropy.units as u

from scipy.interpolate import griddata, RegularGridInterpolator, interp1d

from . import conf
from . import synphot_ext as s_ext
from .bandpasses import miri_filter, nircam_filter
from .robust import medabsdev

import logging
_log = logging.getLogger('webbpsf_ext')

from . import __path__
_spec_dir = Path(__path__[0]) / 'spectral_data/'

[docs] def BOSZ_filename(Teff, metallicity, log_g, res, carbon=0, alpha=0): """ Generate filename for BOSZ spectrum. """ teff_str = f't{Teff:04.0f}' logg_str = 'g{:02.0f}'.format(int(log_g*10)) # Metallicity [M/H] metal_str = 'mp{:02.0f}'.format(int(abs(metallicity*10)+0.5)) if metallicity<0: metal_str = metal_str.replace('p', 'm') # Carbon abundance [C/M] carb_str = 'cp{:02.0f}'.format(int(abs(carbon*10)+0.5)) if carbon<0: carb_str = carb_str.replace('p', 'm') # alpha-element value [alpha/H] alpha_str = 'op{:02.0f}'.format(int(abs(alpha*10)+0.5)) if alpha<0: alpha_str = alpha_str.replace('p', 'm') # Resolution rstr = 'b{}'.format(res) # Final file name fname = f'a{metal_str}{carb_str}{alpha_str}{teff_str}{logg_str}v20modrt0{rstr}rs.fits' return fname
[docs] def download_BOSZ_spectrum(Teff, metallicity, log_g, res, carbon=0, alpha=0, outdir=None): import requests if outdir is None: res_dir = os.path.join(_spec_dir, 'bosz_grids', 'R{}'.format(res)) # Create resolution directory if it doesn't exists if not os.path.isdir(res_dir): os.makedirs(res_dir) outdir = res_dir # Generate URL directory that file is saved in url_base = 'http://archive.stsci.edu/missions/hlsp/bosz/fits/' res_str = f'insbroad_{res:06.0f}' metal_str = f'metal_{metallicity:+.2f}' carbon_str = f'carbon_{carbon:+.2f}' alpha_str = f'alpha_{alpha:+.2f}' url_dir = os.path.join(url_base,res_str,metal_str,carbon_str,alpha_str) # Generate file name fname = BOSZ_filename(Teff, metallicity, log_g, res, carbon=carbon, alpha=alpha) # Final URL url_final = os.path.join(url_dir, fname) # Make request _log.info(f'Downloading file: {fname}') req = requests.get(url_final, allow_redirects=True) # Raise exception if file not found or other HTTP error if req.status_code != requests.codes.ok: req.raise_for_status() # Save file to directory outpath = os.path.join(outdir, fname) _log.info(f'Saving file to: {outpath}') open(outpath, 'wb').write(req.content)
[docs] def BOSZ_spectrum(Teff, metallicity, log_g, res=2000, interpolate=True, carbon=0, alpha=0, fluxout='photlam', **kwargs): """BOSZ stellar atmospheres (Bohlin et al 2017). Read in a spectrum from the BOSZ stellar atmosphere models database. Returns a synphot spectral object. Wavelength values range between 1000 Angstroms to 32 microns. Teff range from 3500K to 36000K. This function interpolates the model grid by reading in those models closest in temperature, metallicity, and log g to the desired parameters, then takes the weighted average of these models based on their relative offsets. Can also just read in the closest model by setting interpolate=False. Different spectral resolutions can also be specified. Parameters ---------- Teff : float Effective temperature ranging from 3500K to 30000K. metallicity : float Metallicity [Fe/H] value ranging from -2.5 to 0.5. log_g : float Surface gravity (log g) from 0 to 5. Keyword Args ------------ carbon : float Carbon abundance [C/M]. Must be either [-0.75,-0.5,-0.25, 0, 0.25, 0.5]. alpha : float alpha-element value [alpha/H]. Must be either [-0.25, 0, 0.25, 0.5] res : str Spectral resolution to use (instrument broadening). Valid points: [200, 500, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 300000] interpolate : bool Interpolate spectrum using a weighted average of grid points surrounding the desired input parameters. References ---------- https://archive.stsci.edu/prepds/bosz/ """ model_dir = os.path.join(_spec_dir, 'bosz_grids/') res_dir = os.path.join(model_dir, f'R{res:.0f}/') if not os.path.isdir(_spec_dir): raise IOError(f'Spectral directory does not exist: {_spec_dir}') if not os.path.isdir(res_dir): os.makedirs(res_dir) # raise IOError('Resolution directory does not exist: {}'.format(res_dir)) # Grid of computed temperature steps teff_grid = list(range(3500,12000,250)) \ + list(range(12000,20000,500)) \ + list(range(20000,36000,1000)) teff_grid = np.array(teff_grid) # Grid of log g steps for desired Teff lg_max = 5 lg_step = 0.5 if Teff <= 6000: lg_min = 0 elif Teff <= 8000: lg_min = 1 elif Teff <= 12000: lg_min = 2 elif Teff <= 30000: lg_min = 3 elif Teff <= 35000: lg_min = 3.5 else: raise ValueError('Teff must be less than or equal to 35000K.') if log_g<lg_min: raise ValueError('log_g must be >={}'.format(lg_min)) if log_g>lg_max: raise ValueError('log_g must be <={}'.format(lg_max)) # Grid of log g values logg_grid = np.arange(lg_min, lg_max+lg_step, lg_step) # Grid of metallicity values metal_grid = np.arange(-2.5,0.75,0.25) # First, choose the two grid points closest in Teff teff_diff = np.abs(teff_grid - Teff) ind_sort = np.argsort(teff_diff) if teff_diff[ind_sort[0]]==0: # Exact teff_best = np.array([teff_grid[ind_sort[0]]]) else: # Want to interpolate teff_best = teff_grid[ind_sort[0:2]] # Choose the two best log g values logg_diff = np.abs(logg_grid - log_g) ind_sort = np.argsort(logg_diff) if logg_diff[ind_sort[0]]==0: # Exact logg_best = np.array([logg_grid[ind_sort[0]]]) else: # Want to interpolate logg_best = logg_grid[ind_sort[0:2]] # Choose the two best metallicity values metal_diff = np.abs(metal_grid - metallicity) ind_sort = np.argsort(metal_diff) if metal_diff[ind_sort[0]]==0: # Exact metal_best = np.array([metal_grid[ind_sort[0]]]) else: # Want to interpolate metal_best = metal_grid[ind_sort[0:2]] # Build final file names fnames = [] # Build lists of properties to pass to download function if needed teff_all = [] logg_all = [] metal_all = [] for t in teff_best: for l in logg_best: for m in metal_best: fname = BOSZ_filename(t, m, l, res, carbon=carbon, alpha=alpha) fnames.append(fname) teff_all.append(t) logg_all.append(l) metal_all.append(m) teff_all = np.array(teff_all) logg_all = np.array(logg_all) metal_all = np.array(metal_all) # Weight by relative distance from desired value weights = [] teff_diff = np.abs(teff_best - Teff) logg_diff = np.abs(logg_best - log_g) metal_diff = np.abs(metal_best - metallicity) for t in teff_diff: wt = 1 if len(teff_diff)==1 else t / np.sum(teff_diff) for l in logg_diff: wl = 1 if len(logg_diff)==1 else l / np.sum(logg_diff) for m in metal_diff: wm = 1 if len(metal_diff)==1 else m / np.sum(metal_diff) weights.append(wt*wl*wm) weights = np.array(weights) weights = weights / np.sum(weights) if interpolate: wave_all = [] flux_all = [] for i, f in enumerate(fnames): # Download files that don't currently exist fpath = os.path.join(res_dir, f) if not os.path.isfile(fpath): download_BOSZ_spectrum(teff_all[i], metal_all[i], logg_all[i], res, carbon=carbon, alpha=alpha) d = fits.getdata(fpath, 1) wave_all.append(d['Wavelength']) flux_all.append(d['SpecificIntensity'] * weights[i]) wfin = wave_all[0] ffin = np.pi * np.array(flux_all).sum(axis=0) # erg/s/cm^2/A else: ind = np.where(weights==weights.max())[0][0] f = fnames[ind] Teff = teff_all[ind] log_g = logg_all[ind] metallicity = metal_all[ind] # Download files if doesn't exist fpath = os.path.join(res_dir, f) if not os.path.isfile(fpath): download_BOSZ_spectrum(Teff, metallicity, log_g, res, carbon=carbon, alpha=alpha) d = fits.getdata(fpath, 1) wfin = d['Wavelength'] ffin = np.pi * d['SpecificIntensity'] # erg/s/cm^2/A name = 'BOSZ(Teff={},z={},logG={})'.format(Teff, metallicity, log_g) sp = s_ext.ArraySpectrum(wave=wfin[:-1], flux=ffin[:-1], waveunits='angstrom', fluxunits='flam', name=name) sp.convert(fluxout) return sp
[docs] def BOSZ_filename_2024(Teff, log_g, metallicity, res, alpha=0, carbon=0, micro_vel=2, prod=None, **kwargs): """ Generate filename for BOSZ 2024 spectrum MARCS ("ms" or "mp"): * Teff from 2800 to 4000, in steps of 100; log(g) -0.5 to 5.5, in steps of 0.5 * Teff from 4250 to 4750, in steps of 250; log(g) -0.5 to 5.0, in steps of 0.5 * Teff from 5000 to 5750, in steps of 250; log(g) 0.0 to 5.5, in steps of 0.5 * Teff from 6000 to 7000, in steps of 250; log(g) 1.0 to 5.5, in steps of 0.5 * Teff from 7250 to 8000, in steps of 250; log(g) 2.0 to 5.5, in steps of 0.5 ATLAS9 ("ap"): * Teff from 7500 to 12000, in steps of 250; log(g) 2.0 to 5.0, in steps of 0.5 * Teff from 12500 to 16000, in steps of 500; log(g) 3.0 to 5.0, in steps of 0.5 Parameters ---------- Teff : float Effective temperature ranging from 2800K to 16000K. log_g : float Surface gravity (log g) from -0.5 to 5.5. metallicity : float Metallicity [Fe/H] value ranging from -2.50 to +0.75, in steps of 0.25 res : str Spectral resolution: 500, 1000, 2000, 5000, 10000, 20000, 50000, or 'orig'. Keyword Args ------------ alpha : float alpha-element value [alpha/M]. Must be either [-0.25, 0, 0.25, 0.5] carbon : float Carbon abundance [C/M]. Must be either [-0.75,-0.5,-0.25, 0, 0.25, 0.5]. micro_vel : float microturbulence velocity, either 0, 1, 2, or 4 km/s prod : str Product type is either None, 'wave', or 'lineid'. If None, the default is either 'resam' or 'noresamp' if res='orig'. If 'wave', the product is the wavelength file 'bosz2024_wave_r{res}.txt'. If 'lineid', the product is the line identification file, only valid for res='orig'. """ # Effective Temperature teff_str = f't{Teff:04.0f}' # Surface gravity logg_str = f'g{log_g:+02.1f}' # In the MARCS model atmospheres, spherical geometry is used between # logg=−1 and 3 with vmicro=2 kms−1; and plane-parallel geometry is # assumed between logg= 3.5 and 5.5 dex with vmicro=1 kms−1. # All ATLAS9 models are plane-parallel model atmospheres with vmicro=2 kms−1. is_marcs = Teff <= 8000 if is_marcs: atmo_str = 'ms' if log_g < 3.5 else 'mp' else: atmo_str = 'ap' # Metallicity [M/H] metal_str = f'm{metallicity:+03.2f}' # alpha-element value [alpha/M] alpha_str = f'a{alpha:+03.2f}' # Carbon abundance [C/M] carb_str = f'c{carbon:+03.2f}' # Microturbulence micro_str = f'v{micro_vel:01.0f}' # Resolution # res can also equal 'orig' for the original resolution rstr = f'r{res}' # Product type is either 'resam' or 'noresam' if prod is None: prod_str = 'noresam' if res == 'orig' else 'resam' elif prod=='lineid' and res=='orig': prod_str = 'lineid' elif prod=='lineid': raise ValueError('Product type "lineid" is only valid for res="orig".') elif (prod == 'wave') and (res != 'orig'): return f'bosz2024_wave_r{res}.txt' elif (prod == 'wave'): raise ValueError('Product type "wave" is not valid for res="orig".') else: raise ValueError(f'Invalid product type: {prod}') # Final file name # bosz2024_mp_t5000_g+5.0_m+0.00_a+0.00_c+0.00_v0_r500_resam.txt.gz # bosz2024_<atmos>_<teff>_<logg>_<metal>_<alpha>_<carbon>_<micro>_<insbroad>_<prod>.txt.gz fname = f'bosz2024_{atmo_str}_{teff_str}_{logg_str}_{metal_str}_{alpha_str}_{carb_str}_{micro_str}_{rstr}_{prod_str}.txt.gz' return fname
[docs] def download_BOSZ_2024(Teff, log_g, metallicity, res, outdir=None, **kwargs): import requests if outdir is None: res_dir = os.path.join(_spec_dir, 'bosz2024_grids', 'R{}'.format(res)) # Create resolution directory if it doesn't exists if not os.path.isdir(res_dir): os.makedirs(res_dir) outdir = res_dir # Check for product keyword of type is is_wave = kwargs.get('prod', 'none') == 'wave' # Generate URL directory that file is saved in url_base = 'https://archive.stsci.edu/hlsps/bosz/bosz2024/' res_str = f'r{res}' metal_str = f'm{metallicity:+03.2f}' if is_wave: url_dir = os.path.join(url_base, 'wavelength_grids') else: url_dir = os.path.join(url_base, res_str, metal_str) # Generate file name fname = BOSZ_filename_2024(Teff, log_g, metallicity, res, **kwargs) # Final URL url_final = os.path.join(url_dir, fname) # Make request _log.info(f'Downloading file: {fname}') req = requests.get(url_final, allow_redirects=True) # Raise exception if file not found or other HTTP error if req.status_code != requests.codes.ok: req.raise_for_status() # Save file to directory outpath = os.path.join(outdir, fname) _log.info(f'Saving file to: {outpath}') open(outpath, 'wb').write(req.content)
[docs] def BOSZ_2024_spectrum(Teff, log_g, metallicity, res=2000, interpolate=True, fluxout='photlam', **kwargs): """BOSZ stellar atmospheres (Mészáros et al. 2024). Read in a spectrum from the BOSZ stellar atmosphere models database. Returns a synphot spectral object. Wavelength values range between 1000 Angstroms to 32 microns. Teff range from 3500K to 36000K. This function interpolates the model grid by reading in those models closest in temperature, metallicity, and log g to the desired parameters, then takes the weighted average of these models based on their relative offsets. Can also just read in the closest model by setting interpolate=False. Different spectral resolutions can also be specified. Parameters ---------- Teff : float Effective temperature ranging from 3500K to 30000K. metallicity : float Metallicity [Fe/H] value ranging from -2.5 to 0.5. log_g : float Surface gravity (log g) from 0 to 5. Keyword Args ------------ carbon : float Carbon abundance [C/M]. Must be either [-0.75,-0.5,-0.25, 0, 0.25, 0.5]. alpha : float alpha-element value [alpha/H]. Must be either [-0.25, 0, 0.25, 0.5] res : str Spectral resolution to use (instrument broadening). Valid points: [200, 500, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 300000] interpolate : bool Interpolate spectrum using a weighted average of grid points surrounding the desired input parameters. References ---------- https://archive.stsci.edu/prepds/bosz/ """ model_dir = os.path.join(_spec_dir, 'bosz2024_grids/') res_dir = os.path.join(model_dir, f'R{res:.0f}/') if not os.path.isdir(_spec_dir): raise IOError(f'Spectral directory does not exist: {_spec_dir}') if not os.path.isdir(res_dir): os.makedirs(res_dir) # raise IOError('Resolution directory does not exist: {}'.format(res_dir)) # Grid of computed temperature steps teff_grid = list(range(2800,4000,100)) \ + list(range(4000,12000,250)) \ + list(range(12000,16500,500)) teff_grid = np.array(teff_grid) # Grid of log g steps for desired Teff lg_step = 0.5 if Teff <= 4000: lg_min, lg_max = (-0.5, 5.5) elif Teff <= 4650: lg_min, lg_max = (-0.5, 5.0) elif Teff <= 5750: lg_min, lg_max = ( 0.0, 5.5) elif Teff <= 7000: lg_min, lg_max = ( 1.0, 5.5) elif Teff <= 8000: lg_min, lg_max = ( 2.0, 5.5) elif Teff <= 12000: lg_min, lg_max = ( 2.0, 5.0) elif Teff <= 16000: lg_min, lg_max = ( 3.0, 5.0) else: raise ValueError('Teff must be less than or equal to 16000K.') if log_g<lg_min: raise ValueError(f'log_g must be >= {lg_min:+.1f}') if log_g>lg_max: raise ValueError(f'log_g must be <= {lg_max:+.1f}') # Grid of log g values logg_grid = np.arange(lg_min, lg_max+lg_step, lg_step) # Grid of metallicity values metal_grid = np.arange(-2.5,0.75,0.25) # First, choose the two grid points closest in Teff teff_diff = np.abs(teff_grid - Teff) ind_sort = np.argsort(teff_diff) if teff_diff[ind_sort[0]]==0: # Exact teff_best = np.array([teff_grid[ind_sort[0]]]) else: # Want to interpolate teff_best = teff_grid[ind_sort[0:2]] # Choose the two best log g values logg_diff = np.abs(logg_grid - log_g) ind_sort = np.argsort(logg_diff) if logg_diff[ind_sort[0]]==0: # Exact logg_best = np.array([logg_grid[ind_sort[0]]]) else: # Want to interpolate logg_best = logg_grid[ind_sort[0:2]] # Choose the two best metallicity values metal_diff = np.abs(metal_grid - metallicity) ind_sort = np.argsort(metal_diff) if metal_diff[ind_sort[0]]==0: # Exact metal_best = np.array([metal_grid[ind_sort[0]]]) else: # Want to interpolate metal_best = metal_grid[ind_sort[0:2]] # Get wavelength file if res != 'orig': fname_wave = BOSZ_filename_2024(Teff, log_g, metallicity, res, prod='wave') fpath_wave = os.path.join(res_dir, fname_wave) if not os.path.isfile(fpath_wave): download_BOSZ_2024(Teff, log_g, metallicity, res, prod='wave', outdir=res_dir) tbl_wave = ascii.read(fpath_wave, names=['Wavelength'], format='basic') colnames = ['SpecificIntensity', 'Continuum'] else: colnames = ['Wavelength', 'SpecificIntensity', 'Continuum'] # Build final file names fnames = [] # Build lists of properties to pass to download function if needed teff_all = [] logg_all = [] metal_all = [] for t in teff_best: for l in logg_best: for m in metal_best: fname = BOSZ_filename_2024(t, l, m, res, **kwargs) fnames.append(fname) teff_all.append(t) logg_all.append(l) metal_all.append(m) teff_all = np.array(teff_all) logg_all = np.array(logg_all) metal_all = np.array(metal_all) # Weight by relative distance from desired value weights = [] teff_diff = np.abs(teff_best - Teff) logg_diff = np.abs(logg_best - log_g) metal_diff = np.abs(metal_best - metallicity) for t in teff_diff: wt = 1 if len(teff_diff)==1 else t / np.sum(teff_diff) for l in logg_diff: wl = 1 if len(logg_diff)==1 else l / np.sum(logg_diff) for m in metal_diff: wm = 1 if len(metal_diff)==1 else m / np.sum(metal_diff) weights.append(wt*wl*wm) weights = np.array(weights) weights = weights / np.sum(weights) if interpolate: wave_all = [] flux_all = [] for i, f in enumerate(fnames): # Download files that don't currently exist fpath = os.path.join(res_dir, f) if not os.path.isfile(fpath): download_BOSZ_2024(teff_all[i], logg_all[i], metal_all[i], res, outdir=res_dir, **kwargs) tbl = ascii.read(fpath, names=colnames, format='basic') if res == 'orig': wave_all.append(tbl['Wavelength']) flux_all.append(tbl['SpecificIntensity'] * weights[i]) wfin = wave_all[0] if len(wave_all)>0 else tbl_wave['Wavelength'] ffin = 4 * np.pi * np.array(flux_all).sum(axis=0) # erg/s/cm^2/A else: ind = np.where(weights==weights.max())[0][0] f = fnames[ind] Teff = teff_all[ind] log_g = logg_all[ind] metallicity = metal_all[ind] # Download files if doesn't exist fpath = os.path.join(res_dir, f) if not os.path.isfile(fpath): download_BOSZ_2024(Teff, log_g, metallicity, res, outdir=res_dir, **kwargs) tbl = ascii.read(fpath, names=colnames, format='basic') wfin = tbl['Wavelength'] if res == 'orig' else tbl_wave['Wavelength'] ffin = 4 * np.pi * tbl['SpecificIntensity'] # erg/s/cm^2/A # d = fits.getdata(fpath, 1) # wfin = d['Wavelength'] # ffin = np.pi * d['SpecificIntensity'] # erg/s/cm^2/A name = 'BOSZ(Teff={},z={},logG={})'.format(Teff, metallicity, log_g) sp = s_ext.ArraySpectrum(wave=wfin[:-1], flux=ffin[:-1], waveunits='angstrom', fluxunits='flam', name=name) sp.convert(fluxout) return sp
# TABLE 2: Suggested models for specific stellar types # Type T_{eff} log_g Kurucz model # O3V 44852 +3.92 ckp00_45000[g45] # O5V 40862 +3.92 ckp00_41000[g45] # O5.5V 39865 +3.92 ckp00_40000[g40] # O6V 38867 +3.92 ckp00_39000[g40] # O6.5V 37870 +3.92 ckp00_38000[g40] # O7V 36872 +3.92 ckp00_37000[g40] # O7.5V 35874 +3.92 ckp00_36000[g40] # O8V 34877 +3.92 ckp00_35000[g40] # O8.5 33879 +3.92 ckp00_34000[g40] # O9V 32882 +3.92 ckp00_33000[g40] # O9.5 31884 +3.92 ckp00_32000[g40] # B0V 30000 +3.90 ckp00_30000[g40] # B1V 25400 +3.90 ckp00_25000[g40] # B3V 18700 +3.94 ckp00_19000[g40] # B5V 15400 +4.04 ckp00_15000[g40] # B8V 11900 +4.04 ckp00_12000[g40] # A0V 9520 +4.14 ckp00_9500[g40] # A1V 9230 +4.10 ckp00_9250[g40] # A3V 8270 +4.20 ckp00_8750[g40] # A5V 8200 +4.29 ckp00_8250[g40] # F0V 7200 +4.34 ckp00_7250[g40] # F2V 6890 +4.34 ckp00_7000[g40] # F5V 6440 +4.34 ckp00_6500[g40] # F8V 6200 +4.40 ckp00_6250[g40] # G0V 6030 +4.39 ckp00_6000[g45] # G2V 5860 +4.40 ckp00_5750[g45] # G8V 5570 +4.50 ckp00_5500[g45] # K0V 5250 +4.49 ckp00_5250[g45] # K2V 4780 +4.5 ckp00_4750[g45] # K4V 4560 +4.5 ckp00_4500[g45] # K5V 4350 +4.54 ckp00_4250[g45] # K7V 4060 +4.5 ckp00_4000[g45] # M0V 3850 +4.59 ckp00_3750[g45] # M2V 3580 +4.64 ckp00_3500[g45] # M6V 3050 +5.00 ckp00_3500[g50] # B0III 29000 +3.34 ckp00_29000[g35] # B5III 15000 +3.49 ckp00_15000[g35] # G0III 5850 +2.94 ckp00_5750[g30] # G5III 5150 +2.54 ckp00_5250[g25] # K0III 4750 +2.14 ckp00_4750[g20] # K5III 3950 +1.74 ckp00_4000[g15] # M0III 3800 +1.34 ckp00_3750[g15] # BOI 26000 +2.84 ckp00_26000[g30] # B5I 13600 +2.44 ckp00_14000[g25] # AOI 9730 +2.14 ckp00_9750[g20] # A5I 8510 +2.04 ckp00_8500[g20] # F0I 7700 +1.74 ckp00_7750[g20] # F5I 6900 +1.44 ckp00_7000[g15] # G0I 5550 +1.34 ckp00_5500[g10] # G5I 4850 +1.14 ckp00_4750[g10] # K0I 4420 +0.94 ckp00_4500[g10] # K5I 3850 +0.00 ckp00_3750[g00] # M0I 3650 -0.10 ckp00_3750[g00] # M2I 3600 -0.10 ckp00_3500[g00]
[docs] def stellar_spectrum(sptype, *renorm_args, **kwargs): """Stellar spectrum Similar to specFromSpectralType() in STPSF/Poppy, this function uses a dictionary of fiducial values to determine an appropriate spectral model. If the input spectral type is not found, this function interpolates the effective temperature, metallicity, and log g values . You can also specify renormalization arguments to pass to ``sp.renorm()``. The order (after ``sptype``) should be (``value, units, bandpass``): >>> sp = stellar_spectrum('G2V', 10, 'vegamag', bp) Flat spectrum (in photlam) are also allowed via the 'flat' string. Use ``catname='bosz'`` for BOSZ stellar atmosphere (ATLAS9) (default) Use ``catname='ck04models'`` keyword for ck04 models Use ``catname='phoenix'`` keyword for Phoenix models Keywords exist to directly specify Teff, metallicity, an log_g rather than a spectral type. Parameters ---------- sptype : str Spectral type, such as 'A0V' or 'K2III'. renorm_args : tuple Renormalization arguments to pass to ``sp.renorm()``. The order (after ``sptype``) should be (``value, units, bandpass``) Bandpass should be a :class:`s_ext.Bandpass` type. Keyword Args ------------ catname : str Catalog name, including 'bosz', 'ck04models', and 'phoenix'. Default is 'bosz', which comes from :func:`BOSZ_spectrum`. Teff : float Effective temperature ranging from 2800K to 35000K. metallicity : float Metallicity [Fe/H] value ranging from -2.5 to 0.5. log_g : float Surface gravity (log g) from 0 to 5. res : str BOSZ spectral resolution to use (200 or 2000 or 20000). Default: 2000. interpolate : bool Interpolate BOSZ spectrum using a weighted average of grid points surrounding the desired input parameters. Default: True """ def call_bosz(v0, v1, v2, use_2024=True, **kwargs): # Use earlier version if Teff > 16000 if v0 > 16000 and use_2024: use_2024=False if use_2024: if v0 < 2800: v0 = 2800 _log.warning("BOSZ models start at 2800K. Setting Teff=2800.") return BOSZ_2024_spectrum(v0, v2, v1, **kwargs) else: if v0 > 35000: v0 = 35000 _log.warning("BOSZ models stop at 35000K. Setting Teff=35000.") if v0 < 3500: v0 = 3500 _log.warning("BOSZ models start at 3500K. Setting Teff=3500.") return BOSZ_spectrum(v0, v1, v2, **kwargs) Teff = kwargs.pop('Teff', None) metallicity = kwargs.pop('metallicity', None) log_g = kwargs.pop('log_g', None) catname = kwargs.get('catname', 'bosz') lookuptable = { # https://www.pas.rochester.edu/~emamajek/EEM_dwarf_UBVIJHK_colors_Teff.txt "O0V": (50000, 0.0, 4.0), # Bracketing for interpolation "O3V": (46000, 0.0, 4.0), "O5V": (43000, 0.0, 4.5), "O7V": (36500, 0.0, 4.0), "O9V": (32500, 0.0, 4.0), "B0V": (31500, 0.0, 4.0), "B1V": (26000, 0.0, 4.0), "B3V": (17000, 0.0, 4.0), "B5V": (15700, 0.0, 4.0), "B8V": (12500, 0.0, 4.0), "A0V": (9700, 0.0, 4.0), "A1V": (9200, 0.0, 4.0), "A3V": (8550, 0.0, 4.0), "A5V": (8080, 0.0, 4.0), "F0V": (7220, 0.0, 4.0), "F2V": (6810, 0.0, 4.0), "F5V": (6510, 0.0, 4.0), "F8V": (6170, 0.0, 4.5), "G0V": (5920, 0.0, 4.5), "G2V": (5770, 0.0, 4.5), "G5V": (5660, 0.0, 4.5), "G8V": (5490, 0.0, 4.5), "K0V": (5280, 0.0, 4.5), "K2V": (5040, 0.0, 4.5), "K5V": (4410, 0.0, 4.5), "K7V": (4070, 0.0, 4.5), "M0V": (3870, 0.0, 4.5), "M2V": (3550, 0.0, 4.5), "M5V": (3030, 0.0, 5.0), "M9V": (2400, 0.0, 5.0), # Bracketing for interpolation "O0IV": (50000, 0.0, 3.8), # Bracketing for interpolation "B0IV": (30000, 0.0, 3.8), "B8IV": (12000, 0.0, 3.8), "A0IV": (9500, 0.0, 3.8), "A5IV": (8250, 0.0, 3.8), "F0IV": (7250, 0.0, 3.8), "F8IV": (6250, 0.0, 4.3), "G0IV": (6000, 0.0, 4.3), "G8IV": (5500, 0.0, 4.3), "K0IV": (5250, 0.0, 4.3), "K7IV": (4000, 0.0, 4.3), "M0IV": (3750, 0.0, 4.3), "M9IV": (3000, 0.0, 4.7), # Bracketing for interpolation "O0III": (55000, 0.0, 3.5), # Bracketing for interpolation "B0III": (29000, 0.0, 3.5), "B5III": (15000, 0.0, 3.5), "G0III": (5750, 0.0, 3.0), "G5III": (5250, 0.0, 2.5), "K0III": (4750, 0.0, 2.0), "K5III": (4000, 0.0, 1.5), "M0III": (3750, 0.0, 1.5), "M6III": (3000, 0.0, 1.0), # Bracketing for interpolation "O0I": (45000, 0.0, 5.0), # Bracketing for interpolation "O6I": (39000, 0.0, 4.5), "O8I": (34000, 0.0, 4.0), "B0I": (26000, 0.0, 3.0), "B5I": (14000, 0.0, 3.0), "A0I": (9750, 0.0, 2.0), "A5I": (8500, 0.0, 2.0), "F0I": (7750, 0.0, 2.0), "F5I": (7000, 0.0, 1.5), "G0I": (5500, 0.0, 1.5), "G5I": (4750, 0.0, 1.0), "K0I": (4500, 0.0, 1.0), "K5I": (3750, 0.0, 0.5), "M0I": (3750, 0.0, 0.0), "M2I": (3500, 0.0, 0.0), "M5I": (3000, 0.0, 0.0), # Bracketing for interpolation } def sort_sptype(typestr): letter = typestr[0] lettervals = {'O': 0, 'B': 1, 'A': 2, 'F': 3, 'G': 4, 'K': 5, 'M': 6} value = lettervals[letter] * 1.0 value += (int(typestr[1]) * 0.1) if "III" in typestr: value += 30 elif "I" in typestr: value += 10 elif "V" in typestr: value += 50 return value # Generate list of spectral types sptype_list = list(lookuptable.keys()) # Test if the user wants a flat spectrum (in photlam) # Check if Teff, metallicity, and log_g are specified if (Teff is not None) and (metallicity is not None) and (log_g is not None): v0, v1, v2 = (Teff, metallicity, log_g) if 'bosz' in catname.lower(): sp = call_bosz(v0,v1,v2,**kwargs) else: if ('ck04models' in catname.lower()) and (v0<3500): _log.warning("ck04 models stop at 3500K. Setting Teff=3500.") v0 = 3500 sp = s_ext.Icat(catname, v0, v1, v2) sp.name = '({:.0f},{:0.1f},{:0.1f})'.format(v0,v1,v2) elif 'flat' in sptype.lower(): # waveset = s_ext.refs._default_waveset # sp = s_ext.ArraySpectrum(waveset, 0*waveset + 10.) sp = s_ext.FlatSpectrum(10, fluxunits='photlam') sp.name = 'Flat spectrum in photlam' elif sptype in sptype_list: v0, v1, v2 = lookuptable[sptype] if 'bosz' in catname.lower(): sp = call_bosz(v0,v1,v2,**kwargs) else: if ('ck04models' in catname.lower()) and (v0<3500): _log.warning("ck04 models start at 3500K. Setting Teff=3500.") v0 = 3500 sp = s_ext.Icat(catname, v0, v1, v2) sp.name = sptype else: # Interpolate values for undefined sptype # Sort the list and return their rank values sptype_list.sort(key=sort_sptype) rank_list = np.array([sort_sptype(st) for st in sptype_list]) # Find the rank of the input spec type rank = sort_sptype(sptype) # Grab values from tuples and interpolate based on rank tup_list0 = np.array([lookuptable[st][0] for st in sptype_list]) tup_list1 = np.array([lookuptable[st][1] for st in sptype_list]) tup_list2 = np.array([lookuptable[st][2] for st in sptype_list]) v0 = np.interp(rank, rank_list, tup_list0) v1 = np.interp(rank, rank_list, tup_list1) v2 = np.interp(rank, rank_list, tup_list2) if 'bosz' in catname.lower(): sp = call_bosz(v0,v1,v2,**kwargs) else: if ('ck04models' in catname.lower()) and (v0<3500): _log.warning("ck04 models stop at 3500K. Setting Teff=3500.") v0 = 3500 sp = s_ext.Icat(catname, v0, v1, v2) sp.name = sptype #print(int(v0),v1,v2) # Renormalize if renorm_args exist if len(renorm_args) > 0: sp_norm = sp.renorm(*renorm_args) sp_norm.name = sp.name sp = sp_norm return sp
[docs] def download_votable(target_name, radius=1, **kwargs): """Download a VOTable from the Vizier archive Reads from Vizier URL: f'https://vizier.cds.unistra.fr/viz-bin/sed?-c={target}&-c.rs={radius}' Parameters ---------- target_name : str Name of target to search for in Vizier. Will replace spaces with '+'. radius : float Search radius in arcseconds. Returns ------- astropy.table.Table """ from astropy.table import Table target = target_name.replace(' ' ,'+') tbl = Table.read(f"https://vizier.cds.unistra.fr/viz-bin/sed?-c={target}&-c.rs={radius}") return tbl
# Class for creating an input source spectrum
[docs] class source_spectrum(object): """Model source spectrum The class ingests spectral information of a given target and generates :class:`s_ext.Spectrum` model fit to the known photometric SED. Two model routines can fit. The first is a very simple scale factor that is applied to the input spectrum, while the second takes the input spectrum and adds an IR excess modeled as a modified blackbody function. Parameters ---------- name : string Source name. sptype : string Assumed stellar spectral type. Not relevant if Teff, metallicity, and log_g are specified. mag_val : float Magnitude of input bandpass for initial scaling of spectrum. bp : :class:`s_ext.Bandpass` Bandpass to apply initial mag_val scaling. votable_input: string VOTable name that holds the source's photometry. The user can find the relevant data at http://vizier.u-strasbg.fr/vizier/sed/ and click download data. Or astropy Table with column 'sed_freq', 'sed_flux', 'sed_eflux'. Keyword Args ------------ Teff : float Effective temperature ranging from 3500K to 30000K. metallicity : float Metallicity [Fe/H] value ranging from -2.5 to 0.5. log_g : float Surface gravity (log g) from 0 to 5. Av : float Add extinction to the stellar spectrum catname : str Catalog name, including 'bosz', 'ck04models', and 'phoenix'. Default is 'bosz', which comes from :func:`BOSZ_spectrum`. res : str Spectral resolution to use (200 or 2000 or 20000). interpolate : bool Interpolate spectrum using a weighted average of grid points surrounding the desired input parameters. Default: True radius : float Search radius in arcseconds for Vizier query. Default: 1 arcsec. Example ------- Generate a source spectrum and fit photometric data >>> import webbpsf_ext >>> from webbpsf_ext.spectra import source_spectrum >>> >>> name = 'HR8799' >>> vot = 'votables/{}.vot'.format(name) >>> bp_k = webbpsf_ext.bp_2mass('k') >>> >>> # Read in stellar spectrum model and normalize to Ks = 5.24 >>> src = source_spectrum(name, 'F0V', 5.24, bp_k, vot, >>> Teff=7430, metallicity=-0.47, log_g=4.35) >>> # Fit model to photometry from 0.1 - 30 microns >>> # Saves synphot spectral object at src.sp_model >>> src.fit_SED(wlim=[0.1,30]) >>> sp_sci = src.sp_model """
[docs] def __init__(self, name, sptype, mag_val, bp, votable_input, Teff=None, metallicity=None, log_g=None, Av=None, **kwargs): self.name = name # Setup initial spectrum kwargs['Teff'] = Teff kwargs['metallicity'] = metallicity kwargs['log_g'] = log_g self.sp0 = stellar_spectrum(sptype, mag_val, 'vegamag', bp, **kwargs) # Read in a low res version for photometry matching kwargs['res'] = 200 if not kwargs.get('use_2024', True) else 500 self.sp_lowres = stellar_spectrum(sptype, mag_val, 'vegamag', bp, **kwargs) if Av is not None: Rv = 4 self.sp0 = self.sp0 * s_ext.Extinction(Av/Rv,name='mwrv4') self.sp_lowres = self.sp_lowres * s_ext.Extinction(Av/Rv,name='mwrv4') self.sp0 = self.sp0.renorm(mag_val, 'vegamag', bp) self.sp_lowres = self.sp_lowres.renorm(mag_val, 'vegamag', bp) self.sp0.name = sptype self.sp_lowres.name = sptype # Init model to None self.sp_model = None # Backwards compatible with votable_file if kwargs.get('votable_file') is not None: self.votable_input = kwargs.get('votable_file') else: self.votable_input = votable_input # Read in photometry self._check_file(**kwargs) self._gen_table() self._combine_fluxes()
def _check_file(self, **kwargs): """Check if VOTable exists. If not, download from Vizier.""" # Check if votable_input is a string is_str = isinstance(self.votable_input, str) # If VOTable does not exist, download from Vizier. # Stores input as astropy table. if is_str and (not os.path.exists(self.votable_input)): _log.warning('VOTable does not exist. Downloading from Vizier...') tbl = download_votable(self.name, **kwargs) # Save VOTable to file save_name = self.votable_input _log.info('Writing VOTable to {}'.format(save_name)) tbl.write(save_name, format='votable') self.votable_input = tbl def _gen_table(self): """Read VOTable and convert to astropy table""" if isinstance(self.votable_input, str): # Import source SED from VOTable from astropy.io.votable import parse_single_table table = parse_single_table(self.votable_input) # Convert to astropy table tbl = table.to_table() else: # Otherwise assume already astropy Table tbl = self.votable_input # Convert from frequency to wavelength freq = tbl['sed_freq'] wave_A = freq.to(u.Angstrom, equivalencies=u.spectral()) # Add wavelength column col = tbl.Column(wave_A, 'sed_wave') tbl.add_column(col) # Sort flux monotomically with wavelength tbl.sort(['sed_wave', 'sed_flux']) self.table = tbl def _combine_fluxes(self): """Average duplicate data points Creates average of duplicate points stored in self.sp_phot. """ table = self.table wave = table['sed_wave'] flux = table["sed_flux"] eflux = table["sed_eflux"] # Average duplicate data points uwave, ucnt = np.unique(wave, return_counts=True) uflux = [] uflux_e = [] for i, w in enumerate(uwave): ind = (wave==w) flx = np.median(flux[ind]) if ucnt[i]>1 else flux[ind][0] uflux.append(flx) eflx = medabsdev(flux[ind]) if ucnt[i]>1 else eflux[ind][0] uflux_e.append(eflx) uflux = np.array(uflux) uflux_e = np.array(uflux_e) # Check if uwave and uflux are astropy units if isinstance(uwave, table.Column): uwave = uwave.value if isinstance(uflux, table.Column): uflux = uflux.value if isinstance(uflux, table.Column): uflux_e = uflux_e.value # Photometric data points sp_phot = s_ext.ArraySpectrum(uwave, uflux, waveunits=wave.unit.name, fluxunits=flux.unit.name) sp_phot.convert(self.sp_lowres.waveunits) sp_phot.convert(self.sp_lowres.fluxunits) sp_phot_e = s_ext.ArraySpectrum(uwave, uflux_e, waveunits=wave.unit.name, fluxunits=eflux.unit.name) sp_phot_e.convert(self.sp_lowres.waveunits) sp_phot_e.convert(self.sp_lowres.fluxunits) self.sp_phot = sp_phot self.sp_phot_e = sp_phot_e
[docs] def bb_jy(self, wave, T): """Blackbody function (Jy) For a given wavelength set (in um) and a Temperature (K), return the blackbody curve in units of Jy. Parameters ---------- wave : array_like Wavelength array in microns T : float Temperature of blackbody (K) """ # Physical Constants #H = 6.62620000E-27 # Planck's constant in cgs units HS = 6.62620000E-34 # Planck's constant in standard units C = 2.99792458E+08 # speed of light in standard units K = 1.38064852E-23 # Boltzmann constant in standard units # Blackbody coefficients (SI units) C1 = 2.0 * HS * C # Power * unit area / steradian C2 = HS * C / K w_m = wave * 1e-6 exponent = C2 / (w_m * T) expfactor = np.exp(exponent) return 1.0E+26 * C1 * (w_m**-3.0) / (expfactor - 1.0)
[docs] def model_scale(self, x, sp=None): """Simple model to scale stellar spectrum""" sp = self.sp_lowres if sp is None else sp return x[0] * sp
[docs] def model_IRexcess(self, x, sp=None): """Model for stellar spectrum with IR excess Model of a stellar spectrum plus IR excess, where the excess is a modified blackbody. The final model follows the form: .. math:: x_0 BB(\lambda, x_1) \lambda^{x_2} """ sp = self.sp_lowres if sp is None else sp wave = sp.wave bb_flux = x[0] * self.bb_jy(wave/1e4, x[1]) * (wave/1e4)**x[2] / 1e17 sp_bb = s_ext.ArraySpectrum(wave, bb_flux, fluxunits='Jy') sp_bb.convert(sp.fluxunits) return sp + sp_bb
[docs] def func_resid(self, x, IR_excess=False, wlim=[0.1, 30], use_err=True): """Calculate model residuals Parameters ---------- x : array_like Model parameters for either `model_scale` or `model_IRexcess`. See these two functions for more details. IR_excess: bool Include IR excess in model fit? This is a simple modified blackbody. wlim : array_like Min and max limits for wavelengths to consider (microns). use_err : bool Should we use the uncertainties in the SED photometry for weighting? """ # Star model and photometric data sp_star = self.sp_lowres sp_phot = self.sp_phot sp_phot_e = self.sp_phot_e # Which model are we using? func_model = self.model_IRexcess if IR_excess else self.model_scale sp_model = func_model(x, sp_star) wvals = sp_phot.wave wmin, wmax = np.array(wlim)*1e4 ind = (wvals >= wmin) & (wvals <= wmax) wvals = wvals[ind] yvals = sp_phot.flux[ind] evals = sp_phot_e.flux[ind] # Instead of interpolating on a high-resolution grid, # we should really rebin onto a more coarse grid. mod_interp = np.interp(wvals, sp_star.wave, sp_model.flux) # Normalize values so the residuals aren't super small/large norm = np.nanmedian(yvals) resid = (mod_interp - yvals) if use_err: resid /= evals # Return non-NaN normalized values return resid[~np.isnan(resid)] / norm
[docs] def fit_SED(self, x0=None, robust=True, use_err=True, IR_excess=False, wlim=[0.3,10], verbose=True): """Fit a model function to photometry Use :func:`scipy.optimize.least_squares` to find the best fit model to the observed photometric data. If no parameters passed, then defaults are set. Keyword Args ------------ x0 : array_like Initial guess of independent variables. robust : bool Perform an outlier-resistant fit. use_err : bool Should we use the uncertainties in the SED photometry for weighting? IR_excess: bool Include IR excess in model fit? This is a simple modified blackbody. wlim : array_like Min and max limits for wavelengths to consider (microns). verbose : bool Print out best-fit model parameters. Defalt is True. """ from scipy.optimize import least_squares # Default initial starting parameters if x0 is None: x0 = [1.0, 2000.0, 0.5] if IR_excess else [1.0] # Robust fit? loss = 'soft_l1' if robust else 'linear' # Perform least-squares fit kwargs={'IR_excess':IR_excess, 'wlim':wlim, 'use_err':use_err} res = least_squares(self.func_resid, x0, bounds=(0,np.inf), loss=loss, kwargs=kwargs) out = res.x if verbose: print('SED best-fit params:', out) # Which model are we using? func_model = self.model_IRexcess if IR_excess else self.model_scale # Create final model spectrum sp_model = func_model(out, self.sp0) sp_model.name = self.name self.sp_model = sp_model
@plt.style.context('webbpsf_ext.wext_style') def plot_SED(self, ax=None, return_figax=False, xr=[0.3,30], yr=None, units='Jy', **kwargs): sp0 = self.sp0 sp_phot = self.sp_phot sp_phot_e = self.sp_phot_e sp_model = self.sp_model # Convert to Jy and save original units sp0_units = sp0.fluxunits.name sp_phot_units = sp_phot.fluxunits.name # nuFnu or lamFlam? if (units=='nufnu') or (units=='lamflam'): units = 'flam' lfl = True else: lfl = False sp0.convert(units) sp_phot.convert(units) if ax is None: fig, ax = plt.subplots(1,1, figsize=(8,5)) w = sp0.wave / 1e4 f = sp0.flux if lfl: f = f * sp0.wave if xr is not None: ind = (w>=xr[0]) & (w<=xr[1]) w, f = (w[ind], f[ind]) ax.loglog(w, f, lw=1, label='Photosphere', **kwargs) w = sp_phot.wave / 1e4 f = sp_phot.flux f_err = sp_phot_e.flux if lfl: f = f * sp_phot.wave f_err = f_err * sp_phot.wave if xr is not None: ind = (w>=xr[0]) & (w<=xr[1]) w, f, f_err = (w[ind], f[ind], f_err[ind]) ax.errorbar(w, f, yerr=f_err, marker='.', ls='none', label='Photometry') if sp_model is not None: sp_model_units = sp_model.fluxunits.name sp_model.convert(units) w = sp_model.wave / 1e4 f = sp_model.flux if lfl: f = f * sp_model.wave if xr is not None: ind = (w>=xr[0]) & (w<=xr[1]) w, f = (w[ind], f[ind]) ax.plot(w, f, lw=1, label='Model Fit') sp_model.convert(sp_model_units) # Labels for various units ulabels = {'photlam': u'photons s$^{-1}$ cm$^{-2}$ A$^{-1}$', 'photnu' : u'photons s$^{-1}$ cm$^{-2}$ Hz$^{-1}$', 'flam' : u'erg s$^{-1}$ cm$^{-2}$ A$^{-1}$', 'fnu' : u'erg s$^{-1}$ cm$^{-2}$ Hz$^{-1}$', 'counts' : u'photons s$^{-1}$', } if lfl: # Special case nuFnu or lamFlam yunits = u'erg s$^{-1}$ cm$^{-2}$' else: yunits = ulabels.get(units, units) ax.set_xlabel('Wavelength (microns)') ax.set_ylabel('Flux ({})'.format(yunits)) ax.set_title(self.name) if xr is not None: ax.set_xlim(xr) if yr is not None: ax.set_ylim(yr) # Better formatting of ticks marks from matplotlib.ticker import LogLocator, AutoLocator, NullLocator from matplotlib.ticker import FuncFormatter, NullFormatter formatter = FuncFormatter(lambda y, _: '{:.16g}'.format(y)) xr = ax.get_xlim() if xr[1] < 10*xr[0]: ax.xaxis.set_major_locator(AutoLocator()) ax.xaxis.set_minor_locator(NullLocator()) else: ax.xaxis.set_major_locator(LogLocator()) ax.xaxis.set_major_formatter(formatter) yr = ax.get_ylim() if yr[1] < 10*yr[0]: ax.yaxis.set_major_locator(AutoLocator()) ax.yaxis.set_minor_formatter(NullFormatter()) ax.yaxis.get_major_locator().set_params(nbins=10, steps=[1,10]) else: ax.yaxis.set_major_locator(LogLocator()) ax.yaxis.set_major_formatter(formatter) ax.legend() # Convert back to original units sp0.convert(sp0_units) sp_phot.convert(sp_phot_units) if ax is None: fig.tight_layout() if return_figax: return (fig,ax)
# Class for reading in planet spectra
[docs] class planets_sb12(object): """Exoplanet spectrum from Spiegel & Burrows (2012) This contains 1680 files, one for each of 4 atmosphere types, each of 15 masses, and each of 28 ages. Wavelength range of 0.8 - 15.0 um at moderate resolution (R ~ 204). The flux in the source files are at 10 pc. If the distance is specified, then the flux will be scaled accordingly. This is also true if the distance is changed by the user. All other properties (atmo, mass, age, entropy) are not adjustable once loaded. Parameters ---------- atmo: str A string consisting of one of four atmosphere types: - 'hy1s' = hybrid clouds, solar abundances - 'hy3s' = hybrid clouds, 3x solar abundances - 'cf1s' = cloud-free, solar abundances - 'cf3s' = cloud-free, 3x solar abundances mass: float A number 1 to 15 Jupiter masses (increments of 1MJup). age: float Age in millions of years (1-1000) entropy: float Initial entropy (8.0-13.0) in increments of 0.25 distance: float Assumed distance in pc (default is 10pc) accr : bool Include accretion (default: False)? mmdot : float From Zhu et al. (2015), the Mjup^2/yr value. If set to None then calculated from age and mass. mdot : float Or use mdot (Mjup/yr) instead of mmdot. accr_rin : float Inner radius of accretion disk (units of RJup; default: 2) truncated: bool Full disk or truncated (ie., MRI; default: False)? base_dir: str, None Location of atmospheric model sub-directories. """ # Define default self.base_dir _base_dir = os.path.join(_spec_dir, 'spiegel/')
[docs] def __init__(self, atmo='hy1s', mass=1, age=100, entropy=10.0, distance=10, accr=False, mmdot=None, mdot=None, accr_rin=2.0, truncated=False, base_dir=None, **kwargs): self._atmo = atmo self._mass = mass self._age = age self._entropy = entropy # Directory of atmospheric files if base_dir is not None: self._base_dir = base_dir # Download and extract tar.gz file if directory does not exist if not os.path.isdir(self.sub_dir): tar_filename = f'SB.{self.atmo}.tar.gz' self._download_tar(extract=True, remove=True, tar_filename=tar_filename) # Find and read appropriate file self._get_file() self._read_file() self.distance = distance self.accr = accr if not accr: self.mmdot = 0 elif mmdot is not None: self.mmdot = mmdot elif mdot is not None: self.mmdot = self.mass * mdot # MJup^2/yr else: mdot = self.mass / (1e6 * self.age) # Assumed MJup/yr self.mmdot = self.mass * mdot # MJup^2/yr self.rin = accr_rin self.truncated = truncated
def _download_tar(self, extract=True, remove=True, tar_filename=None): """Download and extract tar file""" import requests if tar_filename is None: tar_filename = f'SB.{self.atmo}.tar.gz' tar_path = os.path.join(self._base_dir, tar_filename) # check if tar file already exists if not os.path.exists(tar_path): # URL location url_base = 'http://mips.as.arizona.edu/~jleisenring/spiegel/' url_path = os.path.join(url_base, tar_filename) # Make request _log.info(f'Downloading file: {tar_filename}') req = requests.get(url_path, allow_redirects=True) # Raise exception if file not found or other HTTP error if req.status_code != requests.codes.ok: req.raise_for_status() # Save file to directory _log.info(f'Saving file to: {tar_path}') open(tar_path, 'wb').write(req.content) if extract: self._extract_dir(tar_path=tar_path, remove=remove) def _extract_dir(self, tar_path=None, remove=False): """Check if directory exists or is still .tar.gz""" import tarfile if tar_path is None: tar_path = os.path.join(self._base_dir, f'SB.{self.atmo}.tar.gz') if not os.path.isdir(self.sub_dir): tar = tarfile.open(tar_path, "r:gz") tar.extractall(self._base_dir) tar.close() # Remove tar file if remove: try: os.remove(tar_path) except FileNotFoundError: pass def _get_file(self): """Find the file closest to the input parameters""" files = []; masses = []; ages = [] for file in os.listdir(self.sub_dir): files.append(file) fsplit = re.split('[_\.]',file) ind_mass = fsplit.index('mass') + 1 ind_age = fsplit.index('age') + 1 masses.append(int(fsplit[ind_mass])) ages.append(int(fsplit[ind_age])) files = np.array(files) ages = np.array(ages) masses = np.array(masses) # Find those indices closest in mass mdiff = np.abs(masses - self.mass) ind_mass = mdiff == np.min(mdiff) # Of those masses, find the closest age adiff = np.abs(ages - self.age) ind_age = adiff[ind_mass] == np.min(adiff[ind_mass]) # Get the final file name self.file = ((files[ind_mass])[ind_age])[0] # Update attributes self._mass = masses[ind_mass][0] self._age = ages[ind_mass][ind_age][0] def _read_file(self): """Read in the file data""" # Read in the file's content row-by-row (saved as a string) file_path = os.path.join(self.sub_dir, self.file) with open(file_path) as f: content = f.readlines() content = [x.strip('\n') for x in content] # Parse the strings into an array # Row #, Value # 1 col 1: age (Myr); # cols 2-601: wavelength (in microns, in range 0.8-15.0) # 2-end col 1: initial S; # cols 2-601: F_nu (in mJy for a source at 10 pc) ncol = len(content[0].split()) nrow = len(content) arr = np.zeros([nrow,ncol]) for i,row in enumerate(content): arr[i,:] = np.array(row.split(), dtype='float64') # Find the closest entropy and save entropy = arr[1:,0] diff = np.abs(self.entropy - entropy) ind = diff == np.min(diff) # Update entropy attribute self._entropy = entropy[ind][0] # Get Fluxes self._flux = arr[1:,1:][ind,:].flatten() self._fluxunits = 'mJy' # Save the wavelength information self._wave = arr[0,1:] self._waveunits = 'um' # Distance (10 pc) self._distance = 10.0 @property def sub_dir(self): return os.path.join(self._base_dir, f'SB.{self.atmo}') @property def mdot(self): """Accretion rate in MJup/yr""" return self.mmdot / self.mass @property def wave(self): """Wavelength of spectrum""" return self._wave @property def waveunits(self): """Wavelength units""" return self._waveunits @property def flux(self): """Spectral flux""" return self._flux @property def fluxunits(self): """Flux units""" return self._fluxunits @property def distance(self): """Assumed distance to source (pc)""" return self._distance @distance.setter def distance(self, value): self._flux = self._flux * (self._distance/value)**2 self._distance = value @property def atmo(self): """Atmosphere type """ return self._atmo @property def mass(self): """Mass of planet (MJup)""" return self._mass @property def age(self): """Age in millions of years""" return self._age @property def entropy(self): """Initial entropy (8.0-13.0)""" return self._entropy def export_pysynphot(self, **kwargs): _log.warning("export_pysynphot() is deprecated. Use export_synphot() instead.") return self.export_synphot(**kwargs)
[docs] def export_synphot(self, waveout='angstrom', fluxout=None): """Output to synphot spectrum object Parameters ---------- waveout : str Wavelength units for output fluxout : str Flux units for output """ w = self.wave; f = self.flux name = (re.split('[\.]', self.file))[0]#[5:] sp = s_ext.ArraySpectrum(w, f, name=name, waveunits=self.waveunits, fluxunits=self.fluxunits) sp.convert(waveout) if fluxout is not None: sp.convert(fluxout) if self.accr and (self.mmdot>0): sp_mdot = sp_accr(self.mmdot, rin=self.rin, dist=self.distance, truncated=self.truncated, waveout=waveout, fluxout=fluxout) # Interpolate accretion spectrum at each wavelength # and create new composite spectrum fnew = np.interp(sp.wave, sp_mdot.wave, sp_mdot.flux) sp_new = s_ext.ArraySpectrum(sp.wave, sp.flux+fnew, waveunits=waveout, fluxunits=fluxout) return sp_new else: return sp
[docs] def sp_accr(mmdot, rin=2, dist=10, truncated=False, waveout='angstrom', fluxout='photlam', base_dir=None): """Exoplanet accretion flux values (Zhu et al., 2015). Calculated the wavelength-dependent flux of an exoplanet accretion disk/shock from Zhu et al. (2015). Note ---- This function only uses the table of photometric values to calculate photometric brightness from a source, so not very useful for simulating spectral observations. Parameters ---------- mmdot : float Product of the exoplanet mass and mass accretion rate (MJup^2/yr). Values range from 1e-7 to 1e-2. rin : float Inner radius of accretion disk (units of RJup; default: 2). dist : float Distance to object (pc). truncated: bool If True, then the values are for a disk with Rout=50 RJup, otherwise, values were calculated for a full disk (Rout=1000 RJup). Accretion from a "tuncated disk" is due mainly to MRI. Luminosities for full and truncated disks are very similar. waveout : str Wavelength units for output fluxout : str Flux units for output base_dir: str, None Location of accretion model sub-directories. """ base_dir = _spec_dir if base_dir is None else base_dir fname = os.path.join(base_dir, 'zhu15_accr.txt') names = ('MMdot', 'Rin', 'Tmax', 'J', 'H', 'K', 'L', 'M', 'N', 'J2', 'H2', 'K2', 'L2', 'M2', 'N2') tbl = ascii.read(fname, guess=True, names=names) # Inner radius values and Mdot values rin_vals = np.unique(tbl['Rin']) mdot_vals = np.unique(tbl['MMdot']) nmdot = len(mdot_vals) assert (rin >=rin_vals.min()) & (rin <=rin_vals.max()), "rin is out of range" assert (mmdot>=mdot_vals.min()) & (mmdot<=mdot_vals.max()), "mmdot is out of range" if truncated: mag_names = ('J2', 'H2', 'K2', 'L2', 'M2', 'N2') else: mag_names = ('J', 'H', 'K', 'L', 'M', 'N') wcen = np.array([ 1.2, 1.6, 2.2, 3.8, 4.8, 10.0]) zpt = np.array([1600, 1020, 657, 252, 163, 39.8]) mag_arr = np.zeros([6,nmdot]) for i, mv in enumerate(mdot_vals): for j, mag in enumerate(mag_names): tbl_sub = tbl[tbl['MMdot']==mv] rinvals = tbl_sub['Rin'] magvals = tbl_sub[mag] mag_arr[j,i] = np.interp(rin, rinvals, magvals) mag_vals = np.zeros(6) for j in range(6): xi = 10**(mmdot) xp = 10**(mdot_vals) yp = 10**(mag_arr[j]) mag_vals[j] = np.log10(np.interp(xi, xp, yp)) mag_vals += 5*np.log10(dist/10) flux_Jy = 10**(-mag_vals/2.5) * zpt sp = s_ext.ArraySpectrum(wcen*1e4, flux_Jy, fluxunits='Jy') sp.convert(waveout) sp.convert(fluxout) return sp
[docs] def jupiter_spec(dist=10, waveout='angstrom', fluxout='photlam', base_dir=None): """Jupiter as an Exoplanet Read in theoretical Jupiter spectrum from Irwin et al. 2014 and output as a synphot spectrum. Parameters =========== dist : float Distance to Jupiter (pc). waveout : str Wavelength units for output. fluxout : str Flux units for output. base_dir: str, None Location of tabulated file irwin_2014_ref_spectra.txt. """ base_dir = os.path.join(_spec_dir, 'solar_system/') if base_dir is None else base_dir fname = os.path.join(base_dir, 'irwin_2014_ref_spectra.txt') # Column 1: Wavelength (in microns) # Column 2: 100*Ap/Astar (Earth-Sun Primary Transit) # Column 3: 100*Ap/Astar (Earth-Mdwarf Primary Transit) # Column 4: 100*Ap/Astar (Jupiter-Sun Primary Transit) # Column 5: Fp/Astar (Earth-Sun Secondary Eclipse) # Column 6: Disc-averaged radiance of Earth (W cm-2 sr-1 micron-1) # Column 7: Fp/Astar (Jupiter-Sun Secondary Eclipse) # Column 8: Disc-averaged radiance of Jupiter (W cm-2 sr-1 micron-1) # Column 9: Solar spectral irradiance spectrum (W micron-1) # (Solar Radius = 695500.0 km) # Column 10: Mdwarf spectral irradiance spectrum (W micron-1) # (Mdwarf Radius = 97995.0 km) data = ascii.read(fname, data_start=14) wspec = data['col1'] * 1e4 # Angstrom fspec = data['col8'] * 1e3 # erg s-1 cm^-2 A^-1 sr^-1 # Steradians to square arcsec sr_to_asec2 = (3600*180/np.pi)**2 fspec /= sr_to_asec2 # *** / arcsec^2 # Angular size of Jupiter at some distance RJup_km = 71492.0 au_to_km = 149597870.7 # Angular size (arcsec) of Jupiter radius RJup_asec = RJup_km / au_to_km / dist area = np.pi * RJup_asec**2 # flux in f_lambda fspec *= area # erg s-1 cm^-2 A^-1 sp = s_ext.ArraySpectrum(wspec, fspec, fluxunits='flam') sp.convert(waveout) sp.convert(fluxout) return sp
[docs] def linder_table(file=None, **kwargs): """Load Linder Model Table Function to read in isochrone models from Linder et al. 2019. Returns an astropy Table. Parameters ---------- file : string Location and name of Linder et al file. Default is ``BEX_evol_mags_-3_MH_0.00.dat``. """ # Default input directory indir = os.path.join(_spec_dir, 'linder', 'isochrones/') # Default file if file is None: file = 'BEX_evol_mags_-3_MH_0.00.dat' # First check if file is in indir file_path = os.path.join(indir, file) if not os.path.exists(file_path): # Check if file is in current directory file_path = file if not os.path.exists(file): raise ValueError(f"File {file_path} not found.") with open(file_path) as f: content = f.readlines() content = [x.strip('\n') for x in content] cnames = content[2].split(',') cnames = [name.split(':')[1] for name in cnames] ncol = len(cnames) content_arr = [] for line in content[4:]: arr = np.array(line.split()).astype(float) if len(arr)>0: content_arr.append(arr) content_arr = np.array(content_arr) # Convert to Astropy Table tbl = Table(rows=content_arr, names=cnames) return tbl
[docs] def linder_filter(table, filt, age, dist=10, cond_file=None, extrapolate=True, **kwargs): """Linder Mags vs Mass Arrays Given a Linder table, filter name, and age (Myr), return arrays of MJup and Vega mags. If distance (pc) is provided, then return the apparent magnitude, otherwise absolute magnitude at 10pc. This function takes the isochrones tables from Linder et al 2019 and creates a irregular contour grid of filter magnitude and log(age) where the z-axis is log(mass). This is mapped onto a regular grid that is interpolated within the data boundaries and linearly extrapolated outside of the region of available data. Parameters ========== table : astropy table Astropy table output from `linder_table`. filt : string Name of NIRCam filter. age : float Age in Myr of planet. Keyword Args ============= dist : float Distance in pc. Default is 10pc (abs mag). cond_file : string COND file to use for extrapolating to higher masses. extrapolate : bool If True, extrapolate to higher masses using COND models as well as lower masses using a lower order polynomial fit. """ from .maths import jl_poly, jl_poly_fit # In the event of underscores within name filt = filt.split('_')[0] try: x = table[filt] except KeyError: # In case specific filter doesn't exist, interpolate x = [] cnames = [ 'SPHEREY', 'NACOJ', 'NACOH', 'NACOKs', 'NACOLp', 'NACOMp', 'F115W', 'F150W', 'F200W', 'F277W', 'F356W', 'F444W', 'F560W', 'F770W', 'F1000W', 'F1280W', 'F1500W', 'F1800W', 'F2100W', 'F2550W', ] wvals = np.array([ 1.04, 1.27, 1.66, 2.20, 3.80, 4.80, 1.15, 1.50, 2.00, 2.76, 3.57, 4.41, 5.60, 7.67, 9.97, 12.84, 15.08, 17.99, 20.85, 25.27, ]) # Sort by wavelength isort = np.argsort(wvals) cnames = list(np.array(cnames)[isort]) wvals = wvals[isort] # Turn table data into array and interpolate at filter wavelength tbl_arr = np.array([table[cn].data for cn in cnames]).transpose() try: bp = nircam_filter(filt) except: bp = miri_filter(filt) wint = bp.avgwave().to_value('um') x = np.array([np.interp(wint, wvals, row) for row in tbl_arr]) y = table['log(Age/yr)'].data z = table['Mass/Mearth'].data zlog = np.log10(z) ####################################################### # Grab COND model data to fill in higher masses if extrapolate: base_dir = os.path.join(_spec_dir, 'cond_models/') if cond_file is None: cond_file = os.path.join(base_dir, 'model.AMES-Cond-2000.M-0.0.JWST.Vega') npsave_file = os.path.join(cond_file + f'.{filt}.npy') if os.path.exists(npsave_file): mag2, age2, mass2_mjup = np.load(npsave_file) else: d_tbl2 = cond_table(file=cond_file) # Dictionary of ages mass2_mjup = [] mag2 = [] age2 = [] for k in d_tbl2.keys(): tbl2 = d_tbl2[k] mass2_mjup = mass2_mjup + list(tbl2['MJup'].data) try: mag2 = mag2 + list(tbl2[filt+'a'].data) # NIRCam except KeyError: filt_alt = {'F1065C':'F1000W', 'F1140C':'F1130W', 'F1550C':'F1500W', 'F2300C':'F2100W'} fcol = filt_alt.get(filt, filt) mag2 = mag2 + list(tbl2[fcol].data) # MIRI age2 = age2 + list(np.ones(len(tbl2))*k) mass2_mjup = np.array(mass2_mjup) mag2 = np.array(mag2) age2 = np.array(age2) mag_age_mass = np.array([mag2,age2,mass2_mjup]) np.save(npsave_file, mag_age_mass) # Irregular grid x2 = mag2 age2 = age2 * 1e6 # Convert from Myr to years y2 = np.log10(age2) z2 = mass2_mjup * 318 # Convert to Earth masses zlog2 = np.log10(z2) ####################################################### if extrapolate: xlim = np.array([x2.min(),x.max()+5]) # magnitude limits ylim = np.array([6,10]) # 10^6 to 10^10 yrs else: xlim = np.array([x.min(),x.max()]) # magnitude limits ylim = np.array([y.min(),y.max()]) # age limits dx = (xlim[1] - xlim[0]) / 200 dy = (ylim[1] - ylim[0]) / 200 xgrid = np.arange(xlim[0], xlim[1]+dx, dx) ygrid = np.arange(ylim[0], ylim[1]+dy, dy) X, Y = np.meshgrid(xgrid, ygrid) zgrid = griddata((x,y), zlog, (X, Y), method='cubic') # There will be NaN's along the border that need to be replaced ind_nan = np.isnan(zgrid) if extrapolate: # Replace some NaNs with COND grid zgrid_cond = griddata((x2,y2), zlog2, (X, Y), method='cubic') zgrid[ind_nan] = zgrid_cond[ind_nan] ind_nan = np.isnan(zgrid) # Remove rows/cols with NaN's # x is mag, y is log(age), z is log(mass) # xgrid2, ygrid2, zgrid2 = _trim_nan_array(xgrid, ygrid, zgrid) xgrid2, ygrid2, zgrid2 = xgrid, ygrid, zgrid # Create regular grid interpolator function for extrapolation at NaN's # Need to us linear method over cubic if not trimming NaN's fill_value = None if extrapolate else np.nan func = RegularGridInterpolator((ygrid2,xgrid2), zgrid2, method='linear', bounds_error=False, fill_value=fill_value) if extrapolate: # Fix NaN's in zgrid and rebuild func pts = np.array([Y[ind_nan], X[ind_nan]]).transpose() zgrid[ind_nan] = func(pts) func = RegularGridInterpolator((ygrid,xgrid), zgrid, method='linear', bounds_error=False, fill_value=None) # Get mass limits for series of magnitudes at a given age age_log = np.log10(age*1e6) mag_abs_arr = xgrid pts = np.array([(age_log,xval) for xval in mag_abs_arr]) mass_arr = 10**func(pts) / 318.0 # Convert to MJup # Get rid of any NaN's ind_use = ~np.isnan(mass_arr) mag_abs_arr = mag_abs_arr[ind_use] mass_arr = mass_arr[ind_use] # Sort by magnitude isort = np.argsort(mag_abs_arr) mag_abs_arr = mag_abs_arr[isort] mass_arr = mass_arr[isort] # At a given magnitude, if there is a lower mass at a brighter magnitude, # replace mass value # for i, mag in enumerate(mag_abs_arr): # ind = mag_abs_arr < mag # if ind.sum() > 0: # mass_arr[i] = np.min([mass_arr[i], mass_arr[ind].min()]) # Fit low order polynomial ifit = mag_abs_arr<x.max() xfit = np.append(mag_abs_arr[ifit], mag_abs_arr[-1]) yfit = np.log10(np.append(mass_arr[ifit], mass_arr[-1])) # Perform a bunch of polynomial fits and find chi^2 to choose optimal degree use_lg = False lxmap = None # np.array([mag_abs_arr.min(), mag_abs_arr.max()]) chi2_arr = [] deg_arr = np.arange(1,8) for deg in deg_arr: cf = jl_poly_fit(xfit, yfit, deg=deg, use_legendre=use_lg, lxmap=lxmap) vals_fit = 10**jl_poly(mag_abs_arr,cf, use_legendre=use_lg, lxmap=lxmap) chi2 = np.sum((mass_arr - vals_fit)**2) chi2_arr.append(chi2) ind_deg = np.argmin(chi2_arr) deg = deg_arr[ind_deg] cf = jl_poly_fit(xfit, yfit, deg=deg, use_legendre=use_lg, lxmap=lxmap) mass_arr_fit = 10**jl_poly(mag_abs_arr,cf, use_legendre=use_lg, lxmap=lxmap) # Convert to apparent magnitude mag_app_arr = mag_abs_arr + 5*np.log10(dist/10.0) # Sort by mass mass_arr_fin = mass_arr_fit isort = np.argsort(mass_arr_fin) mass_arr_fin = mass_arr_fin[isort] mag_app_arr = mag_app_arr[isort] return mass_arr_fin, mag_app_arr
[docs] def cond_table(age=None, file=None, **kwargs): """Load COND Model Table Function to read in the COND model tables, which have been formatted in a very specific way. Has the option to return a dictionary of astropy Tables, where each dictionary element corresponds to the specific ages within the COND table. Or, if the age keyword is specified, then this function only returns a single astropy table. Parameters ---------- age : float Age in Myr. If set to None, then an array of ages from the file is used to generate dictionary. If set, chooses the closest age supplied in table. file : string Location and name of COND file. See isochrones stored at https://phoenix.ens-lyon.fr/Grids/. Default is model.AMES-Cond-2000.M-0.0.JWST.Vega """ def make_table(*args): i1, i2 = (ind1[i]+4, ind2[i]) rows = [] for line in content[i1:i2]: if (line=='') or ('---' in line): continue else: vals = np.array(line.split(), dtype='float64') rows.append(tuple(vals)) tbl = Table(rows=rows, names=cnames) # Convert to Jupiter masses newcol = tbl['M/Ms'] * 1047.348644 newcol.name = 'MJup' tbl.add_column(newcol, index=1) tbl['MJup'].format = '.2f' return tbl # Default input directory base_dir = os.path.join(_spec_dir, 'cond_models/') # Default file if file is None: file = 'model.AMES-Cond-2000.M-0.0.JWST.Vega' # First check if file is in indir file_path = os.path.join(base_dir, file) if not os.path.exists(file_path): # Check if file is in current directory file_path = file if not os.path.exists(file): raise ValueError(f"File {file_path} not found.") with open(file_path) as f: content = f.readlines() content = [x.strip('\n') for x in content] # Column names cnames = content[5].split() cnames = ['M/Ms', 'Teff'] + cnames[1:] ncol = len(cnames) # Create a series of tables for each time times_gyr = [] ind1 = [] for i, line in enumerate(content): if 't (Gyr)' in line: times_gyr.append(line.split()[-1]) ind1.append(i) ntimes = len(times_gyr) # Create start and stop indices for each age value ind2 = ind1[1:] + [len(content)] ind1 = np.array(ind1) ind2 = np.array(ind2)-1 # Everything is Gyr, but prefer Myr ages_gyr = np.array(times_gyr, dtype='float64') ages_myr = np.array(ages_gyr * 1000, dtype='int') #times = ['{:.0f}'.format(a) for a in ages_myr] # Return all tables if no age specified if age is None: tables = {} for i in range(ntimes): tbl = make_table(i, ind1, ind2, content) tables[ages_myr[i]] = tbl return tables else: # This is faster if we only want one table ages_diff = np.abs(ages_myr - age) i = np.where(ages_diff==ages_diff.min())[0][0] tbl = make_table(i, ind1, ind2, content) return tbl
[docs] def cond_filter(table, filt, module='A', dist=None, **kwargs): """ Given a COND table and NIRCam filter, return arrays of MJup and Vega mags. If distance (pc) is provided, then return the apparent magnitude, otherwise absolute magnitude at 10pc. Input table has already been filtered by age. """ from .maths import jl_poly, jl_poly_fit # Table Data try: fcol = filt + module.lower() mag_data = table[fcol].data except KeyError: # MIRI coronagraphic filters are incorrect in the AMES-COND files. # It assumes extra attenuation from the central mask, which gives # the incorrect flux for off-axis points sources. Instead, use some # alternate bandpasses filt_alt = {'F1065C':'F1000W', 'F1140C':'F1130W', 'F1550C':'F1500W', 'F2300C':'F2100W'} fcol = filt_alt.get(filt, filt) mag_data = table[fcol].data mcol = 'MJup' mass_data = table[mcol].data # Data to interpolate onto mass_arr = list(np.arange(0.1,1,0.1)) + list(np.arange(1,10)) \ + list(np.arange(10,200,10)) + list(np.arange(200,1400,100)) mass_arr = np.array(mass_arr) # Interpolate mag_arr = np.interp(mass_arr, mass_data, mag_data) # Extrapolate cf = jl_poly_fit(np.log(mass_data), mag_data) ind_out = (mass_arr < mass_data.min()) | (mass_arr > mass_data.max()) mag_arr[ind_out] = jl_poly(np.log(mass_arr), cf)[ind_out] # Distance modulus for apparent magnitude if dist is not None: mag_arr = mag_arr + 5*np.log10(dist/10) return mass_arr, mag_arr
[docs] def mass_sensitivity_table(filt, dist=10, extrapolate=True, lfile=None, age_arr=None, save=True, save_dir=None, return_tbl=False, **kwargs): """ Generate a table of mass sensitivities for a given filter Interpolate mass and brightnesses of BEX evolutionary tracks. Creates a table with first column of magnitudes, second column is log10(Jy), and subsequent columns are log10(Mass) for each age in Myr. Parameters ---------- filt : str Name of filter dist : float Distance in parsecs extrapolate : bool If True, extrapolate to higher masses using COND models as well as lower masses using a lower order polynomial fit. lfile : str Filename of Linder evolutionary tracks. If None, then use BEX_evol_mags_-3_MH_0.00.dat (warm start HELIOS grid). age_arr : array_like Array of ages in Myr. If None, then defualts to [1,2,...,20] Myr. save : bool Save table to file. save_dir : str Directory to save table. If None, then save to current directory. return_tbl : bool Return astropy table object """ from .bandpasses import nircam_filter if lfile is None: lfile = 'BEX_evol_mags_-3_MH_0.00.dat' tbl = linder_table(file=lfile) if age_arr is None: age_arr = np.arange(1, 21, 1) mass_all = [] mag_all = [] for age in age_arr: mass_data, mag_data = linder_filter(tbl, filt, age, dist, extrapolate=extrapolate, **kwargs) mass_all.append(mass_data) mag_all.append(mag_data) # Interpolate masses onto a common magnitude array dm = 0.1 if extrapolate: mag_arr_min = np.round(np.max([np.min(m) for m in mag_all])+dm, 1) mag_arr_max = np.round(np.max([np.max(m) for m in mag_all])-dm, 1) else: mag_arr_min = np.round(np.min([np.min(m) for m in mag_all])+dm, 1) mag_arr_max = np.round(np.max([np.max(m) for m in mag_all])-dm, 1) mag_arr = np.arange(mag_arr_min, mag_arr_max, 0.1) # Convert magnitudes to Jy # Get 0th magnitude flux density bp = nircam_filter(filt) sp = stellar_spectrum('G2V', 0, 'vegamag', bp) obs = s_ext.Observation(sp, bp, binset=bp.wave) flux0 = obs.effstim('Jy') fluxJy_arr = 10**(-0.4*mag_arr)*flux0 mass_arr = [] for i, mass_vals in enumerate(mass_all): # fn = interp1d(mag_all[i], mass_vals, kind='cubic', bounds_error=False, fill_value=np.nan) mass_arr.append(fn(mag_arr)) # isort = np.argsort(mag_all[i]) # xvals = mag_all[i][isort] # yvals = mass_vals[isort] # mass_arr.append(np.interp(mag_arr, xvals, yvals)) mass_arr = np.array(mass_arr) log_jy = np.log10(fluxJy_arr) # Create astropy table data = np.concatenate([[mag_arr], [log_jy], np.log10(mass_arr)]) names = ['mag', 'log_Jy'] + [f'{age}Myr' for age in age_arr] tbl = Table(data=data.T, names=names) tbl['mag'].info.format = '.1f' tbl['log_Jy'].info.format = '.3f' for k in tbl.colnames[2:]: tbl[k].info.format = '.4f' if save: ext_str = '_extrapolated' if extrapolate else '' save_name = f'{filt}__{lfile[:-4]}__d{dist}pc{ext_str}.txt' if save_dir is not None: save_path = os.path.join(save_dir, save_name) else: save_path = save_name tbl.write(save_path, overwrite=True, format='ascii.fixed_width', bookend=False, delimiter=None, delimiter_pad=' ') if return_tbl: return tbl
def _trim_nan_array(xgrid, ygrid, zgrid): """NaN Trimming of Array Image For an image with a rotated border of NaN's, remove rows/cols with NaN's while trying to preserve the maximum footprint of real data. """ xgrid2, ygrid2, zgrid2 = xgrid, ygrid, zgrid # Create a mask of NaN'ed values nan_mask = np.isnan(zgrid2) nrows, ncols = nan_mask.shape # Determine number of NaN's along each row and col num_nans_cols = nan_mask.sum(axis=0) num_nans_rows = nan_mask.sum(axis=1) # First, crop all rows/cols that are only NaN's xind_good = np.where(num_nans_cols < nrows)[0] yind_good = np.where(num_nans_rows < ncols)[0] # get border limits x1, x2 = (xind_good.min(), xind_good.max()+1) y1, y2 = (yind_good.min(), yind_good.max()+1) # Trim of NaN borders xgrid2 = xgrid2[x1:x2] ygrid2 = ygrid2[y1:y2] zgrid2 = zgrid2[y1:y2,x1:x2] # Find a optimal rectangule subsection free of NaN's # Iterative cropping ndiff = 5 while np.isnan(zgrid2.sum()): # Make sure ndiff is not negative if ndiff<0: break npix = zgrid2.size # Create a mask of NaN'ed values nan_mask = np.isnan(zgrid2) nrows, ncols = nan_mask.shape # Determine number of NaN's along each row and col num_nans_cols = nan_mask.sum(axis=0) num_nans_rows = nan_mask.sum(axis=1) # Look for any appreciable diff row-to-row/col-to-col col_diff = num_nans_cols - np.roll(num_nans_cols,-1) row_diff = num_nans_rows - np.roll(num_nans_rows,-1) # For edge wrapping, just use last minus previous col_diff[-1] = col_diff[-2] row_diff[-1] = row_diff[-2] # Keep rows/cols composed mostly of real data # and where number of NaN's don't change dramatically xind_good = np.where( ( np.abs(col_diff) <= ndiff ) & ( num_nans_cols < 0.5*nrows ) )[0] yind_good = np.where( ( np.abs(row_diff) <= ndiff ) & ( num_nans_rows < 0.5*ncols ) )[0] # get border limits x1, x2 = (xind_good.min(), xind_good.max()+1) y1, y2 = (yind_good.min(), yind_good.max()+1) # Trim of NaN borders xgrid2 = xgrid2[x1:x2] ygrid2 = ygrid2[y1:y2] zgrid2 = zgrid2[y1:y2,x1:x2] # Check for convergence # If we've converged, reduce if npix==zgrid2.size: ndiff -= 1 # Last ditch effort in case there are still NaNs # If so, remove rows/cols 1 by 1 until no NaNs while np.isnan(zgrid2.sum()): xgrid2 = xgrid2[1:-1] ygrid2 = ygrid2[1:-1] zgrid2 = zgrid2[1:-1,1:-1] return xgrid2, ygrid2, zgrid2
[docs] def companion_spec(bandpass, model='SB12', atmo='hy3s', mass=10, age=100, entropy=10, dist=10, accr=False, mmdot=None, mdot=None, accr_rin=2, truncated=False, sptype=None, renorm_args=None, Av=0, **kwargs): """ Create a spectrum of a companion Parameters ---------- bandpass : :class:`s_ext.Bandpass` A synphot bandpass object. model : str Exoplanet model to use ('sb12', 'bex', 'cond') or stellar spectrum model ('bosz', 'ck04models', 'phoenix'). atmo : str A string consisting of one of four atmosphere types: ['hy1s', 'hy3s', 'cf1s', 'cf3s']. mass: int Number 1 to 15 Jupiter masses. age: float Age in millions of years (1-1000). entropy: float Initial entropy (8.0-13.0) in increments of 0.25 sptype : str Instead of using a exoplanet spectrum, specify a stellar type. renorm_args : tuple Renormalization arguments in case you want very specific luminosity in some bandpass. Includes (value, units, bandpass). dist : float Distance in pc. Av : float Extinction magnitude (assumes Rv=4.0) of the companion (e.g., due to being embedded in a disk). accr : bool Include accretion? default: False mmdot : float From Zhu et al. (2015), the Mjup^2/yr value. If set to None then calculated from age and mass. mdot : float Or use mdot (Mjup/yr) instead of mmdot. accr_rin : float Inner radius of accretion disk (units of RJup; default: 2) truncated: bool Full disk or truncated (ie., MRI; default: False). """ # Spiegel & Burrows model already have a class # For BEX and COND models, make if model.lower() in ['sb12', 'bex', 'cond']: calc_accr = False if model.lower() in ['bex', 'cond'] else accr pl = { 'atmo': atmo, 'mass': mass, 'age': age, 'entropy': entropy, 'distance': dist, 'accr': calc_accr, 'mmdot': mmdot, 'mdot': mdot, 'accr_rin': accr_rin, 'truncated': truncated } planet = planets_sb12(**pl) sp = planet.export_synphot() # Check spectral overlap sp_overlap = bandpass.check_overlap(sp) # Ensure there is a data point at the edge of the input bandpass if sp_overlap != 'full': w_end = np.max(bandpass.wave) f_end = sp(w_end) w_new = np.append(sp.wave, w_end) f_new = np.append(sp.flux, f_end) sp = s_ext.ArraySpectrum(w_new, f_new, waveunits=sp.waveunits, fluxunits=sp.fluxunits) del_mag = 0 # Add accretion mag offsets for BEX and COND models if (model.lower() in ['bex', 'cond']) and (accr == True): if sp_overlap != 'full': _log.warning(f"Overlap between spectrum and bandpass: {sp_overlap}.") _log.warning("Accretion calculation may be unreliable.") pl = { 'atmo': atmo, 'mass': mass, 'age': age, 'entropy': entropy, 'distance': dist, 'accr': True, 'mmdot': mmdot, 'mdot': mdot, 'accr_rin': accr_rin, 'truncated': truncated } planet = planets_sb12(**pl) # Get spectrum from accretion component sp_mdot = sp_accr(planet.mmdot, rin=planet.rin, dist=planet.distance, truncated=planet.truncated, waveout=sp.waveunits, fluxout=sp.fluxunits) # Interpolate accretion spectrum at each wavelength fnew = np.interp(sp.wave, sp_mdot.wave, sp_mdot.flux) sp_new = s_ext.ArraySpectrum(sp.wave, fnew, waveunits=sp.waveunits, fluxunits=sp.fluxunits) obs_accr = s_ext.Observation(sp_new, bandpass, binset=bandpass.wave) del_mag -= obs_accr.effstim('vegamag') # Make new spectrum sp = planet.export_synphot() # Add extinction from the disk if Av>0: Rv = 4.0 sp_ext = sp * s_ext.Extinction(Av/Rv, name='mwrv4') if model.lower() in ['bex', 'cond']: if sp_overlap != 'full': _log.warning(f"Overlap between spectrum and bandpass: {sp_overlap}.") _log.warning("Extinction calculation may be unreliable.") obs = s_ext.Observation(sp, bandpass, binset=bandpass.wave) obs_ext = s_ext.Observation(sp_ext, bandpass, binset=bandpass.wave) del_mag += obs_ext.effstim('vegamag') - obs.effstim('vegamag') sp = sp_ext # For BEX and COND models, set up renorm_args # unless renorm_args is already set filt = bandpass.name.split('_')[0] if (renorm_args is not None) and (len(renorm_args) > 0): pass elif model.lower()=='bex': table = linder_table() mass_arr, mag_arr = linder_filter(table, filt, age, dist=dist) mag = np.interp(mass, mass_arr, mag_arr) mag += del_mag # Apply extinction and/or accretion offsets renorm_args = (mag, 'vegamag', bandpass) elif model.lower()=='cond': table = cond_table(age) mass_arr, mag_arr = cond_filter(table, filt, dist=dist) mag = np.interp(mass, mass_arr, mag_arr) mag += del_mag # Apply extinction and/or accretion offsets renorm_args = (mag, 'vegamag', bandpass) # Renormalize to some specified flux in a given bandpass if (renorm_args is not None) and (len(renorm_args) > 0): sp_norm = sp.renorm(*renorm_args, force=True) sp = sp_norm elif sp_overlap != 'full': _log.warning(f"Overlap between spectrum and bandpass: {sp_overlap}.") _log.warning("Recommend supplying renorm_args input.") elif model.lower() in ['bosz', 'ck04models', 'phoenix']: if sptype is None: _log.warning('sptype not specified. Using "flat".') sptype = 'flat' pl = {'sptype': sptype, 'Av': Av, 'renorm_args': renorm_args} sp = stellar_spectrum(sptype) if Av>0: Rv = 4.0 sp *= s_ext.Extinction(Av/Rv, name='mwrv4') if (renorm_args is not None) and (len(renorm_args) > 0): sp_norm = sp.renorm(*renorm_args, force=True) sp = sp_norm name = kwargs.get('name') if name is not None: sp.name = name return sp
[docs] def bin_spectrum(sp, wave, waveunits='um'): """Rebin spectrum Rebin a synphot spectrum to a different wavelength grid. This function first converts the input spectrum to units of counts then combines the photon flux onto the specified wavelength grid. Output spectrum units are the same as the input spectrum. Parameters ----------- sp : :class:`s_ext.Spectrum` Spectrum to rebin. wave : array_like Wavelength grid to rebin onto. waveunits : str Units of wave input. Must be recognizeable by synphot. Returns ------- :class:`s_ext.Spectrum` Rebinned spectrum in same units as input spectrum. """ from .maths import binned_statistic from synphot.binning import calculate_bin_edges waveunits0 = sp.waveunits fluxunits0 = sp.fluxunits # Convert wavelength of input spectrum to desired output units sp.convert(waveunits) # We also want input to be in terms of counts to conserve flux sp.convert('photlam') # Calculate bin edges edges = calculate_bin_edges(wave * sp.waveunits) ind = (sp.waveset >= edges[0]) & (sp.waveset <= edges[-1]) binflux = binned_statistic(sp.wave[ind], sp.flux[ind], np.mean, bins=edges) # Interpolate over NaNs ind_nan = np.isnan(binflux) finterp = interp1d(wave[~ind_nan], binflux[~ind_nan], kind='cubic') binflux[ind_nan] = finterp(wave[ind_nan]) sp2 = s_ext.ArraySpectrum(wave, binflux, waveunits=waveunits, fluxunits='photlam') sp2.convert(waveunits0) sp2.convert(fluxunits0) # Put back units of original input spectrum sp.convert(waveunits0) sp.convert(fluxunits0) return sp2
[docs] def mag_to_counts(src_mag, bandpass, sp_type='G0V', mag_units='vegamag', **kwargs): """ Convert stellar magnitudes in some bandpass to corresponding flux values (e-/sec) """ # Get flux of a 0 magnitude star (zero-point flux) sp = stellar_spectrum(sp_type, 0, mag_units, bandpass) obs = s_ext.Observation(sp, bandpass, binset=bandpass.wave) zp_counts = obs.effstim('counts') # Counts of a 0 mag star # Flux of each star e-/sec src_flux = np.array(zp_counts * 10**(-src_mag / 2.5)) return src_flux