From 965178e0b817255969d332455ec21ec22c46999b Mon Sep 17 00:00:00 2001 From: Wouter Peters <wouter.peters@wur.nl> Date: Fri, 4 Feb 2011 16:01:48 +0000 Subject: [PATCH] making changes to run with TM5 trunk --- carbontracker.rc | 2 +- da.rc | 6 +- da/ct/obs.py | 6 +- da/tm5/observationoperator.py | 114 ++++++++++++++++++++++------------ da/tools/general.py | 5 ++ da/tools/pipeline.py | 13 ++-- das.py | 2 +- 7 files changed, 96 insertions(+), 52 deletions(-) diff --git a/carbontracker.rc b/carbontracker.rc index c8f1c45..d7b8c36 100644 --- a/carbontracker.rc +++ b/carbontracker.rc @@ -1,6 +1,6 @@ !!! Info for the CarbonTracker data assimilation system -datadir : /data/CO2/carbontracker/ct08/ +datadir : /Volumes/Storage/CO2/carbontracker/input/ct08/ obs.input.dir : ${datadir}/obsnc/with_fillvalue obs.input.fname : obs_forecast.nc ocn.covariance : ${datadir}/oif_p3_era40.dpco2.2000.01.hdf diff --git a/da.rc b/da.rc index 7514c19..7ec9a88 100644 --- a/da.rc +++ b/da.rc @@ -2,9 +2,9 @@ time.restart : False time.start : 2000-01-01 00:00:00 -time.finish : 2000-01-15 00:00:00 -time.cycle : 7 -time.nlag : 5 +time.finish : 2000-01-05 00:00:00 +time.cycle : 1 +time.nlag : 2 dir.da_run : ${HOME}/tmp/test_da forecast.nmembers : 32 diff --git a/da/ct/obs.py b/da/ct/obs.py index 08da80b..34d733c 100755 --- a/da/ct/obs.py +++ b/da/ct/obs.py @@ -114,9 +114,9 @@ class CtObservations(Observation): import Nio - ncf = Nio.open_file(filename,format='hdf') + ncf = Nio.open_file(filename,format='nc') ids = ncf.variables['id'].get_value() - simulated = ncf.variables['sampled_mole_frac'].get_value()*1e6 + simulated = ncf.variables['flask'].get_value() dummy = ncf.close() msg = "Successfully read data from model sample file (%s)"%filename ; logging.info(msg) @@ -132,7 +132,7 @@ class CtObservations(Observation): if id in obs_ids: index = obs_ids.index(id) - dummy = self.Data[index].simulated = val[0] + dummy = self.Data[index].simulated = val[:,0] else: diff --git a/da/tm5/observationoperator.py b/da/tm5/observationoperator.py index 328457a..a57e0f3 100755 --- a/da/tm5/observationoperator.py +++ b/da/tm5/observationoperator.py @@ -32,12 +32,6 @@ version = 'release 3.0' mpi_shell_filename = 'tm5_mpi_wrapper' mpi_shell_location = 'da/bin/' -needed_rc_items = [ - 'rundir', - 'outputdir', - 'time.start', - 'time.break.nday' - ] ################### Begin Class TM5 ################### @@ -68,7 +62,8 @@ class TM5ObservationOperator(ObservationOperator): self.Identifier = self.getid() # the identifier gives the model name self.Version = self.getversion() # the model version used self.RestartFileList = [] - self.outputdir = None # Needed for opening the samples.nc files created + self.RcFileType = 'None' + self.outputdir = None # Needed for opening the samples.nc files created dummy = self.LoadRc(RcFileName) # load the specified rc-file dummy = self.ValidateRc() # validate the contents @@ -101,25 +96,26 @@ class TM5ObservationOperator(ObservationOperator): # Create a link from TM5 to the rundirectory of the das system - sourcedir = self.tm_settings['rundir'] + sourcedir = self.tm_settings[self.rundirkey] targetdir = os.path.join(DaCycle['dir.exec'],'tm5') dummy = CreateLinks(sourcedir,targetdir) - self.outputdir = self.tm_settings['outputdir'] + self.outputdir = self.tm_settings[self.outputdirkey] DaCycle['dir.exec.tm5'] = targetdir # Write a modified TM5 model rc-file in which run/break times are defined by our da system NewItems = { - 'time.start' : DaCycle['time.sample.start'] , - 'time.final' : DaCycle['time.sample.end'] , - 'das.input.dir' : DaCycle['dir.input'] + self.timestartkey : DaCycle['time.sample.start'] , + self.timefinalkey : DaCycle['time.sample.end'] , + 'ct.params.input.dir' : DaCycle['dir.input'] , + 'output.flask.infile' : DaCycle['ObsOperator.inputfile'] } if DaCycle['time.restart']: # If this is a restart from a previous cycle, the TM5 model should do a restart - NewItems['istart'] = 3 + NewItems[self.istartkey] = 3 if DaCycle['time.sample.window'] != 0: # If this is a restart from a previous time step wihtin the filter lag, the TM5 model should do a restart - NewItems['istart'] = 3 + NewItems[self.istartkey] = 3 # If neither one is true, simply take the istart value from the tm5.rc file that was read @@ -146,7 +142,7 @@ class TM5ObservationOperator(ObservationOperator): """ This method loads a TM5 rc-file with settings for this simulation """ - import da.tools.rc as rc + import da.tools.rcn as rc self.tm_settings = rc.read(RcFileName) self.RcFileName = RcFileName @@ -168,6 +164,39 @@ class TM5ObservationOperator(ObservationOperator): """ from da.tools.general import ToDatetime + if self.RcFileType == 'pycasso': + + self.projectkey = 'my.project.dir' + self.rundirkey = 'my.run.dir' + self.outputdirkey = 'output.dir' + self.savedirkey = 'restart.write.dir' + self.timestartkey = 'timerange.start' + self.timefinalkey = 'timerange.end' + self.timelengthkey = 'jobstep.length' + self.istartkey = 'istart' + + else: + + self.projectkey = 'runid' + self.rundirkey = 'rundir' + self.outputdirkey = 'outputdir' + self.savedirkey = 'savedir' + self.timestartkey = 'time.start' + self.timefinalkey = 'time.final' + self.timelengthkey = 'time.break.nday' + self.istartkey = 'istart' + + needed_rc_items = [ + self.projectkey , + self.rundirkey , + self.outputdirkey , + self.savedirkey , + self.timestartkey , + self.timefinalkey , + self.timelengthkey, + self.istartkey + ] + for k,v in self.tm_settings.iteritems(): if v == 'True' : self.tm_settings[k] = True if v == 'False': self.tm_settings[k] = False @@ -176,6 +205,10 @@ class TM5ObservationOperator(ObservationOperator): self.tm_settings[k] = ToDatetime(v,fmt='TM5') if 'time.final' in k : self.tm_settings[k] = ToDatetime(v,fmt='TM5') + if 'timerange.start' in k : + self.tm_settings[k] = ToDatetime(v) + if 'timerange.end' in k : + self.tm_settings[k] = ToDatetime(v) for key in needed_rc_items: @@ -226,7 +259,7 @@ class TM5ObservationOperator(ObservationOperator): """ import da.tools.rc as rc - tm5rcfilename = os.path.join(self.tm_settings['rundir'],'tm5.rc') + tm5rcfilename = os.path.join(self.tm_settings[self.rundirkey],'tm5.rc') dummy = rc.write(tm5rcfilename,self.tm_settings) @@ -238,29 +271,29 @@ class TM5ObservationOperator(ObservationOperator): """ import da.tools.rc as rc - tm5rcfilename = os.path.join(self.tm_settings['rundir'],'tm5_runtime.rc') + tm5rcfilename = os.path.join(self.tm_settings[self.rundirkey],'tm5_runtime.rc') dummy = rc.write(tm5rcfilename,self.tm_settings) rc_runtm5={} - rc_runtm5['year1'] = self.tm_settings['time.start'].year - rc_runtm5['month1'] = self.tm_settings['time.start'].month - rc_runtm5['day1'] = self.tm_settings['time.start'].day - rc_runtm5['hour1'] = self.tm_settings['time.start'].hour + rc_runtm5['year1'] = self.tm_settings[self.timestartkey].year + rc_runtm5['month1'] = self.tm_settings[self.timestartkey].month + rc_runtm5['day1'] = self.tm_settings[self.timestartkey].day + rc_runtm5['hour1'] = self.tm_settings[self.timestartkey].hour rc_runtm5['minu1'] = 0 rc_runtm5['sec1'] = 0 - rc_runtm5['year2'] = self.tm_settings['time.final'].year - rc_runtm5['month2'] = self.tm_settings['time.final'].month - rc_runtm5['day2'] = self.tm_settings['time.final'].day - rc_runtm5['hour2'] = self.tm_settings['time.final'].hour + rc_runtm5['year2'] = self.tm_settings[self.timefinalkey].year + rc_runtm5['month2'] = self.tm_settings[self.timefinalkey].month + rc_runtm5['day2'] = self.tm_settings[self.timefinalkey].day + rc_runtm5['hour2'] = self.tm_settings[self.timefinalkey].hour rc_runtm5['minu2'] = 0 rc_runtm5['sec2'] = 0 - rc_runtm5['istart'] = self.tm_settings['istart'] - rc_runtm5['savedir'] = self.tm_settings['savedir'] - rc_runtm5['outputdir'] = self.tm_settings['outputdir'] + rc_runtm5[self.istartkey] = self.tm_settings[self.istartkey] + rc_runtm5[self.savedirkey] = self.tm_settings[self.savedirkey] + rc_runtm5[self.outputdirkey] = self.tm_settings[self.outputdirkey] rc.write(tm5rcfilename,rc_runtm5) @@ -272,7 +305,7 @@ class TM5ObservationOperator(ObservationOperator): Make sure that parameter files are written to the TM5 inputdir, and that observation lists are present """ - datadir = self.tm_settings['das.input.dir'] + datadir = self.tm_settings['ct.params.input.dir'] if not os.path.exists(datadir): msg = "The specified input directory for the TM5 model to read from does not exist (%s), exiting..."%datadir ; logging.error(msg) raise IOError,msg @@ -294,8 +327,13 @@ class TM5ObservationOperator(ObservationOperator): # Next, make sure there is an actual model version compiled and ready to execute - targetdir = os.path.join(self.tm_settings['rundir']) - self.Tm5Executable = os.path.join(targetdir,'tm5.x') + targetdir = os.path.join(self.tm_settings[self.rundirkey]) + + if self.RcFileType == 'pycasso': + self.Tm5Executable = os.path.join(targetdir,'tm5-'+self.tm_settings['my.project']+'-base.x') + else: + self.Tm5Executable = os.path.join(targetdir,'tm5.x') + if not os.path.exists(self.Tm5Executable): msg = "Required TM5 executable was not found %s"%self.Tm5Executable ; logging.error(msg) msg = "Please compile the model with the specified rc-file and the regular TM5 scripts first" ; logging.error(msg) @@ -320,7 +358,7 @@ class TM5ObservationOperator(ObservationOperator): # First get the save.hdf* data for TM5 from the current restart dir of the filter sourcedir = DaCycle['dir.restart.current'] - targetdir = self.tm_settings['savedir'] + targetdir = self.tm_settings[self.savedirkey] for file in os.listdir(sourcedir): @@ -396,7 +434,7 @@ class TM5ObservationOperator(ObservationOperator): DaPlatForm = DaCycle.DaPlatForm - targetdir = os.path.join(self.tm_settings['rundir']) + targetdir = os.path.join(self.tm_settings[self.rundirkey]) # Go to executable directory and start the subprocess, using a new logfile @@ -441,7 +479,7 @@ class TM5ObservationOperator(ObservationOperator): DaPlatForm = DaCycle.DaPlatForm - targetdir = os.path.join(self.tm_settings['rundir']) + targetdir = os.path.join(self.tm_settings[self.rundirkey]) # Go to executable directory and start the subprocess, using a new logfile @@ -463,7 +501,7 @@ class TM5ObservationOperator(ObservationOperator): template = DaPlatForm.GetJobTemplate(jobparams,block=True) template += 'cd %s\n'%targetdir - template += 'mpirun -np %d ./tm5.x\n'%(int(nprocesses),) + template += '/usr/local/intel/intel_11.1.0.84/mpich2-1.3.1/suite/bin/mpirun -np %d %s tm5.rc\n'%(int(nprocesses),self.Tm5Executable,) jobfile = DaPlatForm.WriteJob(DaCycle,template,'tm5') @@ -483,9 +521,9 @@ class TM5ObservationOperator(ObservationOperator): that is used by the DaCycle object to collect restart data for the filter. """ - sourcedir = os.path.join(self.tm_settings['outputdir']) - targetdir = os.path.join(self.tm_settings['savedir']) - filter = ['save_%s'%self.tm_settings['time.final'].strftime('%Y%m%d')] + sourcedir = os.path.join(self.tm_settings[self.outputdirkey]) + targetdir = os.path.join(self.tm_settings[self.savedirkey]) + filter = ['save_%s'%self.tm_settings[self.timefinalkey].strftime('%Y%m%d')] msg = "Performing a backup of TM5 save data" ; logging.debug(msg) msg = " from directory: %s " % sourcedir ; logging.debug(msg) diff --git a/da/tools/general.py b/da/tools/general.py index f77de6f..f97d24a 100755 --- a/da/tools/general.py +++ b/da/tools/general.py @@ -100,6 +100,11 @@ def ToDatetime(datestring,fmt=None): if fmt == 'TM5': datestring = '%04s-%02s-%02s %02s:%02s:00'%(datestring[0:4],datestring[4:6],datestring[6:8],datestring[8:10],datestring[10:12]) + elif fmt == 'pycasso-TM5': + pass # Format already compatible + else: + pass + try: return datetime.datetime.strptime(datestring,'%Y-%m-%d %H:%M:%S') diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py index 605083f..200d3cb 100755 --- a/da/tools/pipeline.py +++ b/da/tools/pipeline.py @@ -217,7 +217,6 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag): DaCycle.RestartFileList.extend( ObservationOperator.RestartFileList ) - # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future?? dummy = Samples.AddModelDataMismatch(DaCycle) @@ -229,13 +228,15 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag): if lag == int(DaCycle['time.nlag'])-1 or DaCycle['time.restart']==False : - silent = False + base_date = "%s_%s"%(DaCycle['time.sample.start'].strftime('%Y%m%d%H'),DaCycle['time.sample.end'].strftime('%Y%m%d%H'),) + filename = os.path.join(ObservationOperator.outputdir,'flask_%s'%base_date) + dummy = Samples.AddSimulations(filename) + members = StateVector.EnsembleMembers[lag] - for Member in members: - filename = os.path.join(ObservationOperator.outputdir,'samples.%03d.nc'%Member.membernumber) - dummy = Samples.AddSimulations(filename,silent) + for n,Member in iterate(members): + Member.ModelSample = copy.deepcopy(Samples) - silent=True + Member.ModelSample.simulated = Member.ModelSample.simulated[n,:] StateVector.nobs += Samples.getlength() diff --git a/das.py b/das.py index 1ecef94..0ef2c39 100755 --- a/das.py +++ b/das.py @@ -32,7 +32,7 @@ from da.ct.optimizer import CtOptimizer PlatForm = MaunaloaPlatForm() DaSystem = CtDaSystem('carbontracker.rc') -ObsOperator = TM5ObservationOperator('/Users/peters/Modeling/TM5/ct_new.rc') +ObsOperator = TM5ObservationOperator('/Users/peters/Modeling/TM5/tm5-ctdas.rc') Samples = CtObservations() StateVector = CtStateVector() Optimizer = CtOptimizer() -- GitLab