Source code for glofrim.glofrim_bmi

from collections import OrderedDict
import os
from os.path import join, isfile, abspath, dirname, basename, normpath
from datetime import datetime, timedelta
from configparser import ConfigParser
import logging
import numpy as np

from glofrim.utils import setlogger, closelogger, add_file_handler
from glofrim.gbmi import EBmi
import glofrim.glofrim_lib as glib
from glofrim.pcr_bmi import PCR
from glofrim.cmf_bmi import CMF
from glofrim.dfm_bmi import DFM
from glofrim.wfl_bmi import WFL
from glofrim.lfp_bmi import LFP
from glofrim.spatial_coupling import SpatialCoupling, groupby_sum

class Glofrim(EBmi):
    """Central CSDMS-compliant BMI implementation; 
    generic script providing function sceleton to other model-specified BMIs; 
    defining main and central model and BMI information
    
    Arguments:
        EBmi {class} -- Interface (abstract base class) for a model that implements the CSDMS BMI (Basic Model Interface).
    
    Raises:
        ValueError -- Raised if syntax requesting variable name is not correct
        AssertionError -- Raised if model dt is not a whole number of GLOFRIM dt
        Warning -- Warning is raised if two-step model initialization is not correctly executed
        ValueError -- Raised if config-files of models to be coupled are not defined in ini-file
        ValueError -- Raised if engines (ie executables) for models to be coupled are not specified in ini/env-file
        ValueError -- Raised if no exchanges are specified in GLOFRIM ini-file
        ValueError -- Raised if an unspported model is specified
        ValueError -- Raised if unknown variables are specified
        Warning -- Warning is raised if two-step model initialization is not correctly executed
        Warning -- Raised in models are not initialized before updating
        Exception -- Raised if model end time is already reached; no further updating possible
        Exception -- Raised if specified time is smaller than time step or later than model end time
        NotImplementedError -- SpinUp is not implemented for central BMI
        AssertionError -- Raised if no grid is set before
        AssertionError -- Raised if no 1D network is set before
    """

    _name = 'GLOFRIM'
    _version = '2.0'
    _models = {'PCR': PCR, 'CMF': CMF, 'DFM': DFM, 'WFL': WFL, 'LFP': LFP}
    _init_pre_exchange = ['DFM'] # need to initialize before we can know the grid

    def __init__(self, loglevel=logging.INFO):
        self.bmimodels = OrderedDict()
        self.exchanges = []
        self._var_sep = "."
        self._mult_sep = "*"
        self._ind_sep = "@"
        self._coord_sep = "|"

        self._loglevel = loglevel
        self.logger = setlogger(None, self._name, thelevel=loglevel)
        self.wb_logger = setlogger(None, 'wb', thelevel=loglevel, show_in_console=False)
        self.initialized = False
        self.obs = None

    def _check_long_var_name(self, long_var_name):
        #TODO: what's the use of this function? You provide a name and get it back split?
        """Returns variable name of a user-specified variable.
        
        Arguments:
            long_var_name {str} -- variable long name
        
        Raises:
            ValueError -- Raised if syntax requesting variable name is not correct
        
        Returns:
            str -- variable name of specified variable
        """

        if not (len(long_var_name.split(self._var_sep)) == 2):
            msg = 'use "model{}var" syntax for long_var_name'.format(self._var_sep)
            self.logger.error(msg)
            raise ValueError(msg)
        return long_var_name.split(self._var_sep)

    def _check_initialized(self):
        """Checks whether model was already initialized previously.
        criterion needed since some functions require the model to be in initialized state
        
        Returns:
            boolean -- Boolean value whether model is already initialized
        """

        self.initialized = np.all([self.bmimodels[mod].initialized for mod in self.bmimodels])
        return self.initialized

    def _check_dt(self):
        """Checks whether the dt specified for a model is not a whole number of GLOFRIM dt.
        criterion is needed to ensure updates between models remain in phase
        
        Raises:
            AssertionError -- Raised if model dt is not a whole number of GLOFRIM dt
        """

        for mod in self.bmimodels:
            dt_mod = self.bmimodels[mod].get_time_step()
            if not glib.check_dts_divmod(self._dt, dt_mod):
                msg = "Invalid value for dt in comparison to model dt. Make sure a whole number of model timesteps fit in the set GLOFRIM dt"  
                self.logger.error(msg)
                raise AssertionError(msg)

    ###
    ### Model Control Functions
    ###

[docs] def initialize_config(self, config_fn, env_fn=None): """Initializing the model configuration file. aligning GLOFRIM specifications for coupled runs to be consistent with overall run settings; with environment file (env-file) local paths to model engines can be defined Arguments: config_fn {str} -- path to model configuration file Keyword Arguments: env_fn {str} -- path to environment file (default: {None}) Raises: Warning -- Warning is raised if two-step model initialization is not correctly executed ValueError -- Raised if config-files of models to be coupled are not defined in ini-file ValueError -- Raised if engines (ie executables) for models to be coupled are not specified in ini/env-file """ # log to file add_file_handler(self.logger, abspath(config_fn).replace('.ini', '.log')) if self.initialized: msg = "model already initialized, it's therefore not longer possible to initialize the config" self.logger.warn(msg) raise Warning(msg) # config settings self._config_fn = abspath(config_fn) self._root = dirname(config_fn) self.logger.info('Reading ini file..') self._config = glib.configread(self._config_fn, encoding='utf-8', cf=ConfigParser(inline_comment_prefixes=('#'))) # environment file -> merge with config if given if env_fn is not None and isfile(abspath(env_fn)): env_fn = abspath(env_fn) self.logger.info('Reading env file..') env = glib.configread(env_fn, encoding='utf-8', cf=ConfigParser(inline_comment_prefixes=('#'))) for sect in env.sections(): if sect not in self._config.sections(): self._config.add_section(sect) for opt in env.options(sect): self._config.set(sect, opt, glib.getabspath(env.get(sect, opt), dirname(env_fn))) if self._config.has_option('models', 'root_dir'): self._root = glib.getabspath(self._config.get('models', 'root_dir'), self._root) self._config.remove_option('models', 'root_dir') ## parse glofrim config # initialize bmi component and it's configuration if not self._config.has_section('models'): msg = 'GLOFRIM ini misses a "models" section' self.logger.error(msg) raise ValueError(msg) for mod in self._config.options('models'): bmi_kwargs = {'logger': self.logger, 'loglevel': self._loglevel} _bmi = self._models[mod] # check if bmi component requires engine if 'engine' in _bmi.__init__.__code__.co_varnames: if not self._config.has_option('engines', mod): msg = 'GLOFRIM ini or environment file misses a "engines" section with {} option' self.logger.error(msg) raise ValueError(msg.format(mod)) engine_path = glib.getabspath(self._config.get('engines', mod), self._root) bmi_kwargs.update(engine = engine_path) # read proj string from config file if not self._config.has_option('coupling', mod): msg = 'GLOFRIM ini or environment file misses a "coupling" section {} option for projection string, assuming lat-lon' self.logger.error(msg) # raise ValueError(msg.format(mod)) crs = self._config.get('coupling', mod, fallback='+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs') # initialize bmi component self.bmimodels[mod] = _bmi(**bmi_kwargs) # initialize config of bmi component modconf = glib.getabspath(self._config.get('models', mod), self._root) self.bmimodels[mod].initialize_config(modconf) # initialize grid self.bmimodels[mod].get_grid() # if grid does not have a crs, it will be taken from the config file if self.bmimodels[mod].grid.crs is None: self.bmimodels[mod].grid.crs = crs # parse exchanges section self.exchanges = self.set_exchanges() # create logfile for exchange volumes add_file_handler(self.wb_logger, abspath(config_fn).replace('.ini', '.wb'), formatter=logging.Formatter("%(message)s")) self._wb_header = ['time'] for mod in self.bmimodels: if hasattr(self.bmimodels[mod], '_get_tot_volume_in'): self._wb_header.append('{}_tot_in'.format(mod)) if hasattr(self.bmimodels[mod], '_get_tot_volume_out'): self._wb_header.append('{}_tot_out'.format(mod)) self._wb_header += [item[1]['name'] for item in self.exchanges if item[0] == 'exchange'] self.wb_logger.info(', '.join(self._wb_header)) # combined model time self._dt = timedelta(seconds=int(self._config.get('coupling' ,'dt', fallback=86400))) # set start_time & end_time if given if self._config.has_option('coupling', 'start_time'): start_time = datetime.strptime(self._config.get('coupling', 'start_time'), "%Y-%m-%d") self.set_start_time(start_time) if self._config.has_option('coupling', 'end_time'): end_time = datetime.strptime(self._config.get('coupling', 'end_time'), "%Y-%m-%d") self.set_end_time(end_time) self._timeunit = 'sec' self._startTime = self.get_start_time() self._endTime = self.get_end_time() self._t = self._startTime self.initialized=False # check model dt self._check_dt()
[docs] def set_exchanges(self): """Defines variable exchanges between models as well as unit and time conversions. exchanges can be defined in ini/env-file; it is important that all fluxes/states in ini/env-file are to m3 for mass balance. Raises: ValueError -- Raised if no exchanges are specified in GLOFRIM ini-file ValueError -- Raised if an unspported model is specified ValueError -- Raised if unknown variables are specified Returns: list -- list describing all models, states, and fluxes that will be exchanged """ self.logger.info('Parsing exchanges..') # parse exchanges if not self._config.has_section('exchanges'): msg = 'GLOFRIM ini misses a "exchanges" section' self.logger.error(msg) raise ValueError(msg) self.exchanges = [] model_called, vars_set = [], [] to_coords = None for ex_i, ex_from in enumerate(self._config.options('exchanges')): ex_to = self._config.get('exchanges', ex_from) # break up starting from back exdict = {} ## FROM # model if self._ind_sep in ex_from: ex_from, ind_from = ex_from.split(self._ind_sep) else: ind_from = 'grid' ex_from = ex_from.split(self._var_sep) mod_from, vars_from = ex_from[0], '.'.join(ex_from[1:]) if not (mod_from in self.bmimodels.keys()): msg = "unkown model name {}".format(mod_from) self.logger.error9(msg) raise ValueError(msg) exdict['from_mod'] = mod_from from_bmi = self.bmimodels[mod_from] # variables exdict['from_vars'], exdict['from_unit'] = [], [] vars_from = vars_from.split('*') from_model_vars = from_bmi._input_var_names + from_bmi._output_var_names for i, var_from in enumerate(vars_from): # last var may be a scalar multiplier if i > 0: # and i == (len(vars_from) - 1): try: var_from = float(var_from) except ValueError: pass exdict['from_vars'].append(var_from) # model variable exdict['from_unit'].append(from_bmi.get_var_units(var_from)) if isinstance(var_from, str) and var_from not in from_model_vars: self.logger.warning("Unkonwn variable {} for model {}".format(var_from, from_bmi._name)) ## TO # index if self._ind_sep in ex_to: ex_to, ind_to = ex_to.split(self._ind_sep) # check if manual set coordinates are found if self._coord_sep in ind_to: ind_to, to_coords = ind_to.split(self._coord_sep) to_coords = eval(to_coords) else: ind_to = 'grid' # model ex_to = ex_to.split(self._var_sep) mod_to, vars_to = ex_to[0], '.'.join(ex_to[1:]) if not (mod_to in self.bmimodels.keys()): msg = "unkown model name {}".format(mod_from) self.logger.error(msg) raise ValueError(msg) to_bmi = self.bmimodels[mod_to] exdict['to_mod'] = mod_to # variables exdict['to_vars'], exdict['to_unit'] = [], [] vars_to = vars_to.split('*') to_model_vars = to_bmi._input_var_names + to_bmi._output_var_names for i, var_to in enumerate(vars_to): # last var may be a scalar multiplier if i > 0: # and i == (len(vars_to) - 1): try: var_to = float(var_to) except ValueError: pass exdict['to_vars'].append(var_to) # model variable exdict['to_unit'].append(to_bmi.get_var_units(var_to)) if isinstance(var_to, str) and var_to not in to_model_vars: self.logger.Warning("Unkonwn variable {} for model {}".format(var_to, to_bmi._name)) # with second call to set variable add instead of replace if exdict['to_vars'][0] in vars_set: exdict['add'] = True else: vars_set.append(exdict['to_vars'][0]) ## SpatialCoupling # create SpatialCoupling object. Actual coupling happens later sc_kwargs = dict(to_bmi=to_bmi, from_bmi=from_bmi) if isfile(ind_to): sc_kwargs.update(filename=ind_to, method='from_file') else: coupling_method = '{}_2_{}'.format(ind_from, ind_to) sc_kwargs.update(method=coupling_method, to_coords=to_coords) exdict['coupling'] = SpatialCoupling(**sc_kwargs) exdict['name'] = '{:s}.{:s}_to_{:s}.{:s}'.format(mod_from, exdict['from_vars'][0], mod_to, exdict['to_vars'][0]) ## UPDATE # update model before first time it is called in mod_from if not mod_from in model_called: self.exchanges.append(('update', mod_from)) model_called.append(mod_from) self.exchanges.append(('exchange', exdict)) # add last model updates for mod in self.bmimodels.keys(): if not mod in model_called: self.exchanges.append(('update', mod)) model_called.append(mod) return self.exchanges
def _set_spatial_coupling(self): """Couples model grids. for one-way coupling, this is a one-to-many coupling, ie one origin cell is coupled to multiple destination cells """ for item in self.exchanges: if item[0] == 'exchange': self.logger.info('set coupled grids for: {}'.format(str(item[1]['coupling']))) item[1]['coupling'].couple()
[docs] def initialize_model(self, **kwargs): """Initializes the model, ie loading all files and checking for consistency. can only be executed after the model-specific config-file was initialized; essential for a successful two-step model initialization and aligned model data. Raises: Warning -- Warning is raised if two-step model initialization is not correctly executed """ if not hasattr(self, '_config_fn'): msg = 'run initialize_config before initialize_model' self.logger.warn(msg) raise Warning(msg) # some models (e.g. DFM) need to be intialized before we access it spatial information for mod in self.bmimodels: if mod in self._init_pre_exchange: self.bmimodels[mod].initialize_model() # set spatial coupling self._set_spatial_coupling() # initialize other models (CMF needs to be initialized after spatial coupling!) for mod in self.bmimodels: if mod not in self._init_pre_exchange: self.bmimodels[mod].initialize_model() self._check_initialized() # sync start and endtimes self._startTime = self.get_start_time() self._endTime = self.get_end_time() self._t = self._startTime # check model dt self._check_dt()
# TODO set observation points
[docs] def initialize(self, config_fn): """Initializes the model following a two-step initialization procedure. first, the config-file is initialized and where necessary aligned with overall model settings (e.g. output-dir, start and end time); second, the model is actually initialized. Arguments: config_fn {str} -- path to model configuration file """ self.initialize_config(config_fn) self.initialize_model()
[docs] def update(self, dt=None): """Updating model for a certain time step interval (default: None). checks whether model end time is already reached; requires that model-specific time step is a whole number of update time step. Keyword Arguments: dt {int} -- update time step interval [sec] at which information is exchanged between models (default: {None}) Raises: Warning -- Raised in models are not initialized before updating Exception -- Raised if model end time is already reached; no further updating possible """ if not self.initialized: msg = "models should be initialized first" self.logger.warn(msg) raise Warning(msg) if self._t >= self.get_end_time(): msg = "endTime already reached, model not updated" self.logger.warn(msg) raise Exception(msg) # update all models with combined model dt wb_dict = {'time': str(self._t)} dt = self._dt.total_seconds() if dt is None else dt t_next = self._t + timedelta(seconds=dt) for item in self.exchanges: if item[0] == 'update': # LFP deviates from the set timestep using an adaptive timestep if dt is set to large # calculate the dt to get to the next timestep # NOTE we use "update" instead of "update_until" to getter better logging. imod = item[1] dt_mod = (t_next - self.bmimodels[imod]._t).total_seconds() self.bmimodels[imod].update(dt=dt_mod) # get volume totals in and out if the bmi object has this funtion, ortherwise return -9999 tot_volume_in = getattr(self.bmimodels[imod], '_get_tot_volume_in', lambda: -9999.)() wb_dict.update({'{}_tot_in'.format(imod): '{:.2f}'.format(tot_volume_in)}) tot_volume_out = getattr(self.bmimodels[imod], '_get_tot_volume_out', lambda: -9999.)() wb_dict.update({'{}_tot_out'.format(imod): '{:.2f}'.format(tot_volume_out)}) elif item[0] == 'exchange': tot_volume = self.exchange(**item[1]) wb_dict.update({item[1]['name']: '{:.2f}'.format(tot_volume)}) # write water balance volumes to file according to header self.wb_logger.info(', '.join([wb_dict[c] for c in self._wb_header])) self._t = self.get_current_time()
[docs] def update_until(self, t, dt=None): """Updates the model until time t is reached with an update time step dt. Arguments: t {int} -- Model time until which the model is update Keyword Arguments: dt {int} -- update time step interval [s] at which information is exchanged between models (default: {None}) Raises: Exception -- Raised if specified time is smaller than time step or later than model end time """ if (t<self._t) or t>self.get_end_time(): msg = "wrong time input: smaller than model time or larger than endTime" self.logger.error(msg) raise Exception(msg) while self._t < t: self.update(dt=dt)
def spinup(self): """PCR-GLOBWB specific function (see pcr_bmi.py). Raises: NotImplementedError -- SpinUp is not implemented for central BMI """ raise NotImplementedError
[docs] def finalize(self): """Finalizes the model. shuts down all operations and closes output files. """ self.logger.info('finalize models bmi. Close loggers.') for mod in self.bmimodels: self.bmimodels[mod].finalize() closelogger(self.logger) closelogger(self.wb_logger)
[docs] def exchange(self, from_mod, to_mod, from_vars, to_vars, coupling, add=False, **kwargs): """Exchanges variable content from specified source variable in source model to a specified destination variable in the destination model. exchange is possible for entire grid or at specified indices; coupling either as totals or fractional values depending on exchange; source variable can either add to destination variable or overwrite it Arguments: from_mod {str} -- string defining the source model to_mod {str} -- string defining the destination model from_vars {str} -- string defining the source variable to_vars {str} -- string defining the destination variable coupling {SpatialCoupling} -- object defining the spatial coupling structure Keyword Arguments: add {bool} -- if True, source values are added to destination values; if False, overwritten (default: {False}) """ self.logger.info('{} {}.{} data to coupled {}.{} variable'.format('adding' if add else 'setting', from_mod, from_vars[0], to_mod, to_vars[0])) if coupling.at_indices(): tot_volume = self.exchange_at_indices(from_mod, to_mod, from_vars, to_vars, coupling, add=add, **kwargs) else: tot_volume = self.exchange_same_grid(from_mod, to_mod, from_vars, to_vars, coupling, add=add, **kwargs) return tot_volume
[docs] def exchange_same_grid(self, from_mod, to_mod, from_vars, to_vars, coupling, add=False, **kwargs): """Exchanges variable content from specified source variable in source model to a specified destination variable in the destination model for the entire grid. source variable can either add to destination variable or overwrite it Arguments: from_mod {str} -- string defining the source model to_mod {str} -- string defining the destination model from_vars {str} -- string defining the source variable to_vars {str} -- string defining the destination variable coupling {SpatialCoupling} -- object defining the spatial coupling structure Keyword Arguments: add {bool} -- if True, source values are added to destination values; if False, overwritten (default: {False}) """ # get FROM data & translate shape = self.bmimodels[from_mod].get_var_shape(from_vars[0]) vals = np.ones(shape) for from_var in from_vars: if isinstance(from_var, float): # scalar multiplier vals *= from_var else: vals *= self.bmimodels[from_mod].get_value(from_var) # reproject vals to the to_mod projection using the SpatialCoupling.reproject function if coupling.reproject is not None: vals = coupling.reproject(vals, np.nan) # total exchange self.logger.debug('total get {:3f}'.format(np.nansum(vals))) tot_volume = np.nansum(vals) # this is an ugly hack at the moment to circumvent the PCR.runoff [m] to CMF.roffin [m] coupling # TODO: solve this when we implement pint for unit conversion if (from_mod == 'PCR') and (from_vars[0] == 'runoff') and (to_mod == 'CMF') and (to_vars[0] == 'roffin'): tot_volume = np.nansum(vals * self.bmimodels[from_mod].get_value(self.bmimodels[from_mod]._area_var_name)) # get TO data & translate for var in to_vars[1:]: if isinstance(var, float): div = var else: div = self.bmimodels[to_mod].get_value(var) assert np.all(np.not_equal(div, 0)) # make sure not to divide by zero vals /= div # SET data if add: # add to current state vals += self.bmimodels[to_mod].get_value(to_vars[0]) self.bmimodels[to_mod].set_value(to_vars[0], vals) return tot_volume
[docs] def exchange_at_indices(self, from_mod, to_mod, from_vars, to_vars, coupling, add=False, **kwargs): """Exchanges variable content from specified source variable in source model to a specified destination variable in the destination model for user-specified indices. coupling either as totals or fractional values depending on exchange; source variable can either add to destination variable or overwrite it Arguments: from_mod {str} -- string defining the source model to_mod {str} -- string defining the destination model from_vars {str} -- string defining the source variable to_vars {str} -- string defining the destination variable coupling {SpatialCoupling} -- object defining the spatial coupling structure Keyword Arguments: add {bool} -- if True, source values are added to destination values; if False, overwritten (default: {False}) """ # get FROM data & translate vals_get = np.ones(coupling.from_ind.size) for from_var in from_vars: if isinstance(from_var, float): # scalar multiplier vals_get *= from_var else: vals_get *= self.bmimodels[from_mod].get_value_at_indices(from_var, coupling.from_ind) # TRANSFORM data from coupling.from_ind.size > coupling.to_ind.size if coupling.from_grp_n.max() > 1: # aggregate data from vals_get = groupby_sum(vals_get, coupling.from_grp) if coupling.to_grp_n.max() > 1: # split using fractions vals_set = np.repeat(vals_get, coupling.to_grp_n) * coupling.get_frac() else: vals_set = vals_get tot_volume = np.nansum(vals_get) vol_diff = tot_volume - np.nansum(vals_set) if vol_diff > 1E-2: self.logger.warming('Large difference in water volume from and to: {:.4f} m3'.format(vol_diff)) self.logger.debug('total get {:3f}'.format(tot_volume)) # get TO data & translate for var in to_vars[1:]: if isinstance(var, float): div = var else: div = self.bmimodels[to_mod].get_value_at_indices(var, coupling.to_ind) assert np.all(np.not_equal(div, 0)) # make sure not to divide by zero vals_set /= div # SET data if add: # add to current state vals_set += self.bmimodels[to_mod].get_value_at_indices(to_vars[0], coupling.to_ind) self.bmimodels[to_mod].set_value_at_indices(to_vars[0], coupling.to_ind, vals_set) return tot_volume
### ### Model Information Functions ###
[docs] def get_model_type(self): """Returns type of model. possible types are hydrologic, routing, or hydrodynamic model Returns: str -- type of specified model """ return {mod: self.bmimodels[mod].get_model_type() for mod in self.bmimodels}
[docs] def get_component_name(self): #TODO: what are model component in this context? """Returns component name of specified model. Returns: str -- name of specified component """ return {mod: self.bmimodels[mod].get_component_name() for mod in self.bmimodels}
[docs] def get_input_var_names(self): """Returns list with all possible input variable names that can be used as exchange TO the specified model. must be specified in model specific BMI class. Returns: list -- list of all possible input variables """ return {mod: self.bmimodels[mod].get_input_var_names() for mod in self.bmimodels}
[docs] def get_output_var_names(self): """Returns list with all possible output variable names that can be used as exchange FROM the specified model. must be specified in model specific BMI class. Returns: list -- list of all possible output variables """ return {mod: self.bmimodels[mod].get_output_var_names() for mod in self.bmimodels}
### ### Variable Information Functions ###
[docs] def get_var_type(self, long_var_name): """Returns the type of a user-specified model variable exposed via BMI. Arguments: long_var_name {str} -- long name of exposed model variable Returns: str -- type of specified variable """ mod, var = self._check_long_var_name(long_var_name) return self.bmimodels[mod].get_var_type(var)
[docs] def get_var_units(self, long_var_name): """Provides units of variable. Arguments: long_var_name {str} -- long name of exposed model variable Returns: str -- units of specified variable """ mod, var = self._check_long_var_name(long_var_name) return self.bmimodels[mod].get_var_units(var)
[docs] def get_var_rank(self, long_var_name): """Provides number of dimensions of variable. Arguments: long_var_name {str} -- long name of exposed model variable Returns: int -- rank of specified variable """ mod, var = self._check_long_var_name(long_var_name) return self.bmimodels[mod].get_var_rank(var)
[docs] def get_var_size(self, long_var_name): """Providestotal number of values contained in variable. Arguments: long_var_name {str} -- long name of exposed model variable Returns: int -- size of specified variable """ mod, var = self._check_long_var_name(long_var_name) return self.bmimodels[mod].get_var_size(var)
[docs] def get_var_shape(self, long_var_name): """Provides shape of variable. Arguments: long_var_name {str} -- long name of exposed model variable Returns: tuple -- shape of specified variable """ mod, var = self._check_long_var_name(long_var_name) return self.bmimodels[mod].get_var_shape(var)
[docs] def get_var_nbytes(self, long_var_name): """Provides number of bytes of variable. Arguments: long_var_name {str} -- long name of exposed model variable Returns: int -- byte number of specified variable """ mod, var = self._check_long_var_name(long_var_name) return self.bmimodels[mod].get_var_nbytes(var)
[docs] def get_start_time(self): """Provides start time of model. Returns: date -- start time of model """ self._startTime = max([self.bmimodels[mod].get_start_time() for mod in self.bmimodels]) return self._startTime
[docs] def get_current_time(self): """Provides current model time. Returns: date -- current model time of PCR-GLOBWB """ models_t = [self.bmimodels[mod]._t for mod in self.bmimodels] in_sync = not models_t or models_t.count(models_t[0]) == len(models_t) # check if identical if not in_sync: msg = '; '.join(['{}: {}'.format(m, str(t)) for m, t in zip(self.bmimodels, models_t)]) self.logger.warning("Model times out of sync: {}".format(msg)) self._t = models_t[0] return self._t
[docs] def get_end_time(self): """Provides end time of model. Returns: date -- end time of model """ self._endTime = min([self.bmimodels[mod].get_end_time() for mod in self.bmimodels]) return self._endTime
[docs] def get_time_step(self): """Provides time step of model. Returns: date -- time step of model """ return self._dt
[docs] def get_time_units(self): """Provides time unit of model. Returns: str -- time unit of model """ return self._timeunit
### ### Variable Getter and Setter Functions ###
[docs] def get_value(self, long_var_name, **kwargs): """Gets values of a certain exposed variable. entries with fill_value are replaced with NaN values Arguments: long_var_name {str} -- name of exposed model variable Returns: array -- array with values """ mod, var = self._check_long_var_name(long_var_name) return self.bmimodels[mod].get_value(var, **kwargs)
[docs] def get_value_at_indices(self, long_var_name, inds, **kwargs): """Gets values at a specific index of a certain exposed variable. entries with fill_value are replaced with NaN values Arguments: long_var_name {str} -- name of exposed model variable inds {int} -- Index pointing to entry within entire array of variable values Returns: float -- value at specific index of retrieved variable """ mod, var = self._check_long_var_name(long_var_name) return self.bmimodels[mod].get_value_at_indices(var, inds, **kwargs)
[docs] def set_value(self, long_var_name, src, **kwargs): """Overwriting of all values of a certain exposed variable with provided new values. entries with NaN value are replaced with fill_value; provided new values must match shape of aim variable Arguments: long_var_name {str} -- name of exposed model variable src {array} -- array with new values """ mod, var = self._check_long_var_name(long_var_name) return self.bmimodels[mod].set_value(var, src, **kwargs)
[docs] def set_value_at_indices(self, long_var_name, inds, src, **kwargs): """Overwriting of value at specific entry of a certain exposed variable with provided new values. entries with NaN value are replaced with fill_value Arguments: long_var_name {str} -- name of exposed model variable inds {int} -- Index pointing to entry within entire array of variable values src {array} -- array with new values Returns: [type] -- [description] """ mod, var = self._check_long_var_name(long_var_name) return self.bmimodels[mod].set_value_at_indices(var, inds, src, **kwargs)
### ### Grid Information Functions ###
[docs] def get_grid_type(self): """Provides grid type of model. can be regular, flexible or unit catchment grid Returns: str -- Grid type of model """ return {mod: self.bmimodels[mod].get_grid_type() for mod in self.bmimodels}
### ### Observation points ### def get_obs(self): """Under construction; function to get observation points to read out model output on the fly. """ pass def set_obs(self): """Under construction; function to set observation points to read out model output on the fly. """ pass def index(self, x, y, mod, in1d=False): """Finds the index in model data grid associated with coordinates in lat/lon projection of a user-specified point. Arguments: x {float} -- x-coordinate of point y {float} -- y-coordinate of point mod {str} -- model abbreviation Keyword Arguments: in1d {bool} -- True if point is on a 1D network in model (default: {False}) Raises: AssertionError -- Raised if no grid is set before AssertionError -- Raised if no 1D network is set before Returns: int -- index associated to x/y coordinates """ if self.bmimodels[mod].grid is None: msg = "{}: model grid not set".format(mod) self.logger.error(msg) raise AssertionError(msg) if in1d: if self.bmimodels[mod].grid._1d is None: msg = "{}: model 1d network not set".format(mod) self.logger.error(msg) raise AssertionError(msg) idx = self.bmimodels[mod].grid._1d.index(x,y) else: idx = self.bmimodels[mod].grid.index(x,y) return idx ### ### set and get attribute / config ###
[docs] def set_start_time(self, start_time): """Overwriting default model start time with user-specified start time. format of provided start time must be yyyy-mm-dd Arguments: start_time {date} -- user-specified start time """ for mod in self.bmimodels: self.bmimodels[mod].set_start_time(start_time) self.get_start_time() # sync start time self._t = self._startTime
[docs] def set_end_time(self, end_time): """Overwriting default model end time with user-specified end time. format of provided end time must be yyyy-mm-dd Arguments: end_time {date} -- user-specified end time """ for mod in self.bmimodels: self.bmimodels[mod].set_end_time(end_time) self.get_end_time() # sync end time
[docs] def get_attribute_names(self): """Provides list with all model attribute names from model config file. Returns: list -- list with model attribute names """ return {mod: self.bmimodels[mod].get_attribute_names() for mod in self.bmimodels}
[docs] def set_out_dir(self, out_dir): """Setting output directory of model. overwrites the default output directory Arguments: out_dir {str} -- path to output directory """ for mod in self.bmimodels: path = join(out_dir, mod) if not os.path.isdir(path): os.makedirs(path) self.bmimodels[mod].set_out_dir(path)
[docs] def get_attribute_value(self, attribute_name): """gets attribute value in in underlying model. attribute_name should use the following convention model_name.section_name:attribute_name Arguments: attribute_name {str} -- Name of BMI attribute Returns: float -- value of BMI attribute """ mod, attr = self._check_long_var_name(attribute_name) return self.bmimodels[mod].get_attribute_value(attr)
[docs] def set_attribute_value(self, attribute_name, attribute_value): """sets attribute value in underlying model. attribute_name should use the following convention model_name.section_name:attribute_name Arguments: attribute_name {str} -- Name of BMI attribute attribute_value {float} -- value to be set """ mod, attr = self._check_long_var_name(attribute_name) return self.bmimodels[mod].set_attribute_value(attr, attribute_value)