diff --git a/da/baseclasses/dasystem.py b/da/baseclasses/dasystem.py
index fcfe2ed7f4d1879035b78754f65d8937dbd47794..0f62431b0827393fd83ec055bef71daac7d0917f 100755
--- a/da/baseclasses/dasystem.py
+++ b/da/baseclasses/dasystem.py
@@ -74,8 +74,10 @@ class DaSystem(dict):
         needed_rc_items = {}

         for k, v in self.iteritems():
-            if v == 'True' : self[k] = True
-            if v == 'False': self[k] = False
+            if v == 'True' :
+                self[k] = True
+            if v == 'False':
+                self[k] = False

         for key in needed_rc_items:
             if not self.has_key(key):
diff --git a/da/baseclasses/optimizer.py b/da/baseclasses/optimizer.py
index cc48099e2ed60ea70e2610f40afe5eae0331e5e1..c3548638f8599117de5f8a4332b6ab1375cb1425 100755
--- a/da/baseclasses/optimizer.py
+++ b/da/baseclasses/optimizer.py
@@ -14,6 +14,10 @@ import os
 import sys
 import logging
 import datetime
+import numpy as np
+import numpy.linalg as la
+
+import da.tools.io4 as io

 identifier = 'Optimizer baseclass'
 version = '0.0'
@@ -43,7 +47,6 @@ class Optimizer(object):

     def create_matrices(self):
         """ Create Matrix space needed in optimization routine """
-        import numpy as np

         # mean state  [X]
         self.x = np.zeros((self.nlag * self.nparams,), float)
@@ -63,7 +66,7 @@ class Optimizer(object):
             self.R = np.zeros((self.nobs,), float)
             self.HPHR = np.zeros((self.nobs,), float)
         else:
-            self.R = np.zeros((self.nobs,self.nobs,), float)
+            self.R = np.zeros((self.nobs, self.nobs,), float)
             self.HPHR = np.zeros((self.nobs, self.nobs,), float)
         # localization of obs
         self.may_localize = np.zeros(self.nobs, bool)
@@ -83,8 +86,6 @@ class Optimizer(object):
         self.KG = np.zeros((self.nlag * self.nparams, self.nobs,), float)

     def state_to_matrix(self, StateVector):
-        import numpy as np
-
         allsites = []      # collect all obs for n=1,..,nlag
         allobs = []      # collect all obs for n=1,..,nlag
         allmdm = []      # collect all mdm for n=1,..,nlag
@@ -101,7 +102,7 @@ class Optimizer(object):
             self.x[n * self.nparams:(n + 1) * self.nparams] = members[0].ParameterValues
             self.X_prime[n * self.nparams:(n + 1) * self.nparams, :] = np.transpose(np.array([m.ParameterValues for m in members]))

-            if Samples != None:
+            if Samples != None: #LU same as: if Samples
                 self.rejection_threshold = Samples.rejection_threshold

                 allreject.extend(Samples.getvalues('may_reject'))
@@ -115,7 +116,7 @@ class Optimizer(object):

                 simulatedensemble = Samples.getvalues('simulated')

-                if allsimulated == None :
+                if allsimulated == None : #LU if allsimulated: np.concatenate; else: np.array
                     allsimulated = np.array(simulatedensemble)
                 else:
                     allsimulated = np.concatenate((allsimulated, np.array(simulatedensemble)), axis=0)
@@ -139,7 +140,7 @@ class Optimizer(object):
                 self.R[i] = mdm ** 2
         else:
             for i, mdm in enumerate(allmdm):
-                self.R[i,i] = mdm ** 2
+                self.R[i, i] = mdm ** 2

     def matrix_to_state(self, StateVector):
         for n in range(self.nlag):
@@ -164,8 +165,6 @@ class Optimizer(object):
             The type designation refers to the writing of prior or posterior data and is used in naming the variables"
         """
-        import da.tools.io4 as io
-        #import da.tools.io as io

         outdir = DaCycle['dir.diagnostics']
         filename = os.path.join(outdir, 'optimizer.%s.nc' % DaCycle['time.start'].strftime('%Y%m%d'))
@@ -309,8 +308,6 @@ class Optimizer(object):

     def serial_minimum_least_squares(self):
         """ Make minimum least squares solution by looping over obs"""
-        import numpy as np
-
         for n in range(self.nobs):

             # Screen for flagged observations (for instance site not found, or no sample written from model)
@@ -336,10 +333,10 @@ class Optimizer(object):
             self.KG[:, n] = PHt / self.HPHR[n]

             if self.may_localize[n]:
-                logging.debug('Trying to localize observation %s, %i' % (self.sitecode[n],self.obs_ids[n]))
+                logging.debug('Trying to localize observation %s, %i' % (self.sitecode[n], self.obs_ids[n]))
                 self.localize(n)
             else:
-                logging.debug('Not allowed to localize observation %s, %i' % (self.sitecode[n],self.obs_ids[n]))
+                logging.debug('Not allowed to localize observation %s, %i' % (self.sitecode[n], self.obs_ids[n]))

             alpha = np.double(1.0) / (np.double(1.0) + np.sqrt((self.R[n]) / self.HPHR[n]))
@@ -366,8 +363,7 @@ class Optimizer(object):

     def bulk_minimum_least_squares(self):
         """ Make minimum least squares solution by solving matrix equations"""
-        import numpy as np
-        import numpy.linalg as la
+

         # Create full solution, first calculate the mean of the posterior analysis
@@ -411,8 +407,7 @@ class Optimizer(object):

     def localize(self, n):
         """ localize the Kalman Gain matrix """
-        logging.debug('Not localized observation %s, %i' % (self.sitecode[n],self.obs_ids[n]))
-        pass
+        logging.debug('Not localized observation %s, %i' % (self.sitecode[n], self.obs_ids[n]))

     def set_algorithm(self):
         self.algorithm = 'Serial'
@@ -429,7 +424,6 @@ if __name__ == "__main__":
     from da.tools.initexit import start_logger
     from da.tools.initexit import CycleControl
     from da.ct.obs import CtObservations
-    import numpy as np

     opts = ['-v']
     args = {'rc':'../../da.rc', 'logfile':'da_initexit.log', 'jobrcfilename':'test.rc'}
diff --git a/da/baseclasses/statevector.py b/da/baseclasses/statevector.py
index a473c3d4542de27c850b81236d9f8195bf1c82e8..ae8e018c6ae939e56a4a7c86a00fce876fe318b9 100755
--- a/da/baseclasses/statevector.py
+++ b/da/baseclasses/statevector.py
@@ -282,7 +282,7 @@ class StateVector(object):

         logging.debug('%d new ensemble members were added to the state vector # %d' % (self.nmembers, lag))

-    def propagate(self):
+    def propagate(self, startdate, cycle):
         """
         :rtype: None

@@ -301,7 +301,7 @@ class StateVector(object):
         self.EnsembleMembers.append([])

         # And now create a new time step of mean + members for n=nlag
-        date = self.DaCycle['time.start'] + timedelta(days=(self.nlag - 0.5) * int(self.DaCycle['time.cycle']))
+        date = startdate + timedelta(days=(self.nlag - 0.5) * int(cycle))
         cov = self.get_covariance(date)
         self.make_new_ensemble(self.nlag, cov)

@@ -401,7 +401,7 @@ class StateVector(object):

         logging.info('Successfully read the State Vector from file (%s) ' % filename)

-    def write_members_to_file(self, lag):
+    def write_members_to_file(self, lag, outdir):
         """
            :param: lag: Which lag step of the filter to write, must lie in range [1,...,nlag]
            :rtype: None
@@ -423,7 +423,6 @@ class StateVector(object):

         #import da.tools.io as io
         #import da.tools.io4 as io
-        outdir = self.DaCycle['dir.input']

         members = self.EnsembleMembers[lag - 1]
         for mem in members:
diff --git a/da/ct/obspack.py b/da/ct/obspack.py
index 269f22f661b6479130fb949a2c022a5031937bb1..49648c36724be6ea38c91ea33164f276b6b782ee 100755
--- a/da/ct/obspack.py
+++ b/da/ct/obspack.py
@@ -318,7 +318,7 @@ class ObsPackObservations(Observation):

             species, site, method, lab, nr = os.path.split(obs.fromfile)[-1].split('_')

-            identifier = "%s_%02d_%s" % (site, int(lab), method,)
+            identifier = "%s_%02d_%s" % (site, int(lab), method)
             identifier = name_convert(name="%s_%s_%s" % (site.lower(), method.lower(), lab.lower(),), to='GV')
diff --git a/da/ct/statevector.py b/da/ct/statevector.py
index b093aee929b89c1e14dfadacfadca8427d4586ca..fbaf06d2314d1d338450fe9a26b577f73c994c2f 100755
--- a/da/ct/statevector.py
+++ b/da/ct/statevector.py
@@ -118,12 +118,12 @@ class CtStateVector(StateVector):

         for n in range(self.nlag):
             if qual == 'opt':
-                MeanState = f.get_variable('xac_%02d'%(n+1))
-                EnsembleMembers = f.get_variable('adX_%02d'%(n+1))
+                MeanState = f.get_variable('xac_%02d' % (n + 1))
+                EnsembleMembers = f.get_variable('adX_%02d' % (n + 1))

             elif qual == 'prior':
-                MeanState = f.get_variable('xpc_%02d'%(n+1))
-                EnsembleMembers = f.get_variable('pdX_%02d'%(n+1))
+                MeanState = f.get_variable('xpc_%02d' % (n + 1))
+                EnsembleMembers = f.get_variable('pdX_%02d' % (n + 1))

             if not self.EnsembleMembers[n] == []:
                 self.EnsembleMembers[n] = []
diff --git a/da/examples/das.py b/da/examples/das.py
index 90c73f70dbc9759395531b22187b8392e64ffbd7..243891475fa58317fe67a819d2ee54e8db3b9b14 100755
--- a/da/examples/das.py
+++ b/da/examples/das.py
@@ -7,82 +7,70 @@ import sys
 import os
 import logging

-dummy = sys.path.append(os.getcwd())
+sys.path.append(os.getcwd())

 #################################################################################################
 # Next, import the tools needed to initialize a data assimilation cycle
 #################################################################################################

-from da.tools.initexit import start_logger
-from da.tools.initexit import validate_opts_args
-from da.tools.initexit import parse_options
+from da.tools.initexit import start_logger, validate_opts_args, parse_options, CycleControl
+from da.tools.pipeline import ensemble_smoother_pipeline
+from da.platform.maunaloa import MaunaloaPlatForm
+from da.ct.dasystem import CtDaSystem
+from da.ct.statevector import CtStateVector
+#from da.ct.obspack import ObsPackObservations
+from da.ct.obs import CtObservations
+from da.tm5.observationoperator import TM5ObservationOperator
+from da.ct.optimizer import CtOptimizer
+
+from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data
+from da.analysis.expand_mixingratios import write_mixing_ratios
+

 #################################################################################################
 # Parse and validate the command line options, start logging
 #################################################################################################

-dummy = start_logger()
-opts, args = parse_options()
-opts,args = validate_opts_args(opts,args)
+start_logger()
+opts, args = validate_opts_args(parse_options())

 #################################################################################################
 # Create the Cycle Control object for this job
 #################################################################################################

-from da.tools.initexit import CycleControl
-
-DaCycle = CycleControl(opts,args)
+DaCycle = CycleControl(opts, args)

-###########################################################################################
-### IMPORT THE APPLICATION SPECIFIC MODULES HERE, TO BE PASSED INTO THE MAIN PIPELINE!!! ##
-###########################################################################################
-from da.tools.pipeline import ensemble_smoother_pipeline
-from da.platform.maunaloa import MaunaloaPlatForm
-from da.ct.dasystem import CtDaSystem
-from da.ct.statevector import CtStateVector
-#from da.ct.obspack import ObsPackObservations
-from da.ct.obs import CtObservations
-from da.tm5.observationoperator import TM5ObservationOperator
-from da.ct.optimizer import CtOptimizer
-
-PlatForm    = MaunaloaPlatForm()
-DaSystem    = CtDaSystem(DaCycle['da.system.rc'])
+PlatForm = MaunaloaPlatForm()
+DaSystem = CtDaSystem(DaCycle['da.system.rc'])
 ObsOperator = TM5ObservationOperator(DaCycle['da.obsoperator.rc'])
 #Samples     = ObsPackObservations()
-Samples     = CtObservations()
+Samples = CtObservations()
 StateVector = CtStateVector()
-Optimizer   = CtOptimizer()
+Optimizer = CtOptimizer()

 ##########################################################################################
 ################### ENTER THE PIPELINE WITH THE OBJECTS PASSED BY THE USER ###############
 ##########################################################################################

 from da.tools.pipeline import header, footer

-msg = header+"Entering Pipeline "+footer ; logging.info(msg)
+logging.info(header + "Entering Pipeline " + footer)

-ensemble_smoother_pipeline(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer)
+ensemble_smoother_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer)

 ##########################################################################################
 ################### All done, extra stuff can be added next, such as analysis
 ##########################################################################################

-msg = header+"Starting analysis"+footer ; logging.info(msg)
-
-
-from da.analysis.expand_fluxes import save_weekly_avg_1x1_data
-from da.analysis.expand_fluxes import save_weekly_avg_state_data
-from da.analysis.expand_fluxes import save_weekly_avg_tc_data
-from da.analysis.expand_fluxes import save_weekly_avg_ext_tc_data
-from da.analysis.expand_mixingratios import write_mixing_ratios
+logging.info(header + "Starting analysis" + footer)

-savedas = save_weekly_avg_1x1_data(DaCycle, StateVector)
-savedas = save_weekly_avg_state_data(DaCycle, StateVector)
-savedas = save_weekly_avg_tc_data(DaCycle, StateVector)
-savedas = save_weekly_avg_ext_tc_data(DaCycle)
-savedas = write_mixing_ratios(DaCycle)
+save_weekly_avg_1x1_data(DaCycle, StateVector)
+save_weekly_avg_state_data(DaCycle, StateVector)
+save_weekly_avg_tc_data(DaCycle, StateVector)
+save_weekly_avg_ext_tc_data(DaCycle)
+write_mixing_ratios(DaCycle)

 sys.exit(0)
diff --git a/da/examples/dasgridded.py b/da/examples/dasgridded.py
index 67bbac35a61fbd5befa691badae198026eab7250..b333343da19a42a609c71f66d13b082534f2ca1b 100755
--- a/da/examples/dasgridded.py
+++ b/da/examples/dasgridded.py
@@ -7,80 +7,68 @@ import sys
 import os
 import logging

-dummy = sys.path.append(os.getcwd())
+
+sys.path.append(os.getcwd())

 #################################################################################################
 # Next, import the tools needed to initialize a data assimilation cycle
 #################################################################################################

-from da.tools.initexit import start_logger
-from da.tools.initexit import validate_opts_args
-from da.tools.initexit import parse_options
+from da.tools.initexit import start_logger, parse_options, validate_opts_args, CycleControl
+
+from da.tools.pipeline import ensemble_smoother_pipeline
+from da.platform.maunaloa import MaunaloaPlatForm
+from da.ctgridded.dasystem import CtGriddedDaSystem
+from da.ctgridded.statevector import CtGriddedStateVector
+from da.ct.obs import CtObservations
+
+from da.tm5.observationoperator import TM5ObservationOperator
+from da.ct.optimizer import CtOptimizer
+from da.tools.pipeline import header, footer
+
+from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data
+from da.analysis.expand_mixingratios import write_mixing_ratios

 #################################################################################################
 # Parse and validate the command line options, start logging
 #################################################################################################

-dummy = start_logger()
-opts, args = parse_options()
-opts,args = validate_opts_args(opts,args)
+start_logger()
+
+opts, args = validate_opts_args(parse_options())

 #################################################################################################
 # Create the Cycle Control object for this job
 #################################################################################################

-from da.tools.initexit import CycleControl
-
-DaCycle = CycleControl(opts,args)
+DaCycle = CycleControl(opts, args)

-###########################################################################################
-### IMPORT THE APPLICATION SPECIFIC MODULES HERE, TO BE PASSED INTO THE MAIN PIPELINE!!! ##
-###########################################################################################
-
-from da.tools.pipeline import ensemble_smoother_pipeline
-from da.platform.maunaloa import MaunaloaPlatForm
-from da.ctgridded.dasystem import CtGriddedDaSystem
-from da.ctgridded.statevector import CtGriddedStateVector
-from da.ct.obs import CtObservations
-
-from da.tm5.observationoperator import TM5ObservationOperator
-from da.ct.optimizer import CtOptimizer
-
-PlatForm    = MaunaloaPlatForm()
-DaSystem    = CtGriddedDaSystem(DaCycle['da.system.rc'])
+PlatForm = MaunaloaPlatForm()
+DaSystem = CtGriddedDaSystem(DaCycle['da.system.rc'])
 ObsOperator = TM5ObservationOperator(DaCycle['da.obsoperator.rc'])
-Samples     = CtObservations()
+Samples = CtObservations()
 StateVector = CtGriddedStateVector()
-Optimizer   = CtOptimizer()
+Optimizer = CtOptimizer()

 ##########################################################################################
 ################### ENTER THE PIPELINE WITH THE OBJECTS PASSED BY THE USER ###############
 ##########################################################################################

-from da.tools.pipeline import header,footer
-
-msg = header+"Entering Pipeline "+footer ; logging.info(msg)
-
-ensemble_smoother_pipeline(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer)
+logging.info(header + "Entering Pipeline " + footer)
+ensemble_smoother_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer)

 ##########################################################################################
 ################### All done, extra stuff can be added next, such as analysis
 ##########################################################################################

-msg = header+"Starting analysis"+footer ; logging.info(msg)
-
-from da.analysis.expand_fluxes import save_weekly_avg_1x1_data
-from da.analysis.expand_fluxes import save_weekly_avg_state_data
-from da.analysis.expand_fluxes import save_weekly_avg_tc_data
-from da.analysis.expand_fluxes import save_weekly_avg_ext_tc_data
-from da.analysis.expand_mixingratios import write_mixing_ratios
+logging.info(header + "Starting analysis" + footer)

-savedas = save_weekly_avg_1x1_data(DaCycle, StateVector)
-savedas = save_weekly_avg_state_data(DaCycle, StateVector)
-savedas = save_weekly_avg_tc_data(DaCycle, StateVector)
-savedas = save_weekly_avg_ext_tc_data(DaCycle)
-savedas = write_mixing_ratios(DaCycle)
+save_weekly_avg_1x1_data(DaCycle, StateVector)
+save_weekly_avg_state_data(DaCycle, StateVector)
+save_weekly_avg_tc_data(DaCycle, StateVector)
+save_weekly_avg_ext_tc_data(DaCycle)
+write_mixing_ratios(DaCycle)

 sys.exit(0)
diff --git a/da/examples/dasjet.py b/da/examples/dasjet.py
index f61a7340e8dfcbd5c98410783f5b7cdedb4b1edb..bcfc25b7e840515a4dd7d6d41afd1910b18960f0 100755
--- a/da/examples/dasjet.py
+++ b/da/examples/dasjet.py
@@ -7,60 +7,51 @@ import sys
 import os
 import logging

-dummy = sys.path.append(os.getcwd())
+sys.path.append(os.getcwd())

 #################################################################################################
 # Next, import the tools needed to initialize a data assimilation cycle
 #################################################################################################

-from da.tools.initexit import start_logger
-from da.tools.initexit import validate_opts_args
-from da.tools.initexit import parse_options
+from da.tools.initexit import start_logger, validate_opts_args, parse_options, CycleControl
+from da.tools.pipeline import ensemble_smoother_pipeline
+from da.platform.jet import JetPlatForm
+from da.ct.dasystem import CtDaSystem
+from da.ct.statevector import CtStateVector
+from da.ct.obs import CtObservations
+from da.tm5.observationoperator import TM5ObservationOperator
+from da.ct.optimizer import CtOptimizer
+from da.tools.pipeline import header, footer
+

 #################################################################################################
 # Parse and validate the command line options, start logging
 #################################################################################################

-dummy = start_logger()
-opts, args = parse_options()
-opts,args = validate_opts_args(opts,args)
+start_logger()
+opts, args = validate_opts_args(parse_options())

 #################################################################################################
 # Create the Cycle Control object for this job
 #################################################################################################

-from da.tools.initexit import CycleControl
-
-DaCycle = CycleControl(opts,args)
-
-###########################################################################################
-### IMPORT THE APPLICATION SPECIFIC MODULES HERE, TO BE PASSED INTO THE MAIN PIPELINE!!! ##
-###########################################################################################
-
-from da.tools.pipeline import ensemble_smoother_pipeline
-from da.platform.jet import JetPlatForm
-from da.ct.dasystem import CtDaSystem
-from da.ct.statevector import CtStateVector
-from da.ct.obs import CtObservations
-from da.tm5.observationoperator import TM5ObservationOperator
-from da.ct.optimizer import CtOptimizer
+DaCycle = CycleControl(opts, args)

-PlatForm    = JetPlatForm()
-DaSystem    = CtDaSystem(DaCycle['da.system.rc'])
+PlatForm = JetPlatForm()
+DaSystem = CtDaSystem(DaCycle['da.system.rc'])
 ObsOperator = TM5ObservationOperator(DaCycle['da.obsoperator.rc'])
-Samples     = CtObservations()
+Samples = CtObservations()
 StateVector = CtStateVector()
-Optimizer   = CtOptimizer()
+Optimizer = CtOptimizer()

 ##########################################################################################
 ################### ENTER THE PIPELINE WITH THE OBJECTS PASSED BY THE USER ###############
 ##########################################################################################

-from da.tools.pipeline import header,footer
-msg = header+"Entering Pipeline "+footer ; logging.info(msg)
+logging.info(header + "Entering Pipeline " + footer)

-ensemble_smoother_pipeline(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer)
+ensemble_smoother_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer)

 ##########################################################################################
diff --git a/da/platform/capegrim.py b/da/platform/capegrim.py
index 666dbd021b1dbedffae31bd3e5999a825b880f5b..f8d952d507e925b7bb420dc24d46362629b1f3dd 100755
--- a/da/platform/capegrim.py
+++ b/da/platform/capegrim.py
@@ -23,11 +23,9 @@ class CapeGrimPlatForm(PlatForm):
         self.Version = '1.0'     # the platform version used

     def give_blocking_flag(self):
-
         return ""

     def give_queue_type(self):
-
         return "foreground"

     def get_job_template(self,joboptions={},block=False):
diff --git a/da/platform/huygens.py b/da/platform/huygens.py
index c231f891a3f46c80e017a6a0262c04b74a62dae9..ba198c0e9162b3a7a69ba0f5abb94e2f57292ff9 100755
--- a/da/platform/huygens.py
+++ b/da/platform/huygens.py
@@ -9,22 +9,18 @@ File created on 06 Sep 2010.
""" -import sys -import os import logging import subprocess -from da.baseclasses.platform import PlatForm, std_joboptions +from da.baseclasses.platform import PlatForm - -std_joboptions={'jobname':'test','jobaccount':'co2','jobtype':'serial','jobshell':'/bin/sh','depends':'','jobtime':'24:00:00','jobinput':'/dev/null','jobnode':'','jobtasks':'','modulenetcdf':'netcdf/4.1.2','networkMPI':''} +std_joboptions = {'jobname':'test', 'jobaccount':'co2', 'jobtype':'serial', 'jobshell':'/bin/sh', 'depends':'', 'jobtime':'24:00:00', 'jobinput':'/dev/null', 'jobnode':'', 'jobtasks':'', 'modulenetcdf':'netcdf/4.1.2', 'networkMPI':''} class HuygensPlatForm(PlatForm): def __init__(self): - - self.Identifier = 'huygens' # the identifier gives the platform name - self.Version = '1.0' # the platform version used + self.Identifier = 'huygens' # the identifier gives the platform name + self.Version = '1.0' # the platform version used def give_blocking_flag(self): @@ -46,13 +42,9 @@ class HuygensPlatForm(PlatForm): -on Jet/Zeus: return """ - return "queue" - - - - def get_job_template(self,joboptions={},block=False): + def get_job_template(self, joboptions={}, block=False): """ Returns the job template for a given computing system, and fill it with options from the dictionary provided as argument. The job template should return the preamble of a job that can be submitted to a queue on your platform, @@ -86,46 +78,46 @@ class HuygensPlatForm(PlatForm): # """\n""" - template = """#!/bin/bash \n"""+\ - """## \n"""+ \ - """## This is a set of dummy names, to be replaced by values from the dictionary \n"""+ \ - """## Please make your own platform specific template with your own keys and place it in a subfolder of the da package.\n """+ \ - """## \n"""+ \ - """## @ node_usage = normal\n"""+\ - """jobnode \n"""+\ - """jobtasks \n"""+\ - """networkMPI \n"""+\ - """# @ notification = never\n"""+\ - """# @ input = jobinput\n"""+\ - """# @ output = logfile.$(jobid)\n"""+\ - """# @ error = logfile.$(jobid)\n"""+\ - """# @ wall_clock_limit = jobtime\n""" +\ - """# @ job_type = jobtype \n"""+\ - """# @ shell = /bin/bash\n"""+\ - """# @ queue \n"""+\ - """\n"""+\ - """module load ctdas\n"""+\ + template = """#!/bin/bash \n""" + \ + """## \n""" + \ + """## This is a set of dummy names, to be replaced by values from the dictionary \n""" + \ + """## Please make your own platform specific template with your own keys and place it in a subfolder of the da package.\n """ + \ + """## \n""" + \ + """## @ node_usage = normal\n""" + \ + """jobnode \n""" + \ + """jobtasks \n""" + \ + """networkMPI \n""" + \ + """# @ notification = never\n""" + \ + """# @ input = jobinput\n""" + \ + """# @ output = logfile.$(jobid)\n""" + \ + """# @ error = logfile.$(jobid)\n""" + \ + """# @ wall_clock_limit = jobtime\n""" + \ + """# @ job_type = jobtype \n""" + \ + """# @ shell = /bin/bash\n""" + \ + """# @ queue \n""" + \ + """\n""" + \ + """module load ctdas\n""" + \ """\n""" if 'depends' in joboptions: template += """#$ -hold_jid depends \n""" # First replace from passed dictionary - for k,v in joboptions.iteritems(): + for k, v in joboptions.iteritems(): while k in template: - template = template.replace(k,v) + template = template.replace(k, v) # Fill remaining values with std_options - for k,v in std_joboptions.iteritems(): + for k, v in std_joboptions.iteritems(): while k in template: - template = template.replace(k,v) + template = template.replace(k, v) return template - msg1 = 'Platform initialized: %s'%self.Identifier ; logging.info(msg1) + 
msg1 = 'Platform initialized: %s' % self.Identifier ; logging.info(msg1) # #msg2 = '%s version: %s'%(self.Identifier,self.Version) ; logging.info(msg2) @@ -154,7 +146,7 @@ class HuygensPlatForm(PlatForm): - def submit_job(self,jobfile,joblog=None,block=False): + def submit_job(self, jobfile, joblog=None, block=False): """ This method submits a jobfile to the queue, and returns the queue ID """ @@ -162,19 +154,18 @@ class HuygensPlatForm(PlatForm): # msg = "A new task will be started (%s)"%cmd ; logging.info(msg) if block: - cmd = ["llsubmit","-s",jobfile] - msg = "A new task will be started (%s)"%cmd ; logging.info(msg) - output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] ; logging.info(output) - print 'output',output - jobid = output.split()[3] - print 'jobid',jobid + cmd = ["llsubmit", "-s", jobfile] + msg = "A new task will be started (%s)" % cmd ; logging.info(msg) + output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] ; logging.info(output) + print 'output', output + jobid = output.split()[3] + print 'jobid', jobid else: - cmd = ["llsubmit",jobfile] - msg = "A new task will be started (%s)"%cmd ; logging.info(msg) - output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] ; logging.info(output) - jobid = output.split()[3] + cmd = ["llsubmit", jobfile] + msg = "A new task will be started (%s)" % cmd ; logging.info(msg) + output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] ; logging.info(output) + jobid = output.split()[3] - return jobid diff --git a/da/platform/jet.py b/da/platform/jet.py index 2214e3a4829fb000e235298ae99e85e3ba28e079..b4380360c005a2806c828fb9558117dea98be24e 100755 --- a/da/platform/jet.py +++ b/da/platform/jet.py @@ -8,37 +8,35 @@ Revision History: File created on 06 Sep 2010. 
""" - -import sys import os import logging import subprocess from da.baseclasses.platform import PlatForm -std_joboptions={'jobname':'test','jobaccount':'co2','jobnodes':'nserial 1','jobshell':'/bin/sh','depends':'','jobtime':'00:30:00','joblog':os.getcwd()} +std_joboptions = {'jobname':'test', 'jobaccount':'co2', 'jobnodes':'nserial 1', 'jobshell':'/bin/sh', 'depends':'', 'jobtime':'00:30:00', 'joblog':os.getcwd()} class JetPlatForm(PlatForm): def __init__(self): - self.Identifier = 'NOAA jet' # the identifier gives the platform name - self.Version = '1.0' # the platform version used + self.Identifier = 'NOAA jet' # the identifier gives the platform name + self.Version = '1.0' # the platform version used - msg1 = '%s platform object initialized'%self.Identifier ; logging.debug(msg1) - msg2 = '%s version: %s'%(self.Identifier,self.Version) ; logging.debug(msg2) + logging.debug('%s platform object initialized' % self.Identifier) + logging.debug('%s version: %s' % (self.Identifier, self.Version)) - def get_job_template(self,joboptions={},block=False): + def get_job_template(self, joboptions={}, block=False): """ Return the job template for a given computing system, and fill it with options from the dictionary provided as argument""" - template = """#$ -N jobname \n"""+ \ - """#$ -A jobaccount \n"""+ \ - """#$ -pe jobnodes \n"""+ \ - """#$ -l h_rt=jobtime \n"""+ \ - """#$ -S jobshell \n"""+ \ - """#$ -o joblog \n"""+ \ - """#$ -cwd\n"""+ \ - """#$ -r n\n"""+ \ - """#$ -V\n"""+ \ + template = """#$ -N jobname \n""" + \ + """#$ -A jobaccount \n""" + \ + """#$ -pe jobnodes \n""" + \ + """#$ -l h_rt=jobtime \n""" + \ + """#$ -S jobshell \n""" + \ + """#$ -o joblog \n""" + \ + """#$ -cwd\n""" + \ + """#$ -r n\n""" + \ + """#$ -V\n""" + \ """#$ -j y\n""" if 'depends' in joboptions: @@ -48,14 +46,14 @@ class JetPlatForm(PlatForm): template += """#$ -sync y\n""" # First replace from passed dictionary - for k,v in joboptions.iteritems(): + for k, v in joboptions.iteritems(): while k in template: - template = template.replace(k,v) + template = template.replace(k, v) # Fill remaining values with std_options - for k,v in std_joboptions.iteritems(): + for k, v in std_joboptions.iteritems(): while k in template: - template = template.replace(k,v) + template = template.replace(k, v) return template @@ -65,31 +63,30 @@ class JetPlatForm(PlatForm): except: return os.getpid() - def submit_job(self,jobfile,joblog=None,block=False): + def submit_job(self, jobfile, joblog=None, block=False): """ This method submits a jobfile to the queue, and returns the queue ID """ - cmd = ["qsub",jobfile] - msg = "A new task will be started (%s)"%cmd ; logging.info(msg) - output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] ; logging.info(output) - jobid = output.split()[2] + cmd = ["qsub", jobfile] + logging.info("A new task will be started (%s)" % cmd) + output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] + logging.info(output) + jobid = output.split()[2] retcode = output.split()[-1] return retcode - def kill_job(self,jobid): + def kill_job(self, jobid): """ This method kills a running job """ - output = subprocess.Popen(['qdel',jobid], stdout=subprocess.PIPE).communicate()[0] ; logging.info(output) - + output = subprocess.Popen(['qdel', jobid], stdout=subprocess.PIPE).communicate()[0] + logging.info(output) return output - def job_stat(self,jobid): + def job_stat(self, jobid): """ This method gets the status of a running job """ - import subprocess #output = 
subprocess.Popen(['sgestat'], stdout=subprocess.PIPE).communicate()[0] ; logging.info(output) - return '' if __name__ == "__main__": diff --git a/da/platform/maunaloa.py b/da/platform/maunaloa.py index b2d1bba11de8a7ccf5c850726c7e9d2215efb0e2..260a9195b1f1480b35156b2dc3bc381655a7817d 100755 --- a/da/platform/maunaloa.py +++ b/da/platform/maunaloa.py @@ -9,28 +9,23 @@ File created on 06 Sep 2010. """ -import sys -import os import logging -import subprocess from da.baseclasses.platform import PlatForm, std_joboptions class MaunaloaPlatForm(PlatForm): def __init__(self): - - self.Identifier = 'WU maunaloa' # the identifier gives the platform name - self.Version = '1.0' # the platform version used + self.Identifier = 'WU maunaloa' # the identifier gives the platform name + self.Version = '1.0' # the platform version used def give_blocking_flag(self): - return "" def give_queue_type(self): return "foreground" - def get_job_template(self,joboptions={},block=False): + def get_job_template(self, joboptions={}, block=False): """ Returns the job template for a given computing system, and fill it with options from the dictionary provided as argument. The job template should return the preamble of a job that can be submitted to a queue on your platform, @@ -48,38 +43,38 @@ class MaunaloaPlatForm(PlatForm): job until the submitted job in this template has been completed fully. """ - template = """## \n"""+ \ - """## This is a set of dummy names, to be replaced by values from the dictionary \n"""+ \ - """## Please make your own platform specific template with your own keys and place it in a subfolder of the da package.\n """+ \ - """## \n"""+ \ - """ \n"""+ \ - """#$ jobname \n"""+ \ - """#$ jobaccount \n"""+ \ - """#$ jobnodes \n"""+ \ - """#$ jobtime \n"""+ \ - """#$ jobshell \n"""+ \ - """\n"""+ \ - """source /usr/local/Modules/3.2.8/init/sh\n"""+ \ - """module load python\n"""+ \ + template = """## \n""" + \ + """## This is a set of dummy names, to be replaced by values from the dictionary \n""" + \ + """## Please make your own platform specific template with your own keys and place it in a subfolder of the da package.\n """ + \ + """## \n""" + \ + """ \n""" + \ + """#$ jobname \n""" + \ + """#$ jobaccount \n""" + \ + """#$ jobnodes \n""" + \ + """#$ jobtime \n""" + \ + """#$ jobshell \n""" + \ + """\n""" + \ + """source /usr/local/Modules/3.2.8/init/sh\n""" + \ + """module load python\n""" + \ """\n""" if 'depends' in joboptions: template += """#$ -hold_jid depends \n""" # First replace from passed dictionary - for k,v in joboptions.iteritems(): + for k, v in joboptions.iteritems(): while k in template: - template = template.replace(k,v) + template = template.replace(k, v) # Fill remaining values with std_options - for k,v in std_joboptions.iteritems(): + for k, v in std_joboptions.iteritems(): while k in template: - template = template.replace(k,v) + template = template.replace(k, v) return template - msg1 = 'Platform initialized: %s'%self.Identifier ; logging.info(msg1) + msg1 = 'Platform initialized: %s' % self.Identifier ; logging.info(msg1) #msg2 = '%s version: %s'%(self.Identifier,self.Version) ; logging.info(msg2) diff --git a/da/tm5/observationoperator.py b/da/tm5/observationoperator.py index d475ba09fc9056a9f268f69dc5f9bed6ff04ec9e..2a27ee5310d0ce9a25c15b2a7eb3dbbb4ec85b59 100755 --- a/da/tm5/observationoperator.py +++ b/da/tm5/observationoperator.py @@ -24,6 +24,8 @@ import os import sys import logging import shutil +import datetime +import subprocess sys.path.append(os.getcwd()) 
sys.path.append("../../") @@ -450,7 +452,6 @@ class TM5ObservationOperator(ObservationOperator): def TM5_under_mpirun(self): """ Method handles the case where a shell runs an MPI process that forks into N TM5 model instances """ - import datetime DaPlatForm = self.DaCycle.DaPlatForm targetdir = os.path.join(self.tm_settings[self.rundirkey]) @@ -506,8 +507,7 @@ class TM5ObservationOperator(ObservationOperator): #LU nie iwem czy to dobrze zadziala z tym kodem bo moze jednak sa lepsze sposoby sprawdzenia statusu... def TM5_With_N_tracers(self): """ Method handles the case where one TM5 model instance with N tracers does the sampling of all ensemble members""" - import datetime - import subprocess + tm5submitdir = os.path.join(self.tm_settings[self.rundirkey]) logging.info('tm5submitdir', tm5submitdir) diff --git a/da/tools/general.py b/da/tools/general.py index 35515fee63e93d7880891b536f9ac70b82745f4a..2d5363ea3cea4d7bdf7fb15a7c32d2f360e205bb 100755 --- a/da/tools/general.py +++ b/da/tools/general.py @@ -15,6 +15,7 @@ import logging import os import shutil import datetime +import re def validate_rc(rcfile, needed_items): """ validate the contents of an rc-file given a dictionary of required keys """ @@ -46,11 +47,9 @@ def create_dirs(dirname, forceclean=False): if not os.path.exists(dirname): os.makedirs(dirname) - msg = 'Creating new directory %s' % dirname - logging.info(msg) + logging.info('Creating new directory %s' % dirname) else: - msg = 'Using existing directory %s' % dirname - logging.debug(msg) + logging.debug('Using existing directory %s' % dirname) return dirname @@ -104,8 +103,6 @@ def name_convert(name=None, to=None): hun_tower-insitu_35 """ - import re - identifier = 'name_convert' if name == None or to == None: diff --git a/da/tools/initexit.py b/da/tools/initexit.py index 8255c172d4d8c45cc47772db47721b778801b7ef..7e77cb9180f35d0d5d37b2699426a2ae87f12ea6 100755 --- a/da/tools/initexit.py +++ b/da/tools/initexit.py @@ -58,6 +58,11 @@ import logging import os import sys import shutil +import copy +from string import join + +import da.tools.rc as rc +from da.tools.general import create_dirs needed_da_items = [ 'time.start', @@ -111,7 +116,6 @@ class CycleControl(dict): """ This method loads a DA Cycle rc-file with settings for this simulation """ - import da.tools.rc as rc rcdata = rc.read(RcFileName) for k, v in rcdata.iteritems(): @@ -198,7 +202,6 @@ class CycleControl(dict): Set the times over which a sampling interval will loop, depending on the lag. Note that lag falls in the interval [0,nlag-1] """ - import copy # Start from cycle times self['time.sample.start'] = copy.deepcopy(self['time.start']) @@ -361,7 +364,6 @@ class CycleControl(dict): and the resulting simulated values will be retrieved from there as well. """ - from da.tools.general import create_dirs # Create the run directory for this DA job, including I/O structure @@ -400,8 +402,7 @@ class CycleControl(dict): - replacing the output dir name, since it has the sample time in it... """ - import da.tools.rc as rc - from da.tools.general import create_dirs + # Replace rc-items with those from the crashed run's last rc-file (now in restart.current dir) @@ -456,8 +457,6 @@ class CycleControl(dict): """ - from da.tools.general import create_dirs - targetdir = os.path.join(self['dir.output']) create_dirs(os.path.join(targetdir)) @@ -499,7 +498,6 @@ class CycleControl(dict): reside in a separate folder, i.e, the ObservationOperator does *not* write directly to the CTDAS restart dir! 
""" - from da.tools.general import create_dirs targetdir = os.path.join(self['dir.restart.current']) @@ -532,8 +530,6 @@ class CycleControl(dict): In case of a 'store' command the restart.oneago folder is re-created so that the contents are empty to begin with. """ - from da.tools.general import create_dirs - if io_option not in ['store', 'restore']: raise ValueError, 'Invalid option specified for io_option (%s)' % io_option @@ -577,8 +573,8 @@ class CycleControl(dict): The resulting rc-file is written to the ``dir.exec`` so that it can be used when resubmitting the next cycle """ - import da.tools.rc as rc - import copy + + # We make a copy of the current DaCycle object, and modify the start + end dates and restart value @@ -602,10 +598,9 @@ class CycleControl(dict): def write_rc(self, fname): """ Write RC file after each process to reflect updated info """ - import da.tools.rc as rc rc.write(fname, self) - logging.debug('Wrote expanded rc-file (%s)' % (fname)) + logging.debug('Wrote expanded rc-file (%s)' % fname) def submit_next_cycle(self): @@ -618,7 +613,7 @@ class CycleControl(dict): If the end of the cycle series is reached, no new job is submitted. """ - from string import join + if self['time.end'] < self['time.finish']: @@ -632,7 +627,7 @@ class CycleControl(dict): jobparams = {'jobname':"j.%s" % jobid, 'jobtime':'06:00:00', 'logfile': logfile, 'errfile': logfile} template = self.DaPlatForm.get_job_template(jobparams) execcommand = os.path.join(self['dir.da_submit'], sys.argv[0]) - template += 'python %s rc=%s %s >&%s' % (execcommand, self['da.restart.fname'], join(self.opts, ''),logfile) + template += 'python %s rc=%s %s >&%s' % (execcommand, self['da.restart.fname'], join(self.opts, ''), logfile) # write and submit self.DaPlatForm.write_job(jobfile, template, jobid) diff --git a/da/tools/io.py b/da/tools/io.py deleted file mode 100755 index 7ee973240e46ab6cf7b36deb31648ba4052b850d..0000000000000000000000000000000000000000 --- a/da/tools/io.py +++ /dev/null @@ -1,206 +0,0 @@ -#!/usr/bin/env python -# io.py - -""" -Author : peters - -Revision History: -File created on 15 Oct 2008. 
-
-File modified for CT data assimilation system in July 2010, Wouter Peters
-
-"""
-import standardvariables
-import pycdf as CDF
-import datetime as dt
-from numpy import array
-import os
-
-disclaimer = "This data belongs to the CarbonTracker project"
-email = "wouter.peters@wur.nl"
-url = "http://carbontracker.wur.nl"
-institution = "Wageningen University and Research Center"
-source = "CarbonTracker release 2.0"
-conventions = "CF-1.1"
-historytext = 'Created on ' + dt.datetime.now().strftime('%B %d, %Y') + ' by %s' % os.environ['USER']
-
-std_savedict = {'name':'unknown', 'values':[], 'dims':(0, 0,), 'units':'', 'long_name':'', '_FillValue':float(-999.), 'comment':''}
-
-class CT_CDF(CDF.CDF):
-    """ function opens a NetCDF/HDF/GRIB file for writing of output"""
-
-    def __init__(self, filename, method='read'):
-        if method not in ['read', 'write', 'create']:
-            raise ValueError, 'Method %s is not defined for a CarbonTracker NetCDF file object' % method
-        if method == 'read':
-            print 'Reading from file'
-            super(CDF.CDF, self).__init__(filename, CDF.NC.NOWRITE)
-        elif method == 'write':
-            #print 'Adding to existing file'
-            super(CT_CDF, self).__init__(filename, CDF.NC.WRITE | CDF.NC.CREATE)
-            self.add_ct_header()
-        elif method == 'create':
-            #print 'Creating new file'
-            super(CT_CDF, self).__init__(filename, CDF.NC.WRITE | CDF.NC.TRUNC | CDF.NC.CREATE)
-            self.add_ct_header()
-
-
-    def add_ct_header(self):
-        self.automode()
-        #
-        setattr(self, 'Institution', institution)
-        setattr(self, 'Contact', email)
-        setattr(self, 'URL', url)
-        setattr(self, 'Source', source)
-        setattr(self, 'Convention', conventions)
-        setattr(self, 'Disclaimer', disclaimer)
-        setattr(self, 'History', historytext)
-
-    def add_params_dim(self, nparams):
-        self.automode()
-        dimparams = self.def_dim('nparameters', nparams)
-        return (dimparams,)
-
-    def add_members_dim(self, nmembers):
-        self.automode()
-        dimmembers = self.def_dim('nmembers', nmembers)
-        return (dimmembers,)
-
-    def add_lag_dim(self, nlag, unlimited=True):
-        self.automode()
-        if unlimited:
-            dimlag = self.def_dim('nlag', CDF.NC.UNLIMITED)
-        else:
-            dimlag = self.def_dim('nlag', nlag)
-        return (dimlag,)
-
-    def add_obs_dim(self, nobs):
-        self.automode()
-        dimobs = self.def_dim('nobs', nobs)
-        return (dimobs,)
-
-    def add_latlon_dim(self, istart=0, iend=360, jstart=0, jend=180):
-        from numpy import arange, float64
-
-        if 'latitude' in self.dimensions():
-            return (self.dim('latitude'), self.dim('longitude'))  # already exists
-
-        lons = -180 + arange(360) * 1.0 + 0.5
-        lats = -90 + arange(180) * 1.0 + 0.5
-        #
-        lats = lats[jstart:jend]
-        lons = lons[istart:iend]
-        #
-        self.automode()
-        dimlon = self.def_dim('longitude', lons.shape[0])
-        dimlat = self.def_dim('latitude', lats.shape[0])
-
-        savedict = self.standard_var(varname='latitude')
-        savedict['values'] = lats.tolist()
-        savedict['actual_range'] = (float(lats[0]), float(lats[-1]))
-        savedict['dims'] = (dimlat,)
-        self.add_data(savedict)
-
-        savedict = self.standard_var(varname='longitude')
-        savedict['values'] = lons.tolist()
-        savedict['actual_range'] = (float(lons[0]), float(lons[-1]))
-        savedict['dims'] = (dimlon,)
-        self.add_data(savedict)
-
-        return (dimlat, dimlon)
-
-    def add_date_dim(self):
-        self.automode()
-        if 'date' in self.dimensions():
-            return (self.dim('date'),)
-        return (self.def_dim('date', CDF.NC.UNLIMITED),)
-
-    def add_date_dim_format(self):
-        self.automode()
-        if 'yyyymmddhhmmss' in self.dimensions():
-            return (self.dim('yyyymmddhhmmss'),)  # already exists
-        return (self.def_dim('yyyymmddhhmmss', 6),)
-
-    def add_dim(self, dimname, dimsize):
-        if dimname not in self.dimensions():
-            newdim = self.def_dim(dimname, dimsize)
-        return (newdim,)
-
-    def has_date(self, dd):
-        if self.inq_unlimlen() > 0:
-            if dd in self.get_variable('date').tolist():
-                return True
-            else:
-                return False
-        else:
-            return False
-
-    def get_variable(self, varname):
-        """ get variable from ncf file"""
-        return array(self.var(varname).get())
-
-    def standard_var(self, varname):
-        """ return properties of standard variables """
-        import standardvariables
-
-        if varname in standardvariables.standard_variables.keys():
-            return standardvariables.standard_variables[varname]
-        else:
-            return standardvariables.standard_variables['unknown']
-
-    def add_data(self, datadict, nsets=1, silent=True):
-        """ add fields to file, at end of unlimited dimension"""
-
-        existing_vars = self.variables()
-        try:
-            next = datadict['count']
-        except:
-            next = 0
-
-        if existing_vars.has_key(datadict['name']):
-            var = self.var(datadict['name'])
-            var[next:next + nsets] = datadict['values']
-        else:
-            if not silent:
-                print 'Creating new dataset: ' + datadict['name']
-            if datadict.has_key('dtype'):
-                if datadict['dtype'] == 'int':
-                    var = self.def_var(datadict['name'], CDF.NC.INT, datadict['dims'])
-                elif datadict['dtype'] == 'char':
-                    var = self.def_var(datadict['name'], CDF.NC.CHAR, datadict['dims'])
-                elif datadict['dtype'] == 'double':
-                    var = self.def_var(datadict['name'], CDF.NC.DOUBLE, datadict['dims'])
-                else:
-                    var = self.def_var(datadict['name'], CDF.NC.FLOAT, datadict['dims'])
-            else:
-                var = self.def_var(datadict['name'], CDF.NC.FLOAT, datadict['dims'])
-            for k, v in datadict.iteritems():
-                if k not in ['name', 'dims', 'values', '_FillValue', 'count']:
-                    setattr(var, k, v)
-            if var.isrecord():
-                var[next:next + nsets] = datadict['values']
-            else:
-                var[:] = datadict['values']
-
-def get_variable(file, varname):
-    """ get variable from HDF file"""
-    return array(file.select(varname).get())
-
-def create_dirs(rundat, dirname):
-    dirname = os.path.join(rundat.outputdir, dirname)
-    if not os.path.exists(dirname):
-        print "Creating new output directory " + dirname
-        os.makedirs(dirname)
-    else:
-        print 'Writing files to directory: %s' % (dirname,)
-    return dirname
-
-
-if __name__ == '__main__':
-    try:
-        os.remove('test.nc')
-    except:
-        pass
-    ncf = CT_CDF('test.nc', 'create')
-    dimgrid = ncf.add_latlon_dim()
-    dimdate = ncf.add_date_dim()
-    dimidate = ncf.add_date_dim_format()
diff --git a/da/tools/io_cdf.py b/da/tools/io_cdf.py
deleted file mode 100755
index bd93aa3aef27b4d921b344a6b34747814c8cd1f5..0000000000000000000000000000000000000000
--- a/da/tools/io_cdf.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#!/usr/bin/env python
-# io.py
-
-"""
-Author : peters
-
-Revision History:
-File created on 15 Oct 2008.
-
-File modified for CT data assimilation system in July 2010, Wouter Peters
-
-"""
-import standardvariables
-import pycdf as CDF
-import datetime as dt
-from numpy import array
-import os
-
-disclaimer = "This data belongs to the CarbonTracker project"
-email = "wouter.peters@wur.nl"
-url = "http://carbontracker.wur.nl"
-institution = "Wageningen University and Research Center"
-source = "CarbonTracker release 2.0"
-conventions = "CF-1.1"
-historytext = 'Created on ' + dt.datetime.now().strftime('%B %d, %Y') + ' by %s' % os.environ['USER']
-
-std_savedict = {'name':'unknown', 'values':[], 'dims':(0, 0,), 'units':'', 'long_name':'', '_FillValue':float(-999.), 'comment':''}
-
-class CT_CDF(CDF.CDF):
-    """ function opens a NetCDF/HDF/GRIB file for writing of output"""
-
-    def __init__(self, filename, method='read'):
-        if method not in ['read', 'write', 'create']:
-            raise ValueError, 'Method %s is not defined for a CarbonTracker NetCDF file object' % method
-        if method == 'read':
-            print 'Reading from file'
-            super(CDF.CDF, self).__init__(filename, CDF.NC.NOWRITE)
-        elif method == 'write':
-            #print 'Adding to existing file'
-            super(CT_CDF, self).__init__(filename, CDF.NC.WRITE | CDF.NC.CREATE)
-            self.add_ct_header()
-        elif method == 'create':
-            #print 'Creating new file'
-            super(CT_CDF, self).__init__(filename, CDF.NC.WRITE | CDF.NC.TRUNC | CDF.NC.CREATE)
-            self.add_ct_header()
-
-
-    def add_ct_header(self):
-        self.automode()
-        #
-        setattr(self, 'Institution', institution)
-        setattr(self, 'Contact', email)
-        setattr(self, 'URL', url)
-        setattr(self, 'Source', source)
-        setattr(self, 'Convention', conventions)
-        setattr(self, 'Disclaimer', disclaimer)
-        setattr(self, 'History', historytext)
-
-    def add_params_dim(self, nparams):
-        self.automode()
-        dimparams = self.def_dim('nparameters', nparams)
-        return (dimparams,)
-
-    def add_members_dim(self, nmembers):
-        self.automode()
-        dimmembers = self.def_dim('nmembers', nmembers)
-        return (dimmembers,)
-
-    def add_lag_dim(self, nlag, unlimited=True):
-        self.automode()
-        if unlimited:
-            dimlag = self.def_dim('nlag', CDF.NC.UNLIMITED)
-        else:
-            dimlag = self.def_dim('nlag', nlag)
-        return (dimlag,)
-
-    def add_obs_dim(self, nobs):
-        self.automode()
-        dimobs = self.def_dim('nobs', nobs)
-        return (dimobs,)
-
-    def add_latlon_dim(self, istart=0, iend=360, jstart=0, jend=180):
-        from numpy import arange, float64
-
-        if 'latitude' in self.dimensions():
-            return (self.dim('latitude'), self.dim('longitude'))  # already exists
-
-        lons = -180 + arange(360) * 1.0 + 0.5
-        lats = -90 + arange(180) * 1.0 + 0.5
-        #
-        lats = lats[jstart:jend]
-        lons = lons[istart:iend]
-        #
-        self.automode()
-        dimlon = self.def_dim('longitude', lons.shape[0])
-        dimlat = self.def_dim('latitude', lats.shape[0])
-
-        savedict = self.standard_var(varname='latitude')
-        savedict['values'] = lats.tolist()
-        savedict['actual_range'] = (float(lats[0]), float(lats[-1]))
-        savedict['dims'] = (dimlat,)
-        self.add_data(savedict)
-
-        savedict = self.standard_var(varname='longitude')
-        savedict['values'] = lons.tolist()
-        savedict['actual_range'] = (float(lons[0]), float(lons[-1]))
-        savedict['dims'] = (dimlon,)
-        self.add_data(savedict)
-
-        return (dimlat, dimlon)
-
-    def add_date_dim(self):
-        self.automode()
-        if 'date' in self.dimensions():
-            return (self.dim('date'),)
-        return (self.def_dim('date', CDF.NC.UNLIMITED),)
-
-    def add_date_dim_format(self):
-        self.automode()
-        if 'yyyymmddhhmmss' in self.dimensions():
-            return (self.dim('yyyymmddhhmmss'),)  # already exists
-        return (self.def_dim('yyyymmddhhmmss', 6),)
-
-    def add_dim(self, dimname, dimsize):
-        if dimname not in self.dimensions():
-            newdim = self.def_dim(dimname, dimsize)
-        return (newdim,)
-
-    def has_date(self, dd):
-        if self.inq_unlimlen() > 0 and dd in self.get_variable('date').tolist():
-            return True
-        else:
-            return False
-
-    def get_variable(self, varname):
-        """ get variable from ncf file"""
-        return array(self.var(varname).get())
-
-    def standard_var(self, varname):
-        """ return properties of standard variables """
-        import standardvariables
-
-        if varname in standardvariables.standard_variables.keys():
-            return standardvariables.standard_variables[varname]
-        else:
-            return standardvariables.standard_variables['unknown']
-
-    def add_data(self, datadict, nsets=1, silent=True):
-        """ add fields to file, at end of unlimited dimension"""
-
-        existing_vars = self.variables()
-        try:
-            next = datadict['count']
-        except:
-            next = 0
-
-        if existing_vars.has_key(datadict['name']):
-            var = self.var(datadict['name'])
-            var[next:next + nsets] = datadict['values']
-        else:
-            if not silent:
-                print 'Creating new dataset: ' + datadict['name']
-            if datadict.has_key('dtype'):
-                if datadict['dtype'] == 'int':
-                    var = self.def_var(datadict['name'], CDF.NC.INT, datadict['dims'])
-                elif datadict['dtype'] == 'char':
-                    var = self.def_var(datadict['name'], CDF.NC.CHAR, datadict['dims'])
-                elif datadict['dtype'] == 'double':
-                    var = self.def_var(datadict['name'], CDF.NC.DOUBLE, datadict['dims'])
-                else:
-                    var = self.def_var(datadict['name'], CDF.NC.FLOAT, datadict['dims'])
-            else:
-                var = self.def_var(datadict['name'], CDF.NC.FLOAT, datadict['dims'])
-            for k, v in datadict.iteritems():
-                if k not in ['name', 'dims', 'values', '_FillValue', 'count']:
-                    setattr(var, k, v)
-            if var.isrecord():
-                var[next:next + nsets] = datadict['values']
-            else:
-                var[:] = datadict['values']
-
-def get_variable(file, varname):
-    """ get variable from HDF file"""
-    return array(file.select(varname).get())
-
-def create_dirs(rundat, dirname):
-    dirname = os.path.join(rundat.outputdir, dirname)
-    if not os.path.exists(dirname):
-        print "Creating new output directory " + dirname
-        os.makedirs(dirname)
-    else:
-        print 'Writing files to directory: %s' % dirname
-    return dirname
-
-
-if __name__ == '__main__':
-    try:
-        os.remove('test.nc')
-    except:
-        pass
-    ncf = CT_CDF('test.nc', 'create')
-    dimgrid = ncf.add_latlon_dim()
-    dimdate = ncf.add_date_dim()
-    dimidate = ncf.add_date_dim_format()
diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py
index 16bfec43a322732ef841cdfe50c43cc952a5148b..0467d15e6bfa75f1c011cc7cbd1e77c38efd1839 100755
--- a/da/tools/pipeline.py
+++ b/da/tools/pipeline.py
@@ -15,6 +15,7 @@ import logging
 import os
 import sys
 import datetime
+import copy

 header = '\n\n    ***************************************   '
 footer = '    *************************************** \n  '
@@ -50,23 +51,24 @@ def forward_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOpera
     # Read from other simulation and write priors, then read posteriors and propagate

     if DaCycle['dir.forward.savestate'] != 'None':
-        filename = os.path.join(DaCycle['dir.forward.savestate'],DaCycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') #LU now we read savestate.nc
-        StateVector.read_from_legacy_file(filename,'prior')
-    else: prepare_state(DaCycle, StateVector)
+        filename = os.path.join(DaCycle['dir.forward.savestate'], DaCycle['time.start'].strftime('%Y%m%d'), 'savestate.nc') #LU now we read savestate.nc
+        StateVector.read_from_legacy_file(filename, 'prior')
+    else:
+        prepare_state(DaCycle, StateVector)

     StateVector.isOptimized = False

     # Write as prior fluxes to output dir

-    savedir = DaCycle['dir.output']
-    savefilename = os.path.join(savedir, 'savestate.nc') #LU this is the state vector after optimization.
+    savefilename = os.path.join(DaCycle['dir.output'], 'savestate.nc') #LU this is the state vector after optimization.
     StateVector.write_to_file(savefilename)

     # Now read optimized fluxes to propagate

-    if DaCycle['dir.forward.savestate'] != 'None':
+    if DaCycle['dir.forward.savestate'] != 'None': #LU !? would there really be something like None in there?
         StateVector.read_from_legacy_file(filename) # by default will read "opt"(imized) variables, and then propagate
-    else: StateVector.read_from_file(savefilename,'prior')
+    else:
+        StateVector.read_from_file(savefilename, 'prior')

     StateVector.isOptimized = True

@@ -119,13 +121,12 @@ def prepare_state(DaCycle, StateVector):

         # Read the StateVector data from file

-        filename = os.path.join(DaCycle['dir.restart.current'], 'savestate.nc') #LU now we read savestate.nc
-
+        filename = os.path.join(DaCycle['dir.restart.current'], 'savestate.nc')
         StateVector.read_from_file(filename) # by default will read "opt"(imized) variables, and then propagate

         # Now propagate the ensemble by one cycle to prepare for the current cycle

-        StateVector.propagate()
+        StateVector.propagate(DaCycle['time.start'], DaCycle['time.cycle'])

     # Finally, also write the StateVector to a file so that we can always access the a-priori information

@@ -164,7 +165,7 @@ def sample_state(DaCycle, Samples, StateVector, ObservationOperator):

 def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvance=False):
     """ Perform all actions needed to sample one cycle """
-    import copy
+

     # First set up the information for time start and time end of this sample

     DaCycle.set_sample_times(lag)
@@ -183,7 +184,7 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvan
     # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
     # type of info needed in your transport model

-    StateVector.write_members_to_file(lag + 1) #LU parameters.nc
+    StateVector.write_members_to_file(lag + 1, DaCycle['dir.input']) #LU parameters.nc

     Samples.initialize() #LU above all, this resets the data list, i.e. the data list is fresh each time a new cycle is started
     Samples.add_observations()
@@ -237,7 +238,7 @@ def sample_step(DaCycle, Samples, StateVector, ObservationOperator, lag, isadvan
 def invert(DaCycle, StateVector, Optimizer):
     """ Perform the inverse calculation """
-    logging.info(msg = header + "starting invert" + footer)
+    logging.info(msg=header + "starting invert" + footer)

     dims = (int(DaCycle['time.nlag']), int(DaCycle['da.optimizer.nmembers']), int(DaCycle.DaSystem['nparameters']),
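
--- Editor's notes (not part of the patch) ---

The statevector.py and pipeline.py hunks decouple StateVector from the CycleControl object: propagate() and write_members_to_file() now receive their cycle data as arguments instead of reaching into self.DaCycle. The resulting call pattern, exactly as it appears in the patched pipeline.py, is:

# Callers now supply the cycle data explicitly; both calls are taken verbatim
# from the pipeline.py hunks above.
StateVector.propagate(DaCycle['time.start'], DaCycle['time.cycle'])
StateVector.write_members_to_file(lag + 1, DaCycle['dir.input'])

Passing startdate/cycle and outdir explicitly makes these two methods usable and testable without constructing a full CycleControl object.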
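The optimizer.py hunks above only move imports and adjust formatting; the serial minimum-least-squares algorithm itself is unchanged. For readers unfamiliar with it, here is a minimal, self-contained sketch of that kind of serial ensemble update. Names (x, X_prime, Hx, HX_prime, R, KG) mirror the Optimizer attributes seen in the diff, but the shapes, the absence of flag screening, and the absence of localization are simplifying assumptions, not the CTDAS implementation:

import numpy as np

def serial_minimum_least_squares_sketch(x, X_prime, Hx, HX_prime, obs, R):
    """Assimilate one observation at a time (serial square-root update).

    x        : (nstate,) prior mean state
    X_prime  : (nstate, nmembers) prior ensemble deviations
    Hx       : (nobs,) simulated observations for the mean
    HX_prime : (nobs, nmembers) simulated-observation deviations
    obs, R   : (nobs,) observed values and their error variances
    """
    nmembers = X_prime.shape[1]
    fac = 1.0 / (nmembers - 1)
    for n in range(len(obs)):
        PHt = fac * np.dot(X_prime, HX_prime[n, :])           # one column of P * H^T
        HPHR = fac * np.dot(HX_prime[n, :], HX_prime[n, :]) + R[n]
        KG = PHt / HPHR                                       # Kalman gain for obs n
        res = obs[n] - Hx[n]                                  # innovation
        x = x + KG * res                                      # update the mean state
        # also shift the simulated means so later innovations see this analysis
        HPbHt = fac * np.dot(HX_prime, HX_prime[n, :])
        Hx = Hx + (HPbHt / HPHR) * res
        # square-root factor alpha scales the deviation update
        alpha = 1.0 / (1.0 + np.sqrt(R[n] / HPHR))
        for m in range(nmembers):
            X_prime[:, m] -= alpha * KG * HX_prime[n, m]
            HX_prime[:, m] -= alpha * (HPbHt / HPHR) * HX_prime[n, m]
    return x, X_prime, Hx, HX_prime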
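The dasystem.py hunk only unfolds two one-line conditionals onto separate lines; the behavior — coercing the strings 'True'/'False' read from an rc-file into Python booleans — is unchanged. A standalone illustration of that coercion (the example keys are made up, not real CTDAS settings):

def coerce_rc_booleans(settings):
    """Replace 'True'/'False' strings from an rc-file with real booleans."""
    for k, v in settings.items():
        if v == 'True':
            settings[k] = True
        if v == 'False':
            settings[k] = False
    return settings

# hypothetical rc contents; only exact 'True'/'False' strings are converted
print coerce_rc_booleans({'obs.rejection':'True', 'nparameters':'240'})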
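Finally, the platform classes (huygens.py, jet.py, maunaloa.py) all fill their job templates the same way: caller-supplied joboptions are substituted first, then std_joboptions fills whatever placeholders remain. A condensed sketch of that shared logic (the helper name is ours, for illustration only):

def fill_job_template(template, joboptions, std_joboptions):
    # Caller options take precedence over the defaults. str.replace already
    # substitutes every occurrence, so the while-loop normally runs once; it
    # simply mirrors the loop structure in the patched platform classes.
    for options in (joboptions, std_joboptions):
        for k, v in options.items():
            while k in template:
                template = template.replace(k, v)
    return template

print fill_job_template("#$ -N jobname \n#$ -S jobshell \n", {'jobname':'j.test'}, {'jobshell':'/bin/sh'})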