From 6e4430072100f0973d2e64c9d045393105fd82e5 Mon Sep 17 00:00:00 2001 From: "ingrid.luijkx" <ingrid.luijkx@wur.nl> Date: Mon, 17 May 2021 10:26:52 +0200 Subject: [PATCH] moved sf6 project to archive folder --- da/sf6/__init__.py | 0 da/sf6/pipeline.py | 424 ------------------------------------------ da/sf6/statevector.py | 287 ---------------------------- 3 files changed, 711 deletions(-) delete mode 100644 da/sf6/__init__.py delete mode 100755 da/sf6/pipeline.py delete mode 100755 da/sf6/statevector.py diff --git a/da/sf6/__init__.py b/da/sf6/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/da/sf6/pipeline.py b/da/sf6/pipeline.py deleted file mode 100755 index 4767d299..00000000 --- a/da/sf6/pipeline.py +++ /dev/null @@ -1,424 +0,0 @@ -"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. -Users are recommended to contact the developers (wouter.peters@wur.nl) to receive -updates of the code. See also: http://www.carbontracker.eu. - -This program is free software: you can redistribute it and/or modify it under the -terms of the GNU General Public License as published by the Free Software Foundation, -version 3. This program is distributed in the hope that it will be useful, but -WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - -You should have received a copy of the GNU General Public License along with this -program. If not, see <http://www.gnu.org/licenses/>.""" -#!/usr/bin/env python -# pipeline.py - -""" -.. module:: pipeline -.. moduleauthor:: Wouter Peters - -Revision History: -File created on 06 Sep 2010. - -The pipeline module holds methods that execute consecutive tasks with each of the objects of the DA system. 
- -""" -import logging -import os -import sys -import datetime -import copy - -header = '\n\n *************************************** ' -footer = ' *************************************** \n ' - -def ensemble_smoother_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator, optimizer): - """ The main point of entry for the pipeline """ - sys.path.append(os.getcwd()) - - logging.info(header + "Initializing current cycle" + footer) - start_job(dacycle, dasystem, platform, statevector, samples, obsoperator) - - prepare_state(dacycle, statevector) - sample_state(dacycle, samples, statevector, obsoperator) - - invert(dacycle, statevector, optimizer) - - advance(dacycle, samples, statevector, obsoperator) - - save_and_submit(dacycle, statevector) - logging.info("Cycle finished...exiting pipeline") - -def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator): - """ The main point of entry for the pipeline """ - sys.path.append(os.getcwd()) - - logging.info(header + "Initializing current cycle" + footer) - start_job(dacycle, dasystem, platform, statevector, samples, obsoperator) - - if 'forward.savestate.dir' in dacycle: - fwddir = dacycle['forward.savestate.dir'] - else: - logging.debug("No forward.savestate.dir key found in rc-file, proceeding with self-constructed prior parameters") - fwddir = False - - if 'forward.savestate.legacy' in dacycle: - legacy = (dacycle['forward.savestate.legacy'].upper() in ["TRUE","T","YES","Y"]) - else: - legacy = False - logging.debug("No forward.savestate.legacy key found in rc-file") - - if not fwddir: - # Simply make a prior statevector using the normal method - - - prepare_state(dacycle, statevector)#LU tutaj zamiast tego raczej to stworzenie nowej kowariancji i ensembli bo pozostale rzeczy sa na gorze i na doel. - - else: - # Read prior information from another simulation into the statevector. - # This loads the results from another assimilation experiment into the current statevector - - if not legacy: - filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d')) - statevector.read_from_file(filename, 'prior') - else: - filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc') - statevector.read_from_legacy_file(filename, 'prior') - - - # We write this "prior" statevector to the restart directory, so we can later also populate it with the posterior statevector - # Note that we could achieve the same by just copying the wanted forward savestate.nc file to the restart folder of our current - # experiment, but then it would already contain a posterior field as well which we will try to write in save_and_submit. - # This could cause problems. Moreover, this method allows us to read older formatted savestate.nc files (legacy) and write them into - # the current format through the "write_to_file" method. - - savefilename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) - statevector.write_to_file(savefilename, 'prior') - - # Now read optimized fluxes which we will actually use to propagate through the system - - if not fwddir: - - # if there is no forward dir specified, we simply run forward with unoptimized prior fluxes in the statevector - logging.info("Running forward with prior savestate from: %s"%savefilename) - - else: - - # Read posterior information from another simulation into the statevector. 
- # This loads the results from another assimilation experiment into the current statevector - - if not legacy: - statevector.read_from_file(filename, 'opt') - else: - statevector.read_from_legacy_file(filename, 'opt') - - logging.info("Running forward with optimized savestate from: %s"%filename) - - # Finally, we run forward with these parameters - - advance(dacycle, samples, statevector, obsoperator) - - # In save_and_submit, the posterior statevector will be added to the savestate.nc file, and it is added to the copy list. - # This way, we have both the prior and posterior data from another run copied into this assimilation, for later analysis. - - save_and_submit(dacycle, statevector) - - logging.info("Cycle finished...exiting pipeline") -#################################################################################################### - -def analysis_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator): - """ Main entry point for analysis of ctdas results """ - - from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data - from da.analysis.expand_molefractions import write_mole_fractions - from da.analysis.summarize_obs import summarize_obs - from da.analysis.time_avg_fluxes import time_avg - - logging.info(header + "Starting analysis" + footer) - - try: - save_weekly_avg_1x1_data(dacycle, statevector) - save_weekly_avg_state_data(dacycle, statevector) - except: - logging.error("Error in analysis of weekly 1x1 fluxes and/or statevector, continuing analysis...") - try: - save_weekly_avg_tc_data(dacycle, statevector) - save_weekly_avg_ext_tc_data(dacycle) - except: - logging.error("Error in analysis of weekly tc fluxes, continuing analysis...") - try: - save_weekly_avg_agg_data(dacycle,region_aggregate='transcom') - save_weekly_avg_agg_data(dacycle,region_aggregate='olson') - save_weekly_avg_agg_data(dacycle,region_aggregate='country') - except: - logging.error("Error in analysis of weekly aggregated fluxes, continuing analysis...") - try: - time_avg(dacycle,'flux1x1') - time_avg(dacycle,'transcom') - time_avg(dacycle,'olson') - time_avg(dacycle,'country') - except: - logging.error("Error in analysis of time averaged fluxes, continuing analysis...") - try: - write_mole_fractions(dacycle) - summarize_obs(dacycle) - except: - logging.error("Error in analysis of mole fractions, continuing analysis...") - -def archive_pipeline(dacycle, platform, dasystem): - """ Main entry point for archiving of output from one disk/system to another """ - - if 'task.rsync' not in dacycle: - logging.info('rsync task not found, not starting automatic backup...') - return - else: - logging.info('rsync task found, starting automatic backup...') - - for task in dacycle['task.rsync'].split(): - sourcedirs = dacycle['task.rsync.%s.sourcedirs'%task] - destdir = dacycle['task.rsync.%s.destinationdir'%task] - - rsyncflags = dacycle['task.rsync.%s.flags'%task] - - # file ID and names - jobid = dacycle['time.end'].strftime('%Y%m%d') - targetdir = os.path.join(dacycle['dir.exec']) - jobfile = os.path.join(targetdir, 'jb.rsync.%s.%s.jb' % (task,jobid) ) - logfile = os.path.join(targetdir, 'jb.rsync.%s.%s.log' % (task,jobid) ) - # Template and commands for job - jobparams = {'jobname':"r.%s" % jobid, 'jobnodes': '1', 'jobtime': '1:00:00', 'joblog': logfile, 'errfile': logfile} - - if platform.ID == 'cartesius': - jobparams['jobqueue'] = 'staging' - - template = 
platform.get_job_template(jobparams) - for sourcedir in sourcedirs.split(): - execcommand = "\nrsync %s %s %s\n" % (rsyncflags, sourcedir,destdir,) - template += execcommand - - # write and submit - platform.write_job(jobfile, template, jobid) - jobid = platform.submit_job(jobfile, joblog=logfile) - - -def start_job(dacycle, dasystem, platform, statevector, samples, obsoperator): - """ Set up the job specific directory structure and create an expanded rc-file """ - - dasystem.validate() - dacycle.dasystem = dasystem - dacycle.daplatform = platform - dacycle.setup() - #statevector.dacycle = dacycle # also embed object in statevector so it can access cycle information for I/O etc - #samples.dacycle = dacycle # also embed object in samples object so it can access cycle information for I/O etc - #obsoperator.dacycle = dacycle # also embed object in obsoperator object so it can access cycle information for I/O etc - obsoperator.setup(dacycle) # Setup Observation Operator - statevector.setup(dacycle) - -def prepare_state(dacycle, statevector): - """ Set up the input data for the forward model: obs and parameters/fluxes""" - - # We now have an empty statevector object that we need to populate with data. If this is a continuation from a previous cycle, we can read - # the previous statevector values from a NetCDF file in the restart directory. If this is the first cycle, we need to populate the statevector - # with new values for each week. After we have constructed the statevector, it will be propagated by one cycle length so it is ready to be used - # in the current cycle - - logging.info(header + "starting prepare_state" + footer) - - if not dacycle['time.restart']: - - # Fill each week from n=1 to n=nlag with a new ensemble - - for n in range(statevector.nlag): - date = dacycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(dacycle['time.cycle'])) - cov = statevector.get_covariance(date, dacycle) - statevector.make_new_ensemble(n + 1, newmean=None, covariancematrix=cov) - - else: - - # Read the statevector data from file - - #saved_sv = os.path.join(dacycle['dir.restart.current'], 'savestate.nc') - - - saved_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['da.restart.tstamp'].strftime('%Y%m%d')) - statevector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate - - # Now propagate the ensemble by one cycle to prepare for the current cycle - statevector.propagate(dacycle) - - # Finally, also write the statevector to a file so that we can always access the a-priori information - - current_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) - statevector.write_to_file(current_sv, 'prior') # write prior info - -def sample_state(dacycle, samples, statevector, obsoperator): - """ Sample the filter state for the inversion """ - - - # Before a forecast step, save all the data to a save/tmp directory so we can later recover it before the propagation step. - # This is especially important for: - # (i) The transport model restart data which holds the background mole fractions. 
This is needed to run the model one cycle forward - # (ii) The random numbers (or the seed for the random number generator) so we can recreate the ensembles if needed - - #status = dacycle.MoveSaveData(io_option='store',save_option='partial',filter=[]) - #msg = "All restart data have been copied to the save/tmp directory for future use" ; logging.debug(msg) - logging.info(header + "starting sample_state" + footer) - nlag = int(dacycle['time.nlag']) - logging.info("Sampling model will be run over %d cycles" % nlag) - - obsoperator.get_initial_data() - - for lag in range(nlag): - logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1)) - - ############# Perform the actual sampling loop ##################### - - sample_step(dacycle, samples, statevector, obsoperator, lag) - - logging.debug("statevector now carries %d samples" % statevector.nobs) - - -def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False): - """ Perform all actions needed to sample one cycle """ - - - # First set up the information for time start and time end of this sample - dacycle.set_sample_times(lag) - - startdate = dacycle['time.sample.start'] - enddate = dacycle['time.sample.end'] - dacycle['time.sample.window'] = lag - dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H")) - - logging.info("New simulation interval set : ") - logging.info(" start date : %s " % startdate.strftime('%F %H:%M')) - logging.info(" end date : %s " % enddate.strftime('%F %H:%M')) - logging.info(" file stamp: %s " % dacycle['time.sample.stamp']) - - - # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the - # type of info needed in your transport model - - statevector.write_members_to_file(lag, dacycle['dir.input']) - - samples.setup(dacycle) - samples.add_observations() - - # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future?? - - samples.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc']) - - sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp']) - samples.write_sample_coords(sampling_coords_file) - # Write filename to dacycle, and to output collection list - dacycle['ObsOperator.inputfile'] = sampling_coords_file - - # Run the observation operator - - obsoperator.run_forecast_model() - - - # Read forecast model samples that were written to NetCDF files by each member. Add them to the existing - # Observation object for each sample loop. This data will be written to file in the output folder for each sample cycle. - - - # We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates - # one file per member, some logic needs to be included to merge all files!!! - simulated_file = os.path.join(obsoperator.outputdir, 'flask_output.%s.nc' % dacycle['time.sample.stamp']) - samples.add_simulations(simulated_file) - - # Now write a small file that holds for each observation a number of values that we need in postprocessing - - #filename = samples.write_sample_coords() - - # Now add the observations that need to be assimilated to the statevector. 
- # Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag - # This is to make sure that the first step of the system uses all observations available, while the subsequent - # steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used only - # (and at least) once in the assimilation - - - if not advance: - if dacycle['time.restart'] == False or lag == int(dacycle['time.nlag']) - 1: - statevector.obs_to_assimilate += (copy.deepcopy(samples),) - statevector.nobs += samples.getlength() - logging.debug("Added samples from the observation operator to the assimilated obs list in the statevector") - - else: - statevector.obs_to_assimilate += (None,) - - -def invert(dacycle, statevector, optimizer): - """ Perform the inverse calculation """ - logging.info(header + "starting invert" + footer) - dims = (int(dacycle['time.nlag']), - int(dacycle['da.optimizer.nmembers']), - int(dacycle.dasystem['nparameters']), - statevector.nobs) - - if 'opt.algorithm' not in dacycle.dasystem: - logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)") - logging.info("...using serial algorithm as default...") - optimizer.set_algorithm('Serial') - elif dacycle.dasystem['opt.algorithm'] == 'serial': - logging.info("Using the serial minimum least squares algorithm to solve ENKF equations") - optimizer.set_algorithm('Serial') - elif dacycle.dasystem['opt.algorithm'] == 'bulk': - logging.info("Using the bulk minimum least squares algorithm to solve ENKF equations") - optimizer.set_algorithm('Bulk') - - optimizer.setup(dims) - optimizer.state_to_matrix(statevector) - - diagnostics_file = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) - - optimizer.write_diagnostics(diagnostics_file, 'prior') - optimizer.set_localization(dacycle['da.system.localization']) - - if optimizer.algorithm == 'Serial': - optimizer.serial_minimum_least_squares() - else: - optimizer.bulk_minimum_least_squares() - - optimizer.matrix_to_state(statevector) - optimizer.write_diagnostics(diagnostics_file, 'optimized') - - - -def advance(dacycle, samples, statevector, obsoperator): - """ Advance the filter state to the next step """ - - # This is the advance of the modeled SF6 state. 
Optionally, routines can be added to advance the state vector (mean+covariance) - - # Then, restore model state from the start of the filter - logging.info(header + "starting advance" + footer) - logging.info("Sampling model will be run over 1 cycle") - - obsoperator.get_initial_data() - - sample_step(dacycle, samples, statevector, obsoperator, 0, True) - - dacycle.restart_filelist.extend(obsoperator.restart_filelist) - dacycle.output_filelist.extend(obsoperator.output_filelist) - logging.debug("Appended ObsOperator restart and output file lists to dacycle for collection ") - - dacycle.output_filelist.append(dacycle['ObsOperator.inputfile']) - logging.debug("Appended Observation filename to dacycle for collection ") - - outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp']) - samples.write_sample_auxiliary(outfile) - - -def save_and_submit(dacycle, statevector): - """ Save the model state and submit the next job """ - logging.info(header + "starting save_and_submit" + footer) - - filename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d')) - statevector.write_to_file(filename, 'opt') - - dacycle.output_filelist.append(filename) - dacycle.finalize() - - - - diff --git a/da/sf6/statevector.py b/da/sf6/statevector.py deleted file mode 100755 index 81b233d1..00000000 --- a/da/sf6/statevector.py +++ /dev/null @@ -1,287 +0,0 @@ -"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. -Users are recommended to contact the developers (wouter.peters@wur.nl) to receive -updates of the code. See also: http://www.carbontracker.eu. - -This program is free software: you can redistribute it and/or modify it under the -terms of the GNU General Public License as published by the Free Software Foundation, -version 3. This program is distributed in the hope that it will be useful, but -WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - -You should have received a copy of the GNU General Public License along with this -program. If not, see <http://www.gnu.org/licenses/>.""" -#!/usr/bin/env python -# ct_statevector_tools.py - -""" -Author : peters - -Revision History: -File created on 28 Jul 2010. - -""" - -import os -import sys -sys.path.append(os.getcwd()) - -import logging -import numpy as np -from da.baseclasses.statevector import StateVector, EnsembleMember -import da.tools.io4 as io -from datetime import timedelta - -identifier = 'SF6 Statevector ' -version = '0.0' - -################### Begin Class SF6StateVector ################### - -class SF6StateVector(StateVector): - """ This is a StateVector object for the CTDAS SF6 system. It has a private method to make new ensemble members """ - - def get_covariance(self, date, dacycle): - """ Construct the prior covariance matrix for the parameters. - For this SF6 system a diagonal matrix is returned, with a standard deviation of 0.5 on each parameter. - """ - try: - import matplotlib.pyplot as plt - except: - pass - - # Get the needed matrices from the specified covariance files - - fullcov = np.zeros((self.nparams, self.nparams), float) - - for n in range(self.nparams): - fullcov[n,n] = 0.5**2 - - return fullcov - - def setup(self, dacycle): - """ - Initialize the object by specifying the dimensions. 
- There are two major requirements for each statevector that you want to build: - - (1) is that the statevector can map itself onto a regular grid - (2) is that the statevector can map itself (mean+covariance) onto TransCom regions - - An example is given below. - """ - - self.nlag = int(dacycle['time.nlag']) - self.nmembers = int(dacycle['da.optimizer.nmembers']) - self.nparams = int(dacycle.dasystem['nparameters']) - self.nobs = 0 - - self.obs_to_assimilate = () # empty container to hold observations to assimilate later on - - # These list objects hold the data for each time step of lag in the system. Note that the ensembles for each time step consist - # of lists of EnsembleMember objects; we define member 0 as the mean of the distribution and n=1,...,nmembers as the spread. - - self.ensemble_members = list(range(self.nlag)) - - for n in range(self.nlag): - self.ensemble_members[n] = [] - - - # This specifies the file to read with the gridded mask at 1x1 degrees. Each gridbox holds a number that specifies the parametermember - # that maps onto it. From this map, a dictionary is created that allows a reverse look-up so that we can map parameters to a grid. - - mapfile = os.path.join(dacycle.dasystem['regionsfile']) - ncf = io.ct_read(mapfile, 'read') - self.tcmap = ncf.get_variable('transcom_regions') - ncf.close() - - self.gridmap = np.ones((180,360),'float') - - logging.debug("A TransCom map on 1x1 degree was read from file %s" % dacycle.dasystem['regionsfile']) - logging.debug("A parameter map on 1x1 degree was created") - - # Create a dictionary for state <-> gridded map conversions - - nparams = self.gridmap.max() - self.griddict = {} - for r in range(1, int(nparams) + 1): - sel = (self.gridmap.flat == r).nonzero() - if len(sel[0]) > 0: - self.griddict[r] = sel - - logging.debug("A dictionary to map grids to states and vice versa was created") - - # Create a matrix for state <-> TransCom conversions - - self.tcmatrix = np.zeros((self.nparams, 23), 'float') - - logging.debug("A matrix to map states to TransCom regions and vice versa was created") - - # Create a mask for species/unknowns - - self.make_species_mask() - - def make_species_mask(self): - - """ - - This method creates a dictionary with as key the name of a tracer, and as values an array of 0.0/1.0 values - specifying which StateVector elements are constrained by this tracer. This mask can be used in - the optimization to ensure that certain types of observations only update certain unknowns. - - An example would be that the tracer '14CO2' can be allowed to only map onto fossil fuel emissions in the state - - The form of the mask is: - - {'co2': np.ones(self.nparams), 'co2c14': np.zeros(self.nparams) } - - so that 'co2' maps onto all parameters, and 'co2c14' on none at all. These arrays are used in the optimizer - class when state updates are actually performed - - """ - self.speciesdict = {'sf6': np.ones(self.nparams)} - logging.debug("A species mask was created, only the following species are recognized in this system:") - for k in list(self.speciesdict.keys()): - logging.debug(" -> %s" % k) - - def propagate(self,dummy): - """ - :rtype: None - - Propagate the parameter values in the StateVector to the next cycle. This means a shift by one cycle - step for all states that will - be optimized once more, and the creation of a new ensemble for the time step that just - comes in for the first time (step=nlag). - In the future, this routine can incorporate a formal propagation of the statevector. 
- - """ - - self.ensemble_members.append([]) - cov = self.get_covariance(None,None) - newmean = np.ones(self.nparams) - self.make_new_ensemble(self.nlag+1, newmean = newmean, covariancematrix=cov) - self.ensemble_members.pop(0) - - logging.info('The state vector remains the same in the SF6 run') - logging.info('The state vector has been propagated by one cycle') - - def make_new_ensemble(self, lag, newmean=None, covariancematrix=None): - """ - :param lag: an integer indicating the time step in the lag order - :param covariancematrix: a matrix to draw random values from - :rtype: None - - Make a new ensemble, the attribute lag refers to the position in the state vector. - Note that lag=1 means an index of 0 in python, hence the notation lag-1 in the indexing below. - The argument is thus referring to the lagged state vector as [1,2,3,4,5,..., nlag] - - The optional covariance object to be passed holds a matrix of dimensions [nparams, nparams] which is - used to draw ensemblemembers from. If this argument is not passed it will ne substituted with an - identity matrix of the same dimensions. - - """ - - if newmean == None: - newmean = np.ones(self.nparams) - - if covariancematrix == None: - covariancematrix = np.identity(self.nparams) - - # Make a cholesky decomposition of the covariance matrix - - - _, s, _ = np.linalg.svd(covariancematrix) - dof = np.sum(s) ** 2 / sum(s ** 2) - C = np.linalg.cholesky(covariancematrix) - - logging.debug('Cholesky decomposition has succeeded ') - logging.info('Appr. degrees of freedom in covariance matrix is %s' % (int(dof))) - - - # Create the first ensemble member with a deviation of 0.0 and add to list - - newmember = EnsembleMember(0) - newmember.param_values = newmean.flatten() # no deviations - self.ensemble_members[lag-1].append(newmember) - - # Create members 1:nmembers and add to EnsembleMembers list - - for member in range(1, self.nmembers): - rands = np.random.randn(self.nparams) - - newmember = EnsembleMember(member) - # routine to avoids that members < 0.0 - dev = np.dot(C, rands) #VDV - dummy = np.zeros(len(dev)) #VDV - for i in range(len(dev)): #VDV - if dev[i] < 0.0: #VDV - dummy[i] = newmean[i]*np.exp(dev[i]) #VDV - else: #VDV - dummy[i] = newmean[i]*(1+dev[i]) #VDV - newmember.param_values = dummy #VDV - #newmember.ParameterValues = np.dot(C, rands) + newmean - self.ensemble_members[lag-1].append(newmember) - - logging.debug('%d new ensemble members were added to the state vector # %d' % (self.nmembers, lag)) - - def write_members_to_file(self, lag, outdir): - """ - :param: lag: Which lag step of the filter to write, must lie in range [1,...,nlag] - :rtype: None - - Write ensemble member information to a NetCDF file for later use. The standard output filename is - *parameters.DDD.nc* where *DDD* is the number of the ensemble member. Standard output file location - is the `dir.input` of the dacycle object. In principle the output file will have only two datasets inside - called `parametervalues` which is of dimensions `nparameters` and `parametermap` which is of dimensions (180,360). - This dataset can be read and used by a :class:`~da.baseclasses.observationoperator.ObservationOperator` object. - - .. note:: if more, or other information is needed to complete the sampling of the ObservationOperator you - can simply inherit from the StateVector baseclass and overwrite this write_members_to_file function. - - """ - - # These import statements caused a crash in netCDF4 on MacOSX. No problems on Jet though. 
Solution was - to do the import already at the start of the module, not just in this method. - - #import da.tools.io as io - #import da.tools.io4 as io - - members = self.ensemble_members[lag] - - for mem in members: - filename = os.path.join(outdir, 'parameters.%03d.nc' % mem.membernumber) - ncf = io.CT_CDF(filename, method='create') - dimparams = ncf.add_params_dim(self.nparams) - dimgrid = ncf.add_latlon_dim() - logging.warning('Parameters for TM5 are NOT taken with exponential') - - # Explicitly set minimum allowable value to 0.0 - data = np.where(mem.param_values < 0, 0.0, mem.param_values) - - savedict = io.std_savedict.copy() - savedict['name'] = "parametervalues" - savedict['long_name'] = "parameter_values_for_member_%d" % mem.membernumber - savedict['units'] = "unitless" - savedict['dims'] = dimparams - savedict['values'] = data - savedict['comment'] = 'These are parameter values to use for member %d, note: they result from an exponential function' % mem.membernumber - ncf.add_data(savedict) - - griddata = self.vector2grid(vectordata=data) - - savedict = io.std_savedict.copy() - savedict['name'] = "parametermap" - savedict['long_name'] = "parametermap_for_member_%d" % mem.membernumber - savedict['units'] = "unitless" - savedict['dims'] = dimgrid - savedict['values'] = griddata.tolist() - savedict['comment'] = 'These are gridded parameter values to use for member %d, note: they result from an exponential function' % mem.membernumber - ncf.add_data(savedict) - - ncf.close() - - logging.debug('Successfully wrote data from ensemble member %d to file (%s) ' % (mem.membernumber, filename)) - - - - -################### End Class SF6StateVector ################### - -- GitLab
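
For reference, the ensemble perturbation scheme in the archived SF6StateVector.make_new_ensemble can be summarized standalone: deviations are drawn from N(0, C) through a Cholesky factor of the covariance, and negative deviations are mapped through an exponential so that no parameter value can become negative. Below is a minimal sketch of that scheme, assuming only NumPy; the function name, member/parameter counts and seed are illustrative, while the mean of 1.0 and the 0.5 standard deviation mirror the deleted get_covariance.

import numpy as np

def draw_positive_members(nmembers, nparams, sigma=0.5, seed=42):
    """Illustrative re-implementation of the perturbation scheme in make_new_ensemble.

    Deviations dev ~ N(0, C) are applied as mean*(1+dev) for dev >= 0 and as
    mean*exp(dev) for dev < 0, so parameter values can never become negative.
    """
    rng = np.random.default_rng(seed)
    mean = np.ones(nparams)                      # prior mean of 1.0 per parameter
    cov = np.diag(np.full(nparams, sigma ** 2))  # diagonal covariance, as in get_covariance
    C = np.linalg.cholesky(cov)                  # factor used to correlate the random draws

    members = [mean.copy()]                      # member 0 carries the mean, deviation 0.0
    for _ in range(1, nmembers):
        dev = C @ rng.standard_normal(nparams)
        members.append(np.where(dev < 0.0, mean * np.exp(dev), mean * (1.0 + dev)))
    return np.array(members)

ensemble = draw_positive_members(150, 10)
assert (ensemble > 0.0).all()                    # all members remain strictly positive

With the diagonal covariance used here the Cholesky factor reduces to a scaling by sigma; the factorization only becomes relevant once off-diagonal covariances between parameters are introduced.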