Source code for parts.atmosphere.atmosphere

"""
The model atmosphere
====================

"""

import numpy as np
from parts.atmosphere.cloud_box import CloudBox
from parts.jacobian import  JacobianBase
from parts.retrieval import RetrievalBase, RetrievalQuantity

[docs]class TemperatureJacobian(JacobianBase): def __init__(self, quantity, index, p_grid = [], lat_grid = [], lon_grid = [], hse = "on"): super().__init__(quantity, index) self.p_grid = p_grid self.lat_grid = lat_grid self.lon_grid = lon_grid self.hse = hse def _make_setup_kwargs(self, ws): if self.p_grid.size == 0: g1 = ws.p_grid else: g1 = self.p_grid if self.lat_grid.size == 0: g2 = ws.lat_grid else: g2 = self.lat_grid if self.lon_grid.size == 0: g3 = ws.lon_grid else: g3 = self.lon_grid kwargs = {"g1" : g1, "g2" : g2, "g3" : g3, "hse" : self.hse} return kwargs
[docs] def setup(self, ws, data_provider, *args, **kwargs): kwargs = self._make_setup_kwargs(ws) ws.jacobianAddTemperature(**kwargs)
[docs]class TemperatureRetrieval(RetrievalBase, TemperatureJacobian): def __init__(self, quantity, index, p_grid = [], lat_grid = [], lon_grid = [], hse = "on"): RetrievalBase.__init__(self) TemperatureJacobian.__init__(self, quantity, index, p_grid, lat_grid, lon_grid, hse) def add(self, ws): ws.retrievalAddTemperature(**self._make_setup_kwargs(ws))
[docs]class Temperature(RetrievalQuantity): def __init__(self, atmosphere): super().__init__() self.atmosphere = atmosphere def get_data(self, ws, data_provider, *args, **kwargs): t = data_provider.get_temperature(*args, **kwargs) self.atmosphere.__check_dimensions__(t, "temperature") ws.t_field = self.atmosphere.__reshape__(t)
[docs] def set_from_x(self, ws, xa): x = self.transformation.invert(xa) self.t_field = x
@property def name(self): return "temperature" @property def jacobian_class(self): return TemperatureJacobian @property def retrieval_class(self): return TemperatureRetrieval
class Atmosphere: def __init__(self, dimensions, absorbers = [], scatterers = [], surface = None): self.__set_dimensions__(dimensions) self._required_data = [("p_grid", dimensions[:1], False), ("temperature", dimensions, False), ("altitude", dimensions, False), ("surface_altitude", dimensions[1:], True)] self.absorbers = absorbers self.scatterers = scatterers self.scattering = len(scatterers) > 0 self._dimensions = dimensions self._cloud_box = CloudBox(n_dimensions = len(dimensions), scattering = self.scattering) self._surface_data_indices = [] self._surface = surface self.temperature = Temperature(self) if not surface is None: nd = len(self._required_data) self._required_data += surface.required_data self.surface_data_indices = range(nd, len(self._required_data)) # # Dimensions # def __set_dimensions__(self, dimensions): if not type(dimensions) == tuple or not type(dimensions[0]) == int: raise Exception("Dimensions of atmosphere must be given as a tuple " "of integers.") if not len(dimensions) in [1, 2, 3]: raise Exception("The number of dimensions of the atmosphere " "must be 1, 2 or 3.") if not all([n >= 0 for n in dimensions]): raise Exception("The dimension tuple must contain only positive " "integers.") else: self._dimensions = dimensions @property def dimensions(self): return self._dimensions # # Absorbers # @property def absorbers(self): return self._absorbers @absorbers.setter def absorbers(self, absorbers): for a in absorbers: self.__dict__[a.name] = a self._required_data += [(a.name, self._dimensions, False)] self._absorbers = absorbers def add_absorber(self, absorber): self.__dict__[absorber.name] = absorber self._required_data += [(absorber.name, self._dimensions, False)] self._absorbers += absorber # # Cloud box # @property def cloud_box(self): return self._cloud_box # # Jacobian # def has_jacobian(self): for a in self.absorbers: if not a.jacobian is None: return True for b in self.scatterers: for m in b.moments: if not a.jacobian is None: return True # # Scatterers # @property def scatterers(self): return self._scatterers @scatterers.setter def scatterers(self, scatterers): for s in scatterers: self.__dict__[s.name] = s self._required_data += [(n, self._dimensions, False) \ for n in s.moment_names] self._scatterers = scatterers def add_scatterer(self, scatterer): self.__dict__[scatterer.name] = scatterer self._required_data += [(n, self._dimensions, False) \ for n in s.moment_names] self._absorbers += absorber # # Surface # @property def surface(self): return self._surface @surface.setter def set_surface(self, s): if not self._surface is None: rd = [d for i, d in enumerate(self._required_data) \ if i not in self._required_data_indices] nd = len(rd) rd += surface.required_data self._required_data = rd self._requried_data_indices = range(nd, len(rd)) self._surface = surface @property def required_data(self): return self._required_data # # Setup # def __setup_absorption__(self, ws): species = [] for i, a in enumerate(self._absorbers): a.setup(ws, i) species += [a.get_tag_string()] ws.abs_speciesSet(species = species) ws.abs_lines_per_speciesSetEmpty() ws.Copy(ws.abs_xsec_agenda, ws.abs_xsec_agenda__noCIA) ws.Copy(ws.propmat_clearsky_agenda, ws.propmat_clearsky_agenda__OnTheFly) def __setup_scattering__(self, ws): ws.ScatSpeciesInit() pb_names = [] for s in self._scatterers: s.setup(ws, len(pb_names)) pb_names += s.moment_names ws.particle_bulkprop_names = pb_names def setup(self, ws): if len(self.dimensions) == 1: ws.AtmosphereSet1D() if len(self.dimensions) == 2: ws.AtmosphereSet2D() if len(self.dimensions) == 3: ws.AtmosphereSet3D() self.__setup_absorption__(ws) self.__setup_scattering__(ws) self.surface.setup(ws) self.cloud_box.setup(ws) def setup_jacobian(self, ws): for a in self.absorbers: a.setup_jacobian(ws) for s in self.scatterers: for m in s.moments: m.setup_jacobian(ws) # # Data # def __check_dimensions__(self, f, name): s = f.shape err = "Provided atmospheric " + name + " field" err += " is inconsistent with the dimensions of the atmosphere." if len(s) != len(self.dimensions): raise Exception(err) if not all([i == j or j == 0 for i,j \ in zip(s, self.dimensions)]): raise Exception(err) def __reshape__(self, f): s = [1, 1, 1] j = 0 for i in range(len(self.dimensions)): if self.dimensions[0] > 0: s[i] = self.dimensions[i] else: s[i] = f.shape[i] return np.reshape(f, tuple(s)) def __get_pressure__(self, ws, provider, *args, **kwargs): p = provider.get_pressure(*args, **kwargs).ravel() if self.dimensions[0] != 0 and p.size != self.dimensions[0]: raise Exception("Provided pressure grid is inconsistent with" " dimensions of the atmosphere.") ws.p_grid = p def __get_altitude__(self, ws, provider, *args, **kwargs): dimensions = ws.t_field.value.shape z = provider.get_altitude(*args, **kwargs) self.__check_dimensions__(z, "altitude") z = self.__reshape__(z) if not z.shape == dimensions: raise Exception("Dimensions of altitude field inconsistent" " with dimensions of temperature field.") ws.z_field = z # Surface altitude dimensions = ws.t_field.value.shape if hasattr(provider, "get_surface_altitude"): zs = provider.get_surface_altitude() try: zs = zs.reshape(dimensions[1:]) ws.z_surface = zs except: raise Exception("Shape " + str(zs.shape) + "of provided " "surface altitude is inconsistent with " "the horizontal dimensions of the " "atmosphere " + str(dimensions) + ".") else: ws.z_surface = ws.z_field.value[0, :, :] def __get_absorbers__(self, ws, provider, *args, **kwargs): dimensions = ws.t_field.value.shape ws.vmr_field = np.zeros((len(self.absorbers),) + dimensions) for i, a in enumerate(self.absorbers): if a.retrieval is None: fname = "get_" + a.name f = provider.__getattribute__(fname) x = f(*args, **kwargs) self.__check_dimensions__(x, a.name) x = self.__reshape__(x) if not x.shape == dimensions: raise Exception("Dimensions of " + a.name + " VMR field " "inconcistent with dimensions of temperature " "field.") ws.vmr_field.value[i, :, :, :] = x i = 0 n_moments = sum([len(s.moment_names) for s in self.scatterers]) ws.particle_bulkprop_field = np.zeros(((n_moments,) + ws.t_field.value.shape)) def __get_scatterers__(self, ws, provider, *args, **kwargs): if not self.scatterers is None and len(self.scatterers) > 0: ws.cloudbox_on = 1 ws.cloudboxSetFullAtm() dimensions = ws.t_field.value.shape for s in self.scatterers: s.get_data(ws, provider, *args, **kwargs) def get_data(self, ws, provider, *args, **kwargs): self.__get_pressure__(ws, provider, *args, **kwargs) self.temperature.get_data(ws, provider, *args, **kwargs) self.__get_altitude__(ws, provider, *args, **kwargs) self.__get_absorbers__(ws, provider, *args, **kwargs) self.__get_scatterers__(ws, provider, *args, **kwargs) self.cloud_box.get_data(ws, provider, *args, **kwargs) self.surface.get_data(ws, provider, *args, **kwargs) # # Checks # def run_checks(self, ws): ws.atmgeom_checkedCalc() ws.atmfields_checkedCalc(bad_partition_functions_ok = 1) ws.propmat_clearsky_agenda_checkedCalc() ws.propmat_clearsky_agenda_checkedCalc() ws.abs_xsec_agenda_checkedCalc() self.cloud_box.run_checks(ws) self.surface.run_checks(ws) class Atmosphere1D(Atmosphere): def __init__(self, absorbers = [], scatterers = [], surface = None, levels = None): if levels is None: dimensions = (0,) else: if not type(levels) == int: raise Exception("The number of levels of the 1D atmosphere " "must be given by an integer.") else: dimensions = (level, ) super().__init__(dimensions, absorbers = absorbers, scatterers = scatterers, surface = surface)