Commit 37e83b8f authored by Florentie, Liesbeth's avatar Florentie, Liesbeth
Browse files

added branch liesbeth/ctsf

parents
../../../../trunk/da/analysis
\ No newline at end of file
../../../../trunk/da/baseclasses
\ No newline at end of file
../../../../trunk/da/bin
\ No newline at end of file
../../../../trunk/da/carbondioxide
\ No newline at end of file
../../../../trunk/da/co2gridded
\ No newline at end of file
# """CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters.
# Users are recommended to contact the developers (wouter.peters@wur.nl) to receive
# updates of the code. See also: http://www.carbontracker.eu.
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# version 3. This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this
# program. If not, see <http://www.gnu.org/licenses/>."""
#
# """
# Author : peters
#
# Revision History:
# File created on 09 Feb 2009.
# Major modifications to go to a class-based approach, July 2010.
#
# This module holds specific functions needed to use the TM5 model within the data assimilation shell. It uses the information
# from the DA system in combination with the generic tm5.rc files.
#
# The TM5 model is now controlled by a python subprocess. This subprocess consists of an MPI wrapper (written in C) that spawns
# a large number ( N= nmembers) of TM5 model instances under mpirun, and waits for them all to finish.
#
# The design of the system assumes that the tm5.x (executable) was pre-compiled with the normal TM5 tools, and is residing in a
# directory specified by the ${RUNDIR} of a tm5 rc-file. This tm5 rc-file name is taken from the data assimilation rc-file. Thus,
# this python shell does *not* compile the TM5 model for you!
#
# """
#
# from __future__ import division
# #!/usr/bin/env python
# # tm5_tools.py
"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters.
Users are recommended to contact the developers (wouter.peters@wur.nl) to receive
updates of the code. See also: http://www.carbontracker.eu.
This program is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software Foundation,
version 3. This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this
program. If not, see <http://www.gnu.org/licenses/>."""
#!/usr/bin/env python
# model.py
from __future__ import division
import logging
import os
import sys
import shutil
import subprocess
import glob
import numpy as np
import datetime as dt
import da.tools.io4 as io
import da.tools.rc as rc
from string import join
from da.tools.general import create_dirs, to_datetime
from da.tm5.observationoperator import TM5ObservationOperator
sys.path.append(os.getcwd())
sys.path.append("../../")
identifier = 'ctsfTM5ObsOperator'
version = '1.0'
mpi_shell_filename = 'ctsf_tm5_mpi_wrapper'
mpi_shell_location = 'da/bin/'
################### Begin Class TM5 ###################
class ctsfTM5ObsOperator(TM5ObservationOperator):
    """ This class holds methods and variables that are needed to run the TM5 model. It is initiated with as only argument a TM5 rc-file
    location. This rc-file will be used to figure out the settings for the run.

    *** This method of running TM5 assumes that a pre-compiled tm5.exe is present, and it will be run from time.start to time.final ***

    These settings can be modified later. To run a model version, simply compile the model using an existing TM5 rc-file, then
    open python, and type:

    []> tm=TM5('/Users/peters/Modeling/TM5/tutorial.rc')
    []> tm.write_rc()
    []> tm.WriteRunRc()
    []> tm.run()

    To use this class inside a data assimilation cycle, a stand-alone method "setup()" is included which modifies the TM5
    settings according to an external dictionary of values to overwrite, and then runs the TM5 model.

    CTSF variant: run_forecast_model() first evaluates a statistical flux model
    (fluxmodel) on a monthly time vector, then runs TM5 to transport the result.
    Helper attributes such as timestartkey/istartkey/tm_settings are inherited
    from TM5ObservationOperator (defined elsewhere).
    """

    def __init__(self, filename):
        """ The instance of an TMObservationOperator is application dependent """
        self.ID = identifier            # the identifier gives the model name
        self.version = version          # the model version used
        self.restart_filelist = []      # filled during a run with restart files to collect
        self.output_filelist = []       # filled during a run with produced output files
        self.outputdir = None # Needed for opening the samples.nc files created

        self.load_rc(filename)          # load the specified rc-file
        self.validate_rc()              # validate the contents

        logging.info('Observation Operator initialized: %s (%s)' % (self.ID, self.version))

    def run_forecast_model(self,fluxmodel):
        """Evaluate the flux model over the sampling window, then run TM5 on the result.

        :param fluxmodel: flux-model object providing refdate and calc_flux()
                          (presumably a ctsfFluxModel instance -- TODO confirm)
        """
        # set timerange for flux calculation
        startdate = self.dacycle['time.sample.start']
        enddate = self.dacycle['time.sample.end']
        logging.info('Calculating fluxes from %s to %s' % (startdate, enddate))

        # monthly mid-month dates, converted to integer days since fluxmodel.refdate
        datevec = self.calc_timevec_monthly(startdate, enddate)
        timevec = []
        for date in datevec:
            timevec.append( (date-fluxmodel.refdate).days )
        del date  # NOTE(review): raises NameError if datevec is empty -- confirm windows always span a month
        self.timevec = np.array(timevec)

        # calculate and write fluxmap for ensemble
        fluxmodel.calc_flux(self.timevec)

        # TM5 run...
        self.prepare_run()
        self.validate_input()   # perform checks
        self.run()              # calculate the NEE fit
        self.save_data()        # save NEE distribution to netCDF file

    def calc_timevec_monthly(self, startdate, enddate):
        """Creates time vector with monthly interval ranging from startdate to enddate.

        Entries are datetimes pinned to the 15th of each month; the first entry
        may precede startdate when startdate falls after the 15th (mid-month
        representative points rather than a strict sub-range).
        """
        timevec = []
        date = dt.datetime(startdate.year,startdate.month,15)
        while date <= enddate:
            timevec.append(date)
            # advance one month, rolling the year over after December
            if date.month < 12:
                date = dt.datetime(date.year,date.month+1,15)
            else:
                date = dt.datetime(date.year+1,1,15)
        logging.info('Timevec = %s' % timevec)
        return timevec

    def prepare_run(self):
        """
        Prepare a forward model TM5 run, this consists of:

        - reading the working copy TM5 rc-file,
        - validating it,
        - modifying the values,
        - Removing the existing tm5.ok file if present
        """
        # Write a modified TM5 model rc-file in which run/break times are defined by our da system
        new_items = {
            'submit.options': self.dacycle.daplatform.give_blocking_flag(),
            self.timestartkey: self.dacycle['time.sample.start'],
            self.timefinalkey: self.dacycle['time.sample.end'],
            'jobstep.timerange.start': self.dacycle['time.sample.start'],
            'jobstep.timerange.end': self.dacycle['time.sample.end'],
            'jobstep.length': 'inf',
            'ct.params.input.dir': self.dacycle['dir.input'],
            'ct.params.input.file': os.path.join(self.dacycle['dir.input'], 'fluxmodel'),
            'output.flask.infile': self.dacycle['ObsOperator.inputfile'] ,
            'output.flask': 'True'
        }

        # Choose the TM5 istart mode: transition > restart > (cold start | restart-from-file)
        if self.dacycle['transition']:
            new_items[self.istartkey] = self.transitionvalue
            logging.debug('Resetting TM5 to perform transition of od meteo from 25 to 34 levels')
        elif self.dacycle['time.restart']:  # If this is a restart from a previous cycle, the TM5 model should do a restart
            new_items[self.istartkey] = self.restartvalue
            logging.debug('Resetting TM5 to perform restart')
        else:
            if not self.dacycle.has_key('da.obsoperator.restartfileinfirstcycle'):
                new_items[self.istartkey] = self.coldstartvalue  # if not, start TM5 'cold'
                logging.debug('Resetting TM5 to perform cold start')
            else:
                new_items[self.istartkey] = self.restartvalue  # If restart file is specified, start TM5 with initial restartfile
                logging.debug('Resetting TM5 to start with restart file: %s'%self.dacycle['da.obsoperator.restartfileinfirstcycle'])

        if self.dacycle['time.sample.window'] != 0:  # If this is a restart from a previous time step within the filter lag, the TM5 model should do a restart
            new_items[self.istartkey] = self.restartvalue
            logging.debug('Resetting TM5 to perform restart')

        # If neither one is true, simply take the istart value from the tm5.rc file that was read
        self.modify_rc(new_items)
        self.write_rc(self.rc_filename)

        # Define the name of the file that will contain the modeled output of each observation
        self.simulated_file = os.path.join(self.outputdir, 'flask_output.%s.nc' % self.dacycle['time.sample.stamp'])

    def validate_input(self):
        """
        Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present

        Raises IOError when the input dir, a member parameter file, or (unless
        forward.savestate.dir is configured) the obs input file is missing, and
        when the pre-compiled tm5 executable cannot be found.
        """
        datadir = self.tm_settings['ct.params.input.dir']
        logging.info('CT.PARAMS.INPUT.DIR = %s' %datadir)
        if not os.path.exists(datadir):
            msg = "The specified input directory for the TM5 model to read from does not exist (%s), exiting..." % datadir
            logging.error(msg)
            raise IOError, msg

        datafiles = os.listdir(datadir)
        obsfile = self.dacycle['ObsOperator.inputfile']
        if not os.path.exists(obsfile):
            msg = "The specified obs input file for the TM5 model to read from does not exist (%s), exiting..." % obsfile
            logging.error(msg)
            # a missing obs file is tolerated (logged only) in forward-savestate mode
            if not self.dacycle.has_key('forward.savestate.dir'):
                raise IOError, msg

        # one fluxmodel.NNN.nc parameter file must exist per ensemble member
        for n in range(int(self.dacycle['da.optimizer.nmembers'])):
            paramfile = 'fluxmodel.%03d.nc' % n
            if paramfile not in datafiles:
                msg = "The specified parameter input file for the TM5 model to read from does not exist (%s), exiting..." % paramfile
                logging.error(msg)
                raise IOError, msg

        # Next, make sure there is an actual model version compiled and ready to execute
        targetdir = os.path.join(self.tm_settings[self.rundirkey])
        # pycasso-style builds name the executable after my.basename; legacy builds use tm5.x
        if self.rcfiletype == 'pycasso':
            self.tm5_exec = os.path.join(targetdir, self.tm_settings['my.basename'] + '.x')
        else:
            self.tm5_exec = os.path.join(targetdir, 'tm5.x')

        if not os.path.exists(self.tm5_exec):
            logging.error("Required TM5 executable was not found %s" % self.tm5_exec)
            logging.error("Please compile the model with the specified rc-file and the regular TM5 scripts first")
            raise IOError
################### End Class TM5 ###################
# This module only provides classes for the CTSF pipeline; it does nothing
# useful when executed as a script.
if __name__ == "__main__":
    pass
"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters.
Users are recommended to contact the developers (wouter.peters@wur.nl) to receive
updates of the code. See also: http://www.carbontracker.eu.
This program is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software Foundation,
version 3. This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this
program. If not, see <http://www.gnu.org/licenses/>."""
#!/usr/bin/env python
# da_initexit.py
"""
.. module:: initexit
.. moduleauthor:: Wouter Peters
Revision History:
File created on 13 May 2009.
The CycleControl class is found in the module :mod:`initexit`. It is derived from the standard python :class:`dictionary` object. It is the only core object of CTDAS that is automatically created in the pipeline, the user (normally) does not need to modify or extend it. The class is created based on options and arguments passed on the command line when submitting your main CTDAS job.
Valid options are defined in
.. autofunction:: da.tools.initexit.parse_options
With the name of a valid ``rc-file``, the CycleControl object is instantiated and validated. An example rc-file looks
like this:::
! Info on the data assimilation cycle
time.restart : False ! Restart from an existing run T/F
time.start : 2000-01-01 00:00:00 ! Start time of first cycle
time.finish : 2000-01-08 00:00:00 ! End time of last cycle
time.cycle : 7 ! length of each cycle, 7 means one week
time.nlag : 5 ! number of cycles in one smoother window
dir.da_run : ${HOME}/tmp/test_da ! the run directory for you project
! Info on the DA system used
da.system : CarbonTracker ! an identifier for your inversion system
da.system.rc : da/rc/carbontracker.rc ! the settings needed in your inversion system
! Info on the forward model to be used
da.obsoperator : TM5 ! an identifier for your observation operator
da.obsoperator.rc : ${HOME}/Modeling/TM5/tm5-ctdas.rc ! the rc-file needed to run your observation operator
da.optimizer.nmembers : 30 ! the number of ensemble members desired in the optimization
The most important method of the CycleControl object are listed below:
.. autoclass:: da.tools.initexit.CycleControl
:members: setup, finalize, collect_restart_data, move_restart_data,
submit_next_cycle, setup_file_structure, recover_run, random_seed
Two important attributes of the CycleControl object are:
(1) DaSystem, an instance of a :ref:`dasystem`
(2) DaPlatForm, an instance of a :ref:`platform`
Other functions in the module initexit that are related to the control of a DA cycle are:
.. autofunction:: da.tools.initexit.start_logger
.. autofunction:: da.tools.initexit.validate_opts_args
"""
import logging
import os
import sys
import glob
import shutil
import copy
import getopt
import cPickle
import numpy as np
from string import join
import da.tools.rc as rc
from da.tools.general import create_dirs, to_datetime, advance_time
from da.tools.initexit import CycleControl
# rc-file keys that must be present for a valid CTSF data-assimilation run;
# checked when the cycle configuration is validated (in the CycleControl base
# class -- TODO confirm exact consumer).
needed_da_items = [
    'time.start',
    'time.finish',
    'time.nlag',
    'time.cycle',
    'dir.da_run',
    'da.resources.ncycles_per_job',
    'da.resources.ntasks',
    'da.resources.ntime',
    'da.system',
    'da.system.rc',
    'da.obsoperator',
    'da.obsoperator.rc',
    'da.optimizer.nmembers']
# only needed in an earlier implementation where each substep was a separate job
# validprocesses = ['start','done','samplestate','advance','invert']
class ctsfCycleControl(CycleControl):
    """CycleControl specialization for the CTSF branch of CTDAS.

    Only setup() is overridden; all other cycle management is inherited from
    da.tools.initexit.CycleControl.
    """

    def setup(self):
        """
        This method determines how to proceed with the cycle. Three options are implemented:

        1. *Fresh start* : set up the required file structure for this simulation and start
        2. *Restart* : use latest da_runtime variables from the exec dir and restart
        3. *Recover* : restart after crash by getting data from restart/one-ago folder

        The choice that gets executed depends on the presence of

        # the ``-r`` option on the command line, this triggers a recover
        # the ``time.restart : True`` option in the da.rc file

        The latter is automatically set if the filter submits the next cycle at the end of the current one,
        through method :meth:`~da.tools.initexit.CycleControl.submit_next_cycle`.

        The specific call tree under each scenario is:

        1. *Fresh Start*
            * :meth:`~da.tools.initexit.CycleControl.setup_file_structure()`  <- Create directory tree
        2. *Restart*
            * :meth:`~da.tools.initexit.CycleControl.setup_file_structure()`
            * :meth:`~da.tools.initexit.CycleControl.random_seed`            <- Read the random seed from file
        3. *Recover*
            * :meth:`~da.tools.initexit.CycleControl.setup_file_structure()`
            * :meth:`~da.tools.initexit.CycleControl.recover_run()`          <- Recover files from restart/one-ago dir, reset ``time.start``
            * :meth:`~da.tools.initexit.CycleControl.random_seed`

        And is always followed by a call to

        * parse_times()
        * WriteRc('jobfilename')
        """
        if self['transition']:
            # transition run: handled like a restart; continue the random sequence
            logging.info("Transition of filter from previous step with od meteo from 25 to 34 levels")
            self.setup_file_structure()
            strippedname = os.path.split(self['jobrcfilename'])[-1]
            self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname)
            self.read_random_seed(False)

        elif self['time.restart']:
            logging.info("Restarting filter from previous step")
            self.setup_file_structure()
            strippedname = os.path.split(self['jobrcfilename'])[-1]
            self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname)
            self.read_random_seed(False)

        else:  #assume that it is a fresh start, change this condition to more specific if crash recover added
            logging.info("First time step in filter sequence")
            self.setup_file_structure()

            # expand jobrcfilename to include exec dir from now on.
            # First strip current leading path from filename
            strippedname = os.path.split(self['jobrcfilename'])[-1]
            self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname)

            # stage the (extended) regions files into the analysis directory;
            # fall back to the olson_extended.nc shipped with the da source tree
            shutil.copy(os.path.join(self.dasystem['regionsfile']),os.path.join(self['dir.analysis'],'copied_regions.nc'))
            logging.info('Copied regions file to the analysis directory: %s'%os.path.join(self.dasystem['regionsfile']))
            if self.dasystem.has_key('extendedregionsfile'):
                shutil.copy(os.path.join(self.dasystem['extendedregionsfile']),os.path.join(self['dir.analysis'],'copied_regions_extended.nc'))
                logging.info('Copied extended regions file to the analysis directory: %s'%os.path.join(self.dasystem['extendedregionsfile']))
            else:
                shutil.copy(os.path.join(self['dir.da_source'],'da','analysis','olson_extended.nc'),os.path.join(self['dir.analysis'],'copied_regions_extended.nc'))
                logging.info('Copied extended regions from the da/analysis directory: %s'%os.path.join(self['dir.da_source'],'da','analysis','olson_extended.nc'))

            # stale pickles could map results onto outdated region definitions
            for filename in glob.glob(os.path.join(self['dir.analysis'],'*.pickle')):
                logging.info('Deleting pickle file %s to make sure the correct regions are used'%os.path.split(filename)[1])
                os.remove(filename)
            for filename in glob.glob(os.path.join(self['dir.exec'],'*.pickle')):
                logging.info('Deleting pickle file %s to make sure the correct regions are used'%os.path.split(filename)[1])
                os.remove(filename)

            if self.dasystem.has_key('random.seed.init'):
                # fresh start with a configured seed: initialize a reproducible sequence
                self.read_random_seed(True)

        self.parse_times()
"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters.
Users are recommended to contact the developers (wouter.peters@wur.nl) to receive
updates of the code. See also: http://www.carbontracker.eu.
This program is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software Foundation,
version 3. This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this
program. If not, see <http://www.gnu.org/licenses/>."""
#!/usr/bin/env python
# model.py
from __future__ import division
import logging
import os
import numpy as np
import datetime as dt
import da.tools.io4 as io
from da.tools.general import to_datetime
from da.baseclasses.observationoperator import ObservationOperator
identifier = 'ctsfFluxModel'
version = '1.0'
################### Begin Class ObservationOperator ###################
class ctsfFluxModel(ObservationOperator):
"""
Testing
=======
This is a class that defines an ObservationOperator. This object is used to control the sampling of
a statevector in the ensemble Kalman filter framework. The methods of this class specify which (external) code
is called to perform the sampling, and which files should be read for input and are written for output.
The baseclasses consist mainly of empty methods that require an application-specific implementation. The baseclass will take observed values, and perturb them with a random number chosen from the model-data mismatch distribution. This means no real operator will be at work, but random normally distributed residuals will come out of y-H(x) and thus the inverse model can proceed. This is mainly for testing the code...
"""
def __init__(self):
    """Record the model's identifying metadata and log its creation."""
    self.ID, self.version = identifier, version
    logging.info('FluxModel object initialized: %s' % self.ID)
def setup(self, dacycle):
    """ Perform all steps necessary to start the observation operator through a simple Run() call

    Reads all CTSF fit settings from dacycle.dasystem and stores them on the
    instance. No I/O is performed here beyond logging.

    :param dacycle: cycle-control mapping with a ``dasystem`` attribute holding
                    the da-system rc settings (project type -- not constructed here)
    """
    self.dacycle = dacycle
    self.outputdir = dacycle['dir.output']
    self.forecast_nmembers = int(dacycle['da.optimizer.nmembers'])

    # fit settings
    self.harmonicstype = dacycle.dasystem['harmonicstype']  # 'sincos' [default], or 'phase'
    self.nhar = int(dacycle.dasystem['nharmonics'])         # number of harmonical terms in baseline fit
    if self.harmonicstype == 'phase':
        # phase parameterization: amplitude + phase per harmonic (3 params each)
        self.nbasefitparams = 3 + self.nhar*3
        self.scale_nonlin = dacycle.dasystem['scale_nonlinear_term']  # if False: no scaling of phase in harmonic terms (nonlinear)
    else:
        # default sin/cos parameterization: 4 params per harmonic
        # NOTE(review): self.scale_nonlin is NOT set on this branch -- confirm
        # no later code reads it for harmonicstype 'sincos'
        self.nbasefitparams = 3 + self.nhar*4
        logging.info('Using default harmonicstype \'sincos\'')
    self.ngamma = int(dacycle.dasystem['ngamma'])           # number of sensitivity parameters per ecoregion: default = 12, 1 per calendar month
    self.nfitparams = self.nbasefitparams + self.ngamma

    self.nlat = 180  # 1x1 deg grid
    self.nlon = 360  # 1x1 deg grid

    self.assimilatenans = dacycle.dasystem['assimilate_missing_data']  # True: NaN values for SIF anomaly will be replace by 0, False: datapoint will be excluded from assimilation

    self.anofile = dacycle.dasystem['fit.anomaly.inputfile']
    self.anoname = dacycle.dasystem['fit.anomaly.name']                # name of anomaly in inputfile
    self.unit_initialization = dacycle.dasystem['fit.unit_initialization']  # if True: initial fit will be constructed based on latitude, else: read from file
    self.refdate = to_datetime(dacycle.dasystem['time.reference'])

    # BUG FIX: this flag was previously read into a local variable and
    # discarded; store it on the instance so other methods can consult it.
    self.writememberfluxes = self.dacycle.dasystem['writeMemberFluxes']  # write gridded fluxes for every member? [bool]

    logging.info('Read parameters for flux calculation')
def validate_input(self):
""" Make sure that data needed for the FluxModel (such as anomaly inputdata) are present."""
if not os.path.exists(self.anofile):
msg = "Anomaly input file for CTSF inversion does not exist, exiting..."
logging.error(msg)
raise IOError, msg
    def calc_flux(self, timevec, member=None, updateano=True, postprocessing=False, write=True):
        """ Calculate gridded flux field and write to file, for one or all ensemble members.

        :param timevec: time coordinates handed to the flux model (days since
                        self.refdate elsewhere in this branch -- TODO confirm)
        :param member: if not None, evaluate only this single ensemble member
        :param updateano: if True, (re)load the anomaly field for the time range first
        :param postprocessing: read member files from dir.output instead of
                               dir.da_run/input, and stop after one member
        :param write: see NOTE below; file output currently happens regardless
        :returns: the NEE field of the evaluated member only when member is not
                  None and write is False; otherwise None
        """
        if updateano:
            # check input
            self.validate_input()
            # read anomalies from file for defined timerange
            self.ano = self.get_anomalies(timevec)
            logging.info('Obtained anomalies for timerange.')

        # loop over ensemble of statevectors. For each member, evaluate flux model
        for em in range(self.forecast_nmembers):
            if member is not None:
                # single-member mode: overwrite loop index and break after one pass
                em = member

            # read statevector, convert statevector to statevectormap...
            # read gridded statevector member
            if postprocessing:
                filename = self.dacycle['dir.output'] +'/fluxmodel.' + str(em).zfill(3) + '.nc'
            else:
                filename = self.dacycle['dir.da_run'] + '/input/fluxmodel.' + str(em).zfill(3) + '.nc'
            ncf = io.ct_read(filename, 'read')
            statemap = ncf.get_variable('parametermap')
            ncf.close()

            # calculate flux for member
            nee = self.evaluate_model(statemap, timevec)
            logging.debug('Calculated fluxes for ensemble member %i' % em)

            # write gridded flux to file
            # NOTE(review): called unconditionally -- the 'write' flag does not
            # suppress file output; confirm whether that is intended.
            self.write_flux(nee, timevec, em, postprocessing)

            if postprocessing or (member is not None):
                break
        del em  # NOTE(review): raises NameError if forecast_nmembers == 0
        if not (member is None or write):
            return nee
def write_flux(self, flux, timevec, em, postprocessing):
"""Write time and calculated fluxmap to fluxmodel.<member>.nc."""
# Create a file to hold simulated fluxes and parameters for each member
if postprocessing:
# create new file if necessary
fluxfile = os.path.join(self.outputdir, 'fluxmodel.%03d.nc' % em)
if os.path.exists(fluxfile):