Source code for parts.retrieval.a_priori

"""
parts.retrieval.a_priori
------------------------

The :code:`retrieval.a_priori` sub-module provides modular data provider
object that can be used to build a priori data providers.
"""
from parts.data_provider import DataProviderBase
from parts.sensor import ActiveSensor, PassiveSensor
import numpy as np
import scipy as sp
import scipy.sparse


################################################################################
# Covariances
################################################################################

class Diagonal:
    def __init__(self,
                 diagonal,
                 mask = None,
                 mask_value = 1e-12):
        self.diagonal = np.array(diagonal)
        self.mask = mask
        self.mask_value = mask_value

    def get_covariance(self, data_provider, *args, **kwargs):

        if not self.mask is None:
            mask = np.logical_not(self.mask(data_provider, *args, **kwargs))
        else:
            mask = []

        if self.diagonal.size == 1:
            z = data_provider.get_altitude(*args, **kwargs)
            diagonal = self.diagonal * np.ones(z.size)
        else:
            diagonal = self.diagonal

        diagonal[mask] = self.mask_value
        diagonal = sp.sparse.diags(diagonal, format = "coo")
        return diagonal

[docs]class SpatialCorrelation: """ Adds spatial correlation to a given covariance matrix. """ def __init__(self, covariance, correlation_length, correlation_type = "exp", cutoff = 1e-2): """ Arguments: covariance: Covariance object providing the original covariance matrix to which to apply the spatial correlation. correlation_length(:code:`float`): Correlation length in meters. correlation_type(:code:`str`): Type of the correlation to apply. cutoff(:code:`float`): Threshold below which to set correlation coefficients to zero. """ self.covariance = covariance self.correlation_length = correlation_length self.correlation_type = correlation_type self.cutoff = cutoff def get_covariance(self, data_provider, *args, **kwargs): z = data_provider.get_altitude(*args, **kwargs) dz = np.abs(z.reshape(-1, 1) - z.reshape(1, -1)) if self.correlation_type == "exp": corr = np.exp(- np.abs(dz / self.correlation_length) ) elif self.correlation_type == "gauss": corr = np.exp(- (dz / self.correlation_length) ** 2) inds = corr < self.cutoff corr[inds] = 0.0 covmat = self.covariance.get_covariance(data_provider, *args, **kwargs) if isinstance(covmat, sp.sparse.spmatrix): covmat = covmat.todense() return corr @ covmat
[docs]class Thikhonov: """ Thikhonov regularization using second order finite differences. """ def __init__(self, scaling = 1.0, diagonal = 0.0, mask = None, mask_value = 1e12, z_scaling = True): """ Arguments: scaling(:code:`np.float`): Scalar to scale the precision matrix with. mask: A mask object defining indices on the diagonal on the precision matrix to which to as :code:`mask_value`. mask_value(:code:`float`): Scalar to add to the diagonal of the precision matrix on locations defined by :code:`mask`. z_scaling(:code:`Bool`): Whether or not to scale matrix coefficients according to height differences between levels. """ self.scaling = scaling self.diagonal = diagonal self.mask = mask self.mask_value = mask_value self.z_scaling = z_scaling def get_covariance(self, data_provider, *args, **kwargs): precmat = self.get_precision(data_provider, *args, **kwargs) diag = precmat.diagonal() return sp.sparse.diags(1.0 / diag, format = "coo") def get_precision(self, data_provider, *args, **kwargs): z = data_provider.get_altitude(*args, **kwargs) n = z.size du2 = np.ones(n - 2) du1 = -4.0 * np.ones(n - 1) du1[0] = -2.0 du1[-1] = -2.0 dl1 = np.copy(du1) dl2 = np.copy(du2) d = 6.0 * np.ones(n) d[:2] = [1, 5] d[-2:] = [5, 1] if self.diagonal > 0.0: d += self.diagonal if not self.mask is None: mask = np.logical_not(self.mask(data_provider, *args, **kwargs)) du1[mask[:-1]] = 0 du2[mask[:-2]] = 0 dl1[mask[1:]] = 0 dl2[mask[2:]] = 0 d[mask] = self.mask_value precmat = sp.sparse.diags(diagonals = [du2, du1, d, dl1, dl2], offsets = [2, 1, 0, -1, -2], format = "coo") precmat *= self.scaling if self.z_scaling: z = data_provider.get_altitude(*args, **kwargs) zf = (np.diff(z) / np.diff(z).mean()) ** 2.0 zf1 = np.zeros(z.shape) zf1[1:] += zf zf1[:-1] += zf zf1[1:-1] *= 0.5 precmat = sp.sparse.diags(diagonals = [zf1], offsets = [0], format = "coo") * precmat return precmat.tocoo()
################################################################################ # APrioriProviderBase ################################################################################
[docs]class APrioriProviderBase(DataProviderBase): def __init__(self, name, covariance): """ Create :class:`DataProviderApriori` instance that will provide the value of the quantity :code:`name` from the owning data provider as a priori mean state. Arguments: name(:code:`name`): Name of the quantity that this a priori provider should propagate. covariance(:code:`numpy.array` or :code:`float`): Diagonal or full covariance matrix to be provided as covariance matrix for the retrieval quantity. spatial_correlation: Functor that is applied to the covariance matrix before this is returned. """ super().__init__() xa_name = "get_" + name + "_xa" self.__dict__[xa_name] = self.get_xa if hasattr(covariance, "get_covariance"): covariance_name = "get_" + name + "_covariance" self.__dict__[covariance_name] = self.get_covariance if hasattr(covariance, "get_precision"): precision_name = "get_" + name + "_precision" self.__dict__[precision_name] = self.get_precision self.name = name self._covariance = covariance def get_covariance(self, *args, **kwargs): return self._covariance.get_covariance(self.owner, *args, **kwargs) def get_precision(self, *args, **kwargs): return self._covariance.get_precision(self.owner, *args, **kwargs)
################################################################################ # Masks ################################################################################
[docs]class And: """ Creates a combined mask by applying logical and to a list of single masks. """ def __init__(self, *args): self.masks = list(args) def __call__(self, data_provider, *args, **kwargs): """ Arguments: data_provider: Data provider describing the atmospheric scenario. *args: Arguments to forward to data provider. **kwargs: Keyword arguments to forward to data_provider. """ masks = [] for m in self.masks: masks += [m(data_provider, *args, **kwargs)] m_and = masks[0] for m in masks[1:]: m_and = np.logical_and(m_and, m) return m_and
[docs]class TropopauseMask: """ Returns a mask that is true only below the approximate height of the troposphere. The troposphere is detected as the first grid point where the lapse rate is negative and the temperature below 220. """ def __init__(self): pass def __call__(self, data_provider, *args, **kwargs): """ Arguments: data_provider: Data provider describing the atmospheric scenario. *args: Arguments to forward to data provider. **kwargs: Keyword arguments to forward to data_provider. """ t = data_provider.get_temperature(*args, **kwargs) t_avg = 0.5 * (t[1:] + t[:-1]) lr = - np.diff(t) tp = np.where(np.logical_and(lr < 0, t_avg < 220))[0] inds = np.ones(t.size, dtype = np.bool) if len(tp > 0): i = np.where(np.logical_and(lr < 0, t_avg < 220))[0][0] inds[i : inds.size] = False return inds
[docs]class TemperatureMask: """ The temperature mask replaces values at grid points outside of the given temperature interval with another value. """ def __init__(self, lower_limit, upper_limit): """ Arguments: lower_limit(:code:`float`): The lower temperature limit upper_limit(:code:`float`): The upper temperature limit """ self.lower_limit = lower_limit self.upper_limit = upper_limit def __call__(self, data_provider, *args, **kwargs): t = data_provider.get_temperature(*args, **kwargs) inds = np.logical_and(t.ravel() >= self.lower_limit, t.ravel() < self.upper_limit) return inds
[docs]class AltitudeMask: """ The altitude mask replaces values at grid points outside of the given altitude interval with another value. """ def __init__(self, lower_limit, upper_limit): """ Arguments: lower_limit(:code:`float`): The lower altitude limit upper_limit(:code:`float`): The upper altitude limit """ self.lower_limit = lower_limit self.upper_limit = upper_limit def __call__(self, data_provider, *args, **kwargs): z = data_provider.get_altitude(*args, **kwargs) inds = np.logical_and(z.ravel() >= self.lower_limit, z.ravel() < self.upper_limit) return inds
################################################################################ # A priori providers ################################################################################
[docs]class DataProviderAPriori(APrioriProviderBase): """ A priori provider that propagates an atmospheric quantity :code:`name` as a priori mean profile from the data provider. """ def __init__(self, name, covariance): """ Create :class:`DataProviderApriori` instance that will provide the value of the quantity :code:`name` from the owning data provider as a priori mean state. Arguments: name(:code:`name`): Name of the quantity that this a priori provider should propagate. covariance(:code:`numpy.array` or :code:`float`): Diagonal or full covariance matrix to be provided as covariance matrix for the retrieval quantity. spatial_correlation: Functor that is applied to the covariance matrix before this is returned. """ super().__init__(name, covariance) def get_xa(self, *args, **kwargs): f_name = "get_" + self.name try: f = getattr(self.owner, f_name) except: raise Expetion("DataProviderApriori instance requires get method " " {0} from its owning data provider.") x = f(*args, **kwargs) return x
[docs]class FixedAPriori(APrioriProviderBase): """ Returns an a priori profile that does not depend on the atmospheric state. """ def __init__(self, name, xa, covariance, mask = None, mask_value = 1e-12): super().__init__(name, covariance) self._xa = np.array(xa) self.mask = mask self.mask_value = mask_value def get_xa(self, *args, **kwargs): if self._xa.size == 1: z = self.owner.get_altitude(*args, **kwargs) xa = self._xa.ravel() * np.ones(z.size) else: xa = self._xa if not self.mask is None: mask = np.logical_not(self.mask(self.owner, *args, **kwargs)) xa[mask] = self.mask_value return xa
################################################################################ # Functional a priori ################################################################################
[docs]class FunctionalAPriori(APrioriProviderBase): """ Returns an a priori profile that does not depend on the atmospheric state. """ def __init__(self, name, variable, f, covariance, mask = None, mask_value = 1e-12): super().__init__(name, covariance) self.variable = variable self.f = f self.mask = mask self.mask_value = mask_value def get_xa(self, *args, **kwargs): try: f_get = getattr(self.owner, "get_" + self.variable) x = f_get(*args, **kwargs) except: raise Exception("Could not get variable {} from data provider." .format(self.variable)) xa = self.f(x) if not self.mask is None: mask = np.logical_not(self.mask(self.owner, *args, **kwargs)) xa[mask] = self.mask_value return xa
################################################################################ # Sensor a priori ################################################################################
[docs]class SensorNoiseAPriori(DataProviderBase): """ Measurement error due to sensor noise. The :code:`SensorNoiseAPriori` class constructs a combined observation error covariance matrix from the noise characteristics of a list of sensors. The noise of particular sensors can be amplified by adding the scaling factor to the :code:`noise_scaling` attribute of the class. Attributes: noise_scaling(:code:`dict`): Dictionary mapping sensor names to noise scaling factors. """ def __init__(self, sensors): """ Arguments: sensors(list of :code:`parts.sensor.Sensor`): Sensors used in the observation for which to construct the observation error covariance matrix. """ self.sensors = sensors self.noise_scaling = {} def get_observation_error_covariance(self, *args, **kwargs): m = 0 stds = [] for s in self.sensors: if isinstance(s, ActiveSensor): if s.name in self.noise_scaling: c = self.noise_scaling[s.name] else: c = 1.0 stds += [c * s.nedt] for s in self.sensors: if isinstance(s, PassiveSensor): if s.name in self.noise_scaling: c = self.noise_scaling[s.name] else: c = 1.0 stds += [c * s.nedt] sig = np.concatenate(stds).ravel() return sp.sparse.diags(sig ** 2.0, format = "coo")
################################################################################ # Reduced retrieval grids ################################################################################
[docs]class ReducedVerticalGrid(APrioriProviderBase): def __init__(self, a_priori, grid, quantity = "pressure", covariance = None): if covariance is None: super().__init__(a_priori.name, a_priori) else: super().__init__(a_priori.name, covariance) self.a_priori = a_priori self.new_grid = grid self.quantity = quantity self._covariance = covariance retrieval_p_name = "get_" + a_priori.name + "_p_grid" self.__dict__[retrieval_p_name] = self.get_retrieval_p_grid def _get_grid(self, *args, **kwargs): f_name = "get_" + self.quantity try: grid = getattr(self.owner, f_name)(*args, **kwargs) except: raise Exception("Data provider does not provide get function " "for quantity {} required to determine original " "size of retrieval grid." .format(self.quantity)) return grid def _interpolate(self, y, *args, **kwargs): old_grid = self._get_grid(*args, **kwargs) if self.quantity == "pressure": f = sp.interpolate.interp1d(old_grid[::-1], y[::-1], axis = 0, bounds_error = False, fill_value = (y[-1], y[0])) yi = f(self.new_grid[::-1])[::-1] else: f = sp.interpolate.interp1d(old_grid, y, axis = 0, bounds_error = False, fill_value = (y[0], y[-1])) yi = f(self.new_grid) return yi def _interpolate_matrix(self, y, *args, **kwargs): old_grid = self._get_grid(*args, **kwargs) if self.quantity == "pressure": f = sp.interpolate.interp2d(old_grid[::-1], old_grid[::-1], y[::-1, ::-1], bounds_error = False) yi = f(self.new_grid[::-1], self.new_grid[::-1])[::-1, ::-1] else: f = sp.interpolate.interp2d(old_grid, old_grid, y, bounds_error = False) yi = f(self.new_grid, self.new_grid) return yi def get_xa(self, *args, **kwargs): self.a_priori.owner = self.owner xa = self.a_priori.get_xa(*args, **kwargs) return self._interpolate(xa, *args, **kwargs) def get_covariance(self, *args, **kwargs): if self._covariance is None: self.a_priori.owner = self.owner covmat = self.a_priori.get_covariance(*args, **kwargs) if isinstance(covmat, sp.sparse.spmatrix): covmat = covmat.todense() return self._interpolate_matrix(covmat, *args, **kwargs) else: return self._covariance.get_covariance(self.owner, *args, **kwargs) def get_precision(self, *args, **kwargs): if self._covariance is None: self.a_priori.owner = self.owner precmat = self.a_priori.get_precision(*args, **kwargs) if isinstance(precmat, sp.sparse.spmatrix): precmat = precmat.todense() return self._interpolate_matrix(precmat, *args, **kwargs) else: return self._covariance.get_precision(self.owner, *args **kwargs) def get_retrieval_p_grid(self, *args, **kwargs): if self.quantity == "pressure": return self.new_grid else: p = self.owner.get_pressure(*args, **kwargs) return self._interpolate(p, *args, **kwargs)