Source code for pynrc.maths.image_manip

import numpy as np
import logging
_log = logging.getLogger('pynrc')

from poppy.utils import krebin

from .coords import dist_image
from scipy.ndimage import fourier_shift, rotate
from astropy.io import fits

from webbpsf_ext.image_manip import pad_or_cut_to_size
from webbpsf_ext.image_manip import fshift, fourier_imshift, cv_shift
from webbpsf_ext.image_manip import frebin, zrebin
from webbpsf_ext.image_manip import fractional_image_shift, image_shift_with_nans, replace_nans
from webbpsf_ext.image_manip import rotate_offset, rotate_shift_image
from webbpsf_ext.image_manip import image_rescale, model_to_hdulist
from webbpsf_ext.image_manip import convolve_image, crop_zero_rows_cols
from webbpsf_ext.image_manip import get_im_cen
from webbpsf_ext.image_manip import add_ipc, add_ppc, apply_pixel_diffusion
from webbpsf_ext.image_manip import image_convolution
from webbpsf_ext.imreg_tools import subtract_psf
from webbpsf_ext.maths import hist_indices, binned_statistic, fit_bootstrap

from webbpsf_ext.image_manip import crop_observation as _crop_observation

[docs] def crop_observation(im_full, ap, xysub, xyloc=None, delx=0, dely=0, shift_func=fourier_imshift, interp='cubic', return_xy=False, fill_val=0, **kwargs): """Crop around aperture reference location `xysub` specifies the desired crop size. if `xysub` is an array, dimension order should be [nysub,nxsub] `xyloc` provides a way to manually supply the central position. Set `ap` to None will crop around `xyloc` or center of array. delx and delx will shift array by some offset before cropping to allow for sub-pixel shifting. To change integer crop positions, recommend using `xyloc` instead. Shift function can be fourier_imshfit, fshift, or cv_shift. The interp keyword only works for the latter two options. Consider 'lanczos' for cv_shift. Setting `return_xy` to True will also return the indices used to perform the crop. Parameters ---------- im_full : ndarray Input image. ap : pysiaf aperture Aperture to use for cropping. Will crop around the aperture reference point by default. Will be overridden by `xyloc`. xysub : int, tuple, or list Size of subarray to extract. If a single integer is provided, then a square subarray is extracted. If a tuple or list is provided, then it should be of the form (ny, nx). xyloc : tuple or list (x,y) pixel location around which to crop the image. If None, then the image aperture refernece point is used. Keyword Args ------------ delx : int or float Integer pixel offset in x-direction. This shifts the image by some number of pixels in the x-direction. Positive values shift the image to the right. dely : int or float Integer pixel offset in y-direction. This shifts the image by some number of pixels in the y-direction. Positive values shift the image up. shift_func : function Function to use for shifting. Default is `fourier_imshift`. If delx and dely are both integers, then `fshift` is used. interp : str Interpolation method to use for shifting. Default is 'cubic'. Options are 'nearest', 'linear', 'cubic', and 'quadratic' for `fshift`. return_xy : bool If True, then return the x and y indices used to crop the image prior to any shifting from `delx` and `dely`. Default is False. fill_val : float Value to use for filling in the empty pixels after shifting. Default = 0. """ return _crop_observation(im_full, ap, xysub, xyloc=xyloc, delx=delx, dely=dely, shift_func=shift_func, interp=interp, return_xy=return_xy, fill_val=fill_val, **kwargs)
[docs] def crop_image(imarr, xysub, xyloc=None, **kwargs): """Crop input image around center using integer offsets only If size is exceeded, then the image is expanded and filled with 0s by default. Parameters ---------- imarr : ndarray Input image or image cube [nz,ny,nx]. xysub : int, tuple, or list Size of subarray to extract. If a single integer is provided, then a square subarray is extracted. If a tuple or list is provided, then it should be of the form (ny, nx). xyloc : tuple or list (x,y) pixel location around which to crop the image. If None, then the image center is used. Keyword Args ------------ delx : int or float Integer pixel offset in x-direction. This shifts the image by some number of pixels in the x-direction. Positive values shift the image to the right. dely : int or float Integer pixel offset in y-direction. This shifts the image by some number of pixels in the y-direction. Positive values shift the image up. shift_func : function Function to use for shifting. Default is `fourier_imshift`. If delx and dely are both integers, then `fshift` is used. interp : str Interpolation method to use for shifting. Default is 'cubic'. Options are 'nearest', 'linear', 'cubic', and 'quadratic' for `fshift`. return_xy : bool If True, then return the x and y indices used to crop the image prior to any shifting from `delx` and `dely`; (x1, x2, y1, y2). Default is False. fill_val : float Value to use for filling in the empty pixels after shifting. Default = 0. """ sh = imarr.shape if len(sh) == 2: return crop_observation(imarr, None, xysub, xyloc=xyloc, **kwargs) elif len(sh) == 3: return_xy = kwargs.pop('return_xy', False) res = np.asarray([crop_observation(im, None, xysub, xyloc=xyloc, **kwargs) for im in imarr]) if return_xy: _, xy = crop_observation(imarr[0], None, xysub, xyloc=xyloc, return_xy=True, **kwargs) return (res, xy) else: return res else: raise ValueError(f'Found {len(sh)} dimensions {sh}. Only 2 or 3 dimensions allowed.')
[docs] def shift_subtract(params, reference, target, mask=None, pad=False, interp='cubic', shift_function=fshift): """Shift and subtract image Subpixel shifts for input into least-square optimizer. Parameters ---------- params : tuple xshift, yshift, beta reference : ndarray See align_fourierLSQ target : ndarray See align_fourierLSQ mask : ndarray, optional See align_fourierLSQ pad : bool Should we pad the array before shifting, then truncate? Otherwise, the image is wrapped. interp : str Interpolation for fshift function. Default is 'cubic'. Options are 'linear', 'cubic', or 'quintic'. shift_function : func which function to use for sub-pixel shifting Returns ------- ndarray 1D array of target-reference residual after applying shift and intensity fraction. """ xshift, yshift, beta = params if shift_function is not None: offset = shift_function(reference, xshift, yshift, pad=pad, interp=interp) else: offset = reference if mask is not None: return ( (target - beta * offset) * mask ).ravel() #.flatten() else: return ( target - beta * offset ).ravel() #.flatten()
[docs] def align_leastsq(image, psf_over, osamp=1, bpmask=None, weights=None, params0=[0.0,0.0,1.0,0.0], kipc=None, func_shift=fourier_imshift, interp='cubic', pad=True, **kwargs): """Find best shift value LSQ optimization with option of shift alignment algorithm. In practice, the 'reference' image gets shifted to match the 'target' image. Parameters ---------- image : ndarray Observed science image psf_over : ndarray Input oversampled PSF to fit and align Keyword Args ------------ osamp : int Oversampling factor of PSF bpmask : ndarray, None Bad pixel mask indicating what pixels in input image to ignore. weights : ndarray, None Array of weights to use during the fitting process. params0 : list Initial guess for (x, y, offset) values. Optimal scaling factor is calculated automatically in `subtract_psf`. func_shift : func Which function to use for sub-pixel shifting. Options are fourier_imshift, fshift, or cv_shift. interp : str Interpolation for fshift or cv_shift functions. Options are 'linear', 'cubic', or 'quintic'. Default is 'cubic'. pad : bool Should we pad the array before shifting, then truncate? Otherwise, the image is wrapped. Returns ------- list (x, y, scale, offset) values from LSQ optimization, where (x, y) are the misalignment of target from reference and scale is the fraction by which the target intensity must be reduced to match the intensity of the reference. Offset gives the difference in the mean intensity of the two images. """ from scipy.optimize import least_squares#, leastsq def psf_diff(params, image, psf, **kwargs): """PSF differencing helper""" kwargs['xyshift'] = params[0:2] if len(params)==1: kwargs['xyshift'] = (0,0) psf_offset = 0.0 elif len(params)==2: psf_offset = 0.0 elif len(params)==3: psf_offset = params[-1] else: raise ValueError("params must be length 2, 3, or 4") kwargs['psf_offset'] = psf_offset return subtract_psf(image, psf, **kwargs).ravel() # Set weights image to pass to differencing function if bpmask is not None: weights = np.ones_like(image) if weights is None else weights weights[bpmask] = 0 # Keywords to pass kwargs2 = { 'weights' : weights, 'osamp' : osamp, 'func_shift' : func_shift, 'interp' : interp, 'pad' : pad, 'kipc' : kipc, } kwargs_pass = kwargs.copy() kwargs_pass.update(kwargs2) # Use loss='soft_l1' for least squares robust against outliers # May want to play around with f_scale... res = least_squares(psf_diff, params0, #diff_step=0.1, loss='soft_l1', f_scale=1.0, args=(image, psf_over), kwargs=kwargs_pass, **kwargs) out = res.x if len(out)==1: return out[0] else: return out
[docs] def align_LSQ(reference, target, mask=None, pad=False, interp='cubic', shift_function=fshift, init_pars=[0.0, 0.0, 1.0]): """Find best shift value LSQ optimization with option of shift alignment algorithm. In practice, the 'reference' image gets shifted to match the 'target' image. Parameters ---------- reference : ndarray N x K image to be aligned to target : ndarray N x K image to align to reference mask : ndarray, optional N x K image indicating pixels to ignore when performing the minimization. The masks acts as a weighting function in performing the fit. pad : bool Should we pad the array before shifting, then truncate? Otherwise, the image is wrapped. interp : str Interpolation for fshift function. Default is 'cubic'. Options are 'linear', 'cubic', or 'quintic'. shift_function : func which function to use for sub-pixel shifting. Options are fourier_imshift or fshift. fshift tends to be 3-5 times faster for similar results. Returns ------- list (x, y, beta) values from LSQ optimization, where (x, y) are the misalignment of target from reference and beta is the fraction by which the target intensity must be reduced to match the intensity of the reference. """ from scipy.optimize import least_squares#, leastsq # Use loss='soft_l1' for least squares robust against outliers # May want to play around with f_scale... res = least_squares(shift_subtract, init_pars, diff_step=0.1, loss='soft_l1', f_scale=1.0, args=(reference,target), kwargs={'mask':mask,'pad':pad,'shift_function':shift_function,'interp':interp}) out = res.x #out,_ = leastsq(shift_subtract, init_pars, # args=(reference,target,mask,pad,shift_function)) #results = [out[0],out[1],out[2]] #x,y,beta return out
# Fix NaN values
[docs] def fix_nans_with_med(im, niter_max=5, verbose=False, **kwargs): """Iteratively fix NaNs with surrounding Real data""" sh_orig = im.shape nan_mask = np.isnan(im) n_nans = np.where(nan_mask)[0].size if verbose: print('{} NaNs to start'.format(n_nans)) for ii in range(niter_max): im = im.flatten() nan_mask = np.isnan(im) im = im.reshape(sh_orig) # Return if we no NaNs if not np.any(nan_mask): return im if verbose: print('Iter {}'.format(ii)) # Shift im_smth = [] for i in np.arange(-1,2): for j in np.arange(-1,2): im_smth.append(fshift(im, i, j)) im_smth = np.array(im_smth) # Flatten arrays for indexing of NaNs im_smth = im_smth.reshape([im_smth.shape[0],-1]) im = im.flatten() # Take median of only the NaN'ed pixels im[nan_mask] = np.nanmedian(im_smth[:,nan_mask], axis=0) im = im.reshape(sh_orig) nan_mask = np.isnan(im) if np.any(nan_mask): n_nans = np.where(nan_mask)[0].size if verbose: print('{} NaNs left after {} iterations.'.format(n_nans, niter_max)) return im
[docs] def scale_ref_image(im1, im2, mask=None, smooth_imgs=False, return_shift_values=False): """Reference image scaling Find value to scale a reference image by minimizing residuals. Assumes everything is already aligned if return_shift_values=False. Or simply turn on return_shift_values to return (dx,dy,scl). Then fshift(im2,dx,dy) to shift the reference image. Parameters ---------- im1 : ndarray Science star observation. im2 : ndarray Reference star observation. mask : bool array or None Use this mask to exclude pixels. Boolean mask where True is included and False is excluded. smooth_imgs : bool Smooth the images with nearest neighbors to remove bad pixels? return_shift_values : bool Option to return x and y shift values """ # Mask for generating standard deviation if mask is None: mask = np.ones(im1.shape, dtype=bool) nan_mask = ~(np.isnan(im1) | np.isnan(im2)) mask = (mask & nan_mask) # Spatial averaging to remove bad pixels if smooth_imgs: im1_smth = []#np.zeros_like(im1) im2_smth = []#np.zeros_like(im2) for i in [-1,0,1]: for j in [-1,0,1]: im1_smth.append(fshift(im1, i, j)) im2_smth.append(fshift(im2, i, j)) im1 = np.nanmedian(np.array(im1_smth), axis=0) im2 = np.nanmedian(np.array(im2_smth), axis=0) scale_init = np.nanmedian(im1[mask]) / np.nanmedian(im2[mask]) if return_shift_values: params = [0.0, 0.0, scale_init] else: params = [scale_init] # Perform linear least squares fit on difference function return align_leastsq(im1, im2, bpmask=~mask, weights=None, params0=params, func_shift=fshift, interp='linear')
# if return_shift_values: # return align_LSQ(im2[mask], im1[mask], shift_function=fshift) # else: # _, _, scl = align_LSQ(im2[mask], im1[mask], shift_function=None) # return scl # ind = np.where(im1==im1[mask].max()) # ind = [ind[0][0], ind[1][0]] # # Initial Guess # scl = np.nanmean(im1[ind[0]-3:ind[0]+3,ind[1]-3:ind[1]+3]) / \ # np.nanmean(im2[ind[0]-3:ind[0]+3,ind[1]-3:ind[1]+3]) # # Wider range # # Check a range of scale values # # Want to minimize the standard deviation of the differenced images # scl_arr = np.linspace(0.2*scl,2*scl,10) # mad_arr = [] # for val in scl_arr: # diff = im1 - val*im2 # mad_arr.append(robust.medabsdev(diff[mask])) # mad_arr = np.array(mad_arr) # scl = scl_arr[mad_arr==mad_arr.min()][0] # # Check a range of scale values # # Want to minimize the standard deviation of the differenced images # scl_arr = np.linspace(0.85*scl,1.15*scl,50) # mad_arr = [] # for val in scl_arr: # diff = im1 - val*im2 # mad_arr.append(robust.medabsdev(diff[mask])) # mad_arr = np.array(mad_arr) # #plt.plot(scl_arr,mad_arr) # return scl_arr[mad_arr==mad_arr.min()][0]
[docs] def optimal_difference(im_sci, im_ref, scale, binsize=1, center=None, mask_good=None, sub_mean=True, std_func=np.std): """Optimize subtraction of ref PSF Scale factors from scale_ref_image work great for subtracting a reference PSF from a science image where there are plenty of photons, but perform poorly in the noise-limited regime. If we simply perform a difference by scaling the reference image, then we also amplify the noise. In the background, it's better to simply subtract the unscaled reference pixels. This routine finds the radial cut-off of the dominant noise source. Parameters ---------- im_sci : ndarray Science star observation. im_ref : ndarray Reference star observation. scale : float Scale factor from :func:`scale_ref_image` binsize : int Radial binsize (in pixels) to perform calculations center : tuple or None Location (x,y) to calculate radial distances. Default is center of image. mask_good : bool array Only perform operations on pixels where mask_good=True. sub_mean : bool Subtract mean (median, actually) of pixels in each radial bin? Basically a background subtraction. std_func : func What function do we want to use for calculating the standard deviation in each radial bin? After comparing the standard deviation between the two scaled differences in each radial bin, we only keep the better of the two. """ diff1 = im_sci - im_ref diff2 = im_sci - im_ref * scale rho = dist_image(im_sci, center=center) # Only perform operations on pixels where mask_good=True if mask_good is None: mask_good = np.ones(rho.shape, dtype=bool) nan_mask1 = np.isnan(diff1) nan_mask2 = np.isnan(diff2) mask_good = mask_good & (~nan_mask1) & (~nan_mask2) rho_good = rho[mask_good] diff1_good = diff1[mask_good] diff2_good = diff2[mask_good] # Get the histogram indices bins = np.arange(rho_good.min(), rho_good.max() + binsize, binsize) igroups = hist_indices(rho_good, bins) nbins = len(igroups) # Standard deviation for each bin std1 = binned_statistic(igroups, diff1_good, func=std_func) std2 = binned_statistic(igroups, diff2_good, func=std_func) # Subtract the mean at each radius if sub_mean: med1 = binned_statistic(igroups, diff1_good, func=np.median) med2 = binned_statistic(igroups, diff2_good, func=np.median) for i in range(nbins): diff1_good[igroups[i]] -= med1[i] diff2_good[igroups[i]] -= med2[i] # Replace values in diff1 with better ones in diff2 ibin_better = np.where(std2 < std1)[0] for ibin in ibin_better: diff1_good[igroups[ibin]] = diff2_good[igroups[ibin]] diff1[mask_good] = diff1_good return diff1