# Import libraries
import numpy as np
from numpy.polynomial import legendre
from scipy.special import eval_legendre
from .coords import dist_image
from .image_manip import frebin
###########################################################################
# Polynomial fitting
###########################################################################
def jl_poly(xvals, coeff, dim_reorder=False, use_legendre=False, lxmap=None, **kwargs):
"""Evaluate polynomial
    Replacement for `np.polynomial.polynomial.polyval(xvals, coeff)`
    to evaluate y-values given a set of xvals and coefficients.
    Uses matrix multiplication, which is much faster. Beware that the
    default output array shape organization may differ from the
    polyval routine for 2D and 3D results.
Parameters
----------
xvals : ndarray
1D array (time, for instance)
coeff : ndarray
1D, 2D, or 3D array of coefficients from a polynomial fit.
The first dimension should have a number of elements equal
to the polynomial degree + 1. Order such that lower degrees
are first, and higher degrees are last.
Keyword Args
------------
    dim_reorder : bool
        If True, then the result is ordered (nx,ny,nz); otherwise we
        use the Python preferred ordering (nz,ny,nx).
    use_legendre : bool
        Evaluate with Legendre polynomials, an orthogonal basis set.
lxmap : ndarray or None
        Legendre polynomials are normally mapped to xvals of [-1,+1].
`lxmap` gives the option to supply the values for xval that
should get mapped to [-1,+1]. If set to None, then assumes
[xvals.min(),xvals.max()].
Returns
-------
float array
An array of values where each xval has been evaluated at each
set of supplied coefficients. The output shape has the first
dimension equal to the number of xvals, and the final dimensions
correspond to coeff's latter dimensions. The result is flattened
if there is either only one xval or one set of coeff (or both).
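
    Example
    -------
    A minimal usage sketch with made-up values (the coefficients below are
    illustrative rather than from an actual fit):

    >>> tvals = np.arange(10, dtype='float')
    >>> coeff = np.array([1.0, 0.5])  # y = 1.0 + 0.5*t
    >>> yfit = jl_poly(tvals, coeff)
    >>> yfit.shape
    (10,)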
"""
# How many xvals?
n = np.size(xvals)
try:
xdim = len(xvals.shape)
except AttributeError:
# Handle list
xvals = np.array(xvals)
xdim = len(xvals.shape)
# Handle single value
if xdim == 0:
xvals = np.array([xvals])
xdim = len(xvals.shape)
if xdim>1:
raise ValueError('xvals can only have 1 dimension. Found {} dimensions.'.format(xdim))
# Check number of dimensions in coefficients
dim = coeff.shape
ndim = len(dim)
if ndim>3:
        raise ValueError('coefficient can only have 1, 2, or 3 dimensions. '
                         'Found {} dimensions.'.format(ndim))
if use_legendre:
# Values to map to [-1,+1]
if lxmap is None:
lxmap = [np.min(xvals), np.max(xvals)]
# Remap xvals -> lxvals
dx = lxmap[1] - lxmap[0]
lxvals = 2 * (xvals - (lxmap[0] + dx/2)) / dx
# Use Identity matrix to evaluate each polynomial component
# xfan = legendre.legval(lxvals, np.identity(dim[0]))
# Below method is faster for large lxvals
xfan = np.asarray([eval_legendre(n, lxvals) for n in range(dim[0])])
else:
# Create an array of exponent values
parr = np.arange(dim[0], dtype='float')
# If 3D, this reshapes xfan to 2D
xfan = xvals**parr.reshape((-1,1)) # Array broadcasting
# Reshape coeffs to 2D array
cf = coeff.reshape(dim[0],-1)
if dim_reorder:
# Coefficients are assumed (deg+1,nx,ny)
# xvals have length nz
# Result to be ordered (nx,ny,nz)
# Use np.matmul instead of np.dot for speed improvement
yfit = np.matmul(cf.T, xfan) # cf.T @ xfan
if ndim==1 or n==1:
yfit = yfit.ravel()
if ndim==3:
yfit = yfit.reshape((dim[1],dim[2],n))
else:
# This is the Python preferred ordering
# Coefficients are assumed (deg+1,ny,nx)
# xvals have length nz
# Result to be ordered (nz,ny,nx)
# Use np.matmul instead of np.dot for speed improvement
yfit = np.matmul(xfan.T, cf) # xfan.T @ cf
if ndim==1 or n==1:
yfit = yfit.ravel()
if ndim==3:
yfit = yfit.reshape((n,dim[1],dim[2]))
return yfit
def jl_poly_fit(x, yvals, deg=1, QR=True, robust_fit=False, niter=25, use_legendre=False, lxmap=None, **kwargs):
"""Fast polynomial fitting
Fit a polynomial to a function using linear least-squares.
This function is particularly useful if you have a data cube
and want to simultaneously fit a slope to all pixels in order
to produce a slope image.
Gives the option of performing QR decomposition, which provides
a considerable speed-up compared to simply using `np.linalg.lstsq()`.
In addition to being fast, it has better numerical stability than
    linear regressions that involve matrix inversions (i.e., `dot(x.T,x)`).
Returns the coefficients of the fit for each pixel.
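
    With QR=True, the design matrix `A` (one row per x value, one column per
    polynomial term) is factored as `A = Q R`, and the coefficients are found
    by solving the triangular system `R c = Q^T b` rather than forming the
    normal equations `(A^T A) c = A^T b` directly.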
Parameters
----------
x : ndarray
X-values of the data array (1D).
yvals : ndarray
Y-values (1D, 2D, or 3D) where the first dimension
must have equal length of x. For instance, if x is
a time series of a data cube with size NZ, then the
        data cube must follow the Python convention (NZ,NY,NX).
Keyword Args
------------
deg : int
Degree of polynomial to fit to the data.
QR : bool
Perform QR decomposition? Default=True.
robust_fit : bool
Perform robust fitting, iteratively kicking out
outliers until convergence.
niter : int
Maximum number of iterations for robust fitting.
If convergence is attained first, iterations will stop.
use_legendre : bool
        Fit with Legendre polynomials, an orthogonal basis set.
lxmap : ndarray or None
Legendre polynomials are normally mapped to xvals of [-1,+1].
`lxmap` gives the option to supply the values for xval that
should get mapped to [-1,+1]. If set to None, then assumes
[xvals.min(),xvals.max()].
Example
-------
Fit all pixels in a data cube to get slope image in terms of ADU/sec
>>> nz, ny, nx = cube.shape
>>> tvals = (np.arange(nz) + 1) * 10.737
>>> coeff = jl_poly_fit(tvals, cube, deg=1)
>>> bias = coeff[0] # Bias image (y-intercept)
>>> slope = coeff[1] # Slope image (DN/sec)
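
    A sketch of a robust fit to the same cube, then evaluating the model back
    onto the time samples with `jl_poly` (illustrative only):

    >>> coeff = jl_poly_fit(tvals, cube, deg=1, robust_fit=True)
    >>> cube_fit = jl_poly(tvals, coeff)  # Model cube of shape (nz, ny, nx)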
"""
from .robust import medabsdev
orig_shape = yvals.shape
ndim = len(orig_shape)
cf_shape = list(yvals.shape)
cf_shape[0] = deg+1
if ndim==1:
assert len(x)==len(yvals), 'X and Y must have the same length'
else:
assert len(x)==orig_shape[0], 'X and Y.shape[0] must have the same length'
# Get different components to fit
if use_legendre:
# Values to map to [-1,+1]
if lxmap is None:
lxmap = [np.min(x), np.max(x)]
# Remap xvals -> lxvals
dx = lxmap[1] - lxmap[0]
lx = 2 * (x - (lxmap[0] + dx/2)) / dx
# Use Identity matrix to evaluate each polynomial component
# a = legendre.legval(lx, np.identity(deg+1))
# Below method is faster for large lxvals
a = np.asarray([eval_legendre(n, lx) for n in range(deg+1)])
else:
# Normalize x values to closer to 1 for numerical stability with large inputs
xnorm = np.mean(x)
x = x / xnorm
a = np.asarray([x**num for num in range(deg+1)], dtype='float')
b = yvals.reshape([orig_shape[0],-1])
# Fast method, but numerically unstable for overdetermined systems
#cov = np.linalg.pinv(np.matmul(a,a.T))
#coeff_all = np.matmul(cov,np.matmul(a,b))
if QR:
# Perform QR decomposition of the A matrix
q, r = np.linalg.qr(a.T, 'reduced')
# computing Q^T*b (project b onto the range of A)
# Use np.matmul instead of np.dot for speed improvement
qTb = np.matmul(q.T,b) # q.T @ b
# solving R*x = Q^T*b
coeff_all = np.linalg.lstsq(r, qTb, rcond=None)[0]
else:
coeff_all = np.linalg.lstsq(a.T, b, rcond=None)[0]
if robust_fit:
        # Normally, we would weight both the x and y (i.e., a and b) values
        # then plug those into the lstsq() routine. However, this means we
        # can no longer use the same x values for a series of y values. Each
        # fit would have differently weighted x-values, requiring us to fit
# each element individually, which would be very slow.
# Instead, we will compromise by "fixing" outliers in order to
# preserve the quickness of this routine. The fixed outliers become
# the new data that we refit.
close_factor = 0.03
close_enough = np.max([close_factor * np.sqrt(0.5/(x.size-1)), 1e-20])
err = 0
for i in range(niter):
# compute absolute value of residuals (fit minus data)
yvals_mod = jl_poly(x, coeff_all, use_legendre=use_legendre)
abs_resid = np.abs(yvals_mod - b)
# compute the scaling factor for the standardization of residuals
# using the median absolute deviation of the residuals
# 6.9460 is a tuning constant (4.685/0.6745)
abs_res_scale = 6.9460 * np.median(abs_resid, axis=0)
# standardize residuals
w = abs_resid / abs_res_scale.reshape([1,-1])
# exclude outliers
outliers = w>1
# Create a version with outliers fixed
yvals_fix = b.copy()
yvals_fix[outliers] = yvals_mod[outliers]
# Ignore fits with no outliers
ind_fit = outliers.sum(axis=0) > 0
if ind_fit[ind_fit].size == 0: break
if QR:
# Use np.matmul instead of np.dot for speed improvement
qTb = np.matmul(q.T,yvals_fix[:,ind_fit]) # q.T @ yvals_fix[:,ind_fit]
coeff_all[:,ind_fit] = np.linalg.lstsq(r, qTb, rcond=None)[0]
else:
coeff_all[:,ind_fit] = np.linalg.lstsq(a.T, yvals_fix[:,ind_fit], rcond=None)[0]
prev_err = medabsdev(abs_resid, axis=0) if i==0 else err
err = medabsdev(abs_resid, axis=0)
diff = np.abs((prev_err - err)/err)
#print(coeff_all.mean(axis=1), coeff_all.std(axis=1), np.nanmax(diff), ind_fit[ind_fit].size)
if 0 < np.nanmax(diff) < close_enough: break
if not use_legendre:
parr = np.arange(deg+1, dtype='float')
coeff_all = coeff_all / (xnorm**parr.reshape([-1,1]))
return coeff_all.reshape(cf_shape)
###########################################################################
# Binning and stats
###########################################################################
def hist_indices(values, bins=10, return_more=False):
"""Histogram indices
This function bins an input of values and returns the indices for
each bin. This is similar to the reverse indices functionality
of the IDL histogram routine. It's also much faster than doing
a for loop and creating masks/indices at each iteration, because
we utilize a sparse matrix constructor.
Returns a list of indices grouped together according to the bin.
Only works for evenly spaced bins.
Parameters
----------
values : ndarray
Input numpy array. Should be a single dimension.
bins : int or ndarray
If bins is an int, it defines the number of equal-width bins
in the given range (10, by default). If bins is a sequence,
it defines the bin edges, including the rightmost edge.
In the latter case, the bins must encompass all values.
return_more : bool
Option to also return the values organized by bin and
the value of the centers (igroups, vgroups, center_vals).
Example
-------
Find the standard deviation at each radius of an image
>>> rho = dist_image(image)
>>> binsize = 1
>>> bins = np.arange(rho.min(), rho.max() + binsize, binsize)
>>> igroups, vgroups, center_vals = hist_indices(rho, bins, True)
>>> # Get the standard deviation of each bin in image
>>> std = binned_statistic(igroups, image, func=np.std)
"""
from scipy.sparse import csr_matrix
values_flat = values.ravel()
vmin = values_flat.min()
vmax = values_flat.max()
N = len(values_flat)
try: # assume it's already an array
binsize = bins[1] - bins[0]
except TypeError: # if bins is an integer
binsize = (vmax - vmin) / bins
bins = np.arange(vmin, vmax + binsize, binsize)
bins[0] = vmin
bins[-1] = vmax
# Central value of each bin
center_vals = bins[:-1] + binsize / 2.
nbins = center_vals.size
# TODO: If input bins is an array that doesn't span the full set of input values,
# then we need to set a warning.
if (vmin<bins[0]) or (vmax>bins[-1]):
raise ValueError("Bins must encompass entire set of input values.")
    digitized = ((nbins-1.0) / (vmax-vmin) * (values_flat-vmin)).astype(int)
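    # Build a sparse CSR matrix whose row index is each element's bin number;
    # its indptr array then delimits contiguous runs of element indices per bin,
    # which np.split() separates into per-bin index groups below.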
csr = csr_matrix((values_flat, [digitized, np.arange(N)]), shape=(nbins, N))
# Split indices into their bin groups
igroups = np.split(csr.indices, csr.indptr[1:-1])
if return_more:
vgroups = np.split(csr.data, csr.indptr[1:-1])
return (igroups, vgroups, center_vals)
else:
return igroups
def binned_statistic(x, values, func=np.mean, bins=10):
"""Binned statistic
Compute a binned statistic for a set of data. Drop-in replacement
for scipy.stats.binned_statistic.
Parameters
----------
x : ndarray
A sequence of values to be binned. Or a list of binned
indices from hist_indices().
values : ndarray
The values on which the statistic will be computed.
func : func
The function to use for calculating the statistic.
bins : int or ndarray
If bins is an int, it defines the number of equal-width bins
in the given range (10, by default). If bins is a sequence,
it defines the bin edges, including the rightmost edge.
This doesn't do anything if x is a list of indices.
Example
-------
Find the standard deviation at each radius of an image
>>> rho = dist_image(image)
>>> binsize = 1
>>> radial_bins = np.arange(rho.min(), rho.max() + binsize, binsize)
>>> radial_stds = binned_statistic(rho, image, func=np.std, bins=radial_bins)
"""
values_flat = values.ravel()
    try: # This will be successful if x is not already a list of indices
        # Check if bins is a single value
        if (np.size(bins)==1) and (bins is not None):
igroups = hist_indices(x, bins=bins, return_more=False)
res = np.array([func(values_flat[ind]) for ind in igroups])
# Otherwise we assume bins is a list or array defining edge locations
else:
bins = np.array(bins)
# Check if binsize is the same for all bins
bsize = bins[1:] - bins[:-1]
            # Make sure bins encompass full set of input values
            x_flat = np.asarray(x).ravel()
            ind_bin = (x_flat>=bins.min()) & (x_flat<=bins.max())
            x = x_flat[ind_bin]
            values_flat = values_flat[ind_bin]
if np.isclose(bsize.min(), bsize.max()):
igroups = hist_indices(x, bins=bins, return_more=False)
res = np.array([func(values_flat[ind]) for ind in igroups])
else:
# If non-uniform bins, just use scipy.stats.binned_statistic
from scipy import stats
                res, _, _ = stats.binned_statistic(x, values_flat, func, bins)
except:
# Assume that input is a list of indices
igroups = x
res = np.array([func(values_flat[ind]) for ind in igroups])
return res
def radial_std(im_diff, pixscale=None, oversample=None, supersample=False, func=np.std):
"""Generate contrast curve of PSF difference
Find the standard deviation within fixed radial bins of a differenced image.
Returns two arrays representing the 1-sigma contrast curve at given distances.
    Parameters
    ----------
im_diff : ndarray
Differenced image of two PSFs, for instance.
    Keyword Args
    ------------
pixscale : float
Pixel scale of the input image
oversample : int
Is the input image oversampled compared to detector? If set, then
the binsize will be pixscale*oversample (if supersample=False).
supersample : bool
If set, then oversampled data will have a binsize of pixscale,
otherwise the binsize is pixscale*oversample.
    func : func
        The function to use for calculating the radial standard deviation.
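
    Example
    -------
    A minimal usage sketch with assumed inputs (`diff_psf` is a hypothetical
    differenced PSF image oversampled by a factor of 4 relative to the
    detector, with an oversampled pixel scale of 0.015 arcsec/pixel):

    >>> rr, stds = radial_std(diff_psf, pixscale=0.015, oversample=4)
    >>> # rr is in arcsec; stds is the 1-sigma residual level at each radius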
"""
from astropy.convolution import convolve, Gaussian1DKernel
# Set oversample to 1 if supersample keyword is set
oversample = 1 if supersample or (oversample is None) else oversample
# Rebin data
data_rebin = frebin(im_diff, scale=1/oversample)
# Determine pixel scale of rebinned data
pixscale = 1 if pixscale is None else oversample*pixscale
# Pixel distances
rho = dist_image(data_rebin, pixscale=pixscale)
# Get radial profiles
binsize = pixscale
bins = np.arange(rho.min(), rho.max() + binsize, binsize)
nan_mask = np.isnan(data_rebin)
igroups, _, rr = hist_indices(rho[~nan_mask], bins, True)
stds = binned_statistic(igroups, data_rebin[~nan_mask], func=func)
stds = convolve(stds, Gaussian1DKernel(1))
# Ignore corner regions
arr_size = np.min(data_rebin.shape) * pixscale
mask = rr < (arr_size/2)
return rr[mask], stds[mask]
def find_closest(A, B):
""" Find closest indices
Given two arrays, A and B, find the indices in B whose values
are closest to those in A. Returns an array with size equal to A.
    This is much faster than a brute-force approach such as
    `np.argmin(np.abs(A - B[:, np.newaxis]), axis=0)`, especially for large arrays.
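
    Example
    -------
    A quick sketch with small illustrative arrays:

    >>> A = np.array([1.1, 2.2, 9.9])
    >>> B = np.arange(10)
    >>> find_closest(A, B)
    array([1, 2, 9])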
"""
    # Make sure these are arrays
a = np.asarray(A)
if np.size(B)==1:
b = np.asarray([B])
else:
b = np.asarray(B)
# Flatten a array
a_shape = a.shape
if len(a_shape)>1:
a = a.flatten()
# b needs to be sorted
isort = np.argsort(B)
b = b[isort]
    # Find indices into sorted b that bracket each value in a
arghigh = np.searchsorted(b,a)
arglow = np.maximum(arghigh-1,0)
arghigh = np.minimum(arghigh,len(b)-1)
# Look at deltas and choose closest
delta_high = np.abs(b[arghigh]-a)
delta_low = np.abs(b[arglow]-a)
closest_arg = np.where(delta_high>delta_low,arglow,arghigh)
return isort[closest_arg].reshape(a_shape)
def fit_bootstrap(pinit, datax, datay, function, yerr_systematic=0.0, nrand=1000, return_more=False):
"""Bootstrap fitting routine
Bootstrap fitting algorithm to determine the uncertainties on the fit parameters.
Parameters
----------
pinit : ndarray
Initial guess for parameters to fit
datax, datay : ndarray
X and Y values of data to be fit
function : func
Model function
yerr_systematic : float or array_like of floats
        Systematic uncertainties contributing to additional error in the data.
This is treated as independent Normal error on each data point.
Can have unique values for each data point. If 0, then we just use
the standard deviation of the residuals to randomize the data.
nrand : int
Number of random data sets to generate and fit.
return_more : bool
If true, then also return the full set of fit parameters for the randomized
        data to perform a more thorough analysis of the distribution. Otherwise,
        just return the mean and standard deviation of each fit parameter.
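
    Example
    -------
    A minimal sketch fitting a line to synthetic data (the model, data, and
    initial guess below are made up for illustration):

    >>> def model(x, p):
    ...     return p[0] + p[1] * x
    >>> xdata = np.linspace(0, 10, 50)
    >>> ydata = model(xdata, [2.0, 0.5]) + np.random.normal(scale=0.1, size=xdata.size)
    >>> pfit, perr = fit_bootstrap([1.0, 1.0], xdata, ydata, model, nrand=500)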
"""
from scipy import optimize
def errfunc(p, x, y):
return function(x, p) - y
# Fit first time
    pfit, ier = optimize.leastsq(errfunc, pinit, args=(datax, datay), full_output=0)
# Get the stdev of the residuals
residuals = errfunc(pfit, datax, datay)
sigma_res = np.std(residuals)
sigma_err_total = np.sqrt(sigma_res**2 + yerr_systematic**2)
# Some random data sets are generated and fitted
randomdataY = datay + np.random.normal(scale=sigma_err_total, size=(nrand, len(datay)))
ps = []
for i in range(nrand):
# randomDelta = np.random.normal(0., sigma_err_total, len(datay))
# datay_rand = datay + randomDelta
datay_rand = randomdataY[i]
        randomfit, ier = optimize.leastsq(errfunc, pinit, args=(datax, datay_rand), full_output=0)
ps.append(randomfit)
ps = np.array(ps)
mean_pfit = np.mean(ps,axis=0)
err_pfit = np.std(ps,axis=0)
if return_more:
return mean_pfit, err_pfit, ps
else:
return mean_pfit, err_pfit