diff --git a/da/tools/initexit.py b/da/tools/initexit.py index 63baab21275d40dda59da18179d4b5cc0d910d43..55438c6a0585897d8def7a52a3c7087bcd6976de 100755 --- a/da/tools/initexit.py +++ b/da/tools/initexit.py @@ -104,6 +104,7 @@ class CycleControl(dict): self['verbose'] = '-v' in opts self.DaSystem = None # to be filled later self.RestartFileList = [] # List of files needed for restart, to be extended later + self.OutputFileList = [] # List of files needed for output, to be extended later def __str__(self): """ @@ -434,11 +435,40 @@ class CycleControl(dict): dummy = self.WriteNewRCfile() dummy = self.MoveRestartData(io_option='store') # Move restart data from current to one-ago dummy = self.CollectRestartData() # Collect restart data for next cycle into a clean restart/current folder + dummy = self.CollectOutput() # Collect restart data for next cycle into a clean restart/current folder dummy = self.SubmitNextCycle() + def CollectOutput(self): + """ Collect files that are vpart of the requested output for this cycle. This function allows users to add files + to a list, and then the system will copy these to the current cycle's output directory. + The list of files included is read from the + attribute "OutputFileList" which is a simple list of files that can be appended by other objects/methods that + require output data to be saved. + + + """ + from da.tools.general import CreateDirs + + targetdir = os.path.join(self['dir.output']) + + CreateDirs(os.path.join(targetdir) ) + + msg = "Collecting the required output data" ; logging.info(msg) + msg = " to directory: %s " % targetdir ; logging.debug(msg) + + for file in set(self.OutputFileList): + + if os.path.isdir(file): # skip dirs + continue + + msg = " [copy] .... %s " % file ; logging.debug(msg) + dummy = shutil.copy(file,file.replace(os.path.split(file)[0],targetdir) ) + + + def CollectRestartData(self): """ Collect files needed for the restart of this cycle in case of a crash, or for the continuation of the next cycle. - All files needded are written to the restart/current directory. The list of files included is read from the + All files needed are written to the restart/current directory. The list of files included is read from the attribute "RestartFileList" which is a simple list of files that can be appended by other objects/methods that require restart data to be saved. @@ -461,8 +491,6 @@ class CycleControl(dict): """ from da.tools.general import CreateDirs - self.RestartFileList.append( os.path.join(self['dir.output'],'savestate.nc') ) - targetdir = os.path.join(self['dir.restart.current']) msg = "Purging the current restart directory before collecting new data" ; logging.info(msg) @@ -472,6 +500,8 @@ class CycleControl(dict): msg = "Collecting the required restart data" ; logging.info(msg) msg = " to directory: %s " % targetdir ; logging.debug(msg) + print self.RestartFileList + for file in set(self.RestartFileList): if os.path.isdir(file): # skip dirs diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py index 38d0e9b61d4769b0c7fa2c14ab084b98fc3187c1..fc8b976eb4067f7eae83ef2579d32899caa8c8c7 100755 --- a/da/tools/pipeline.py +++ b/da/tools/pipeline.py @@ -21,7 +21,7 @@ import da.tools.rc as rc header = '\n\n *************************************** ' footer = ' *************************************** \n ' -def Main(PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer): +def Main(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer): """ The main point of entry for the pipeline """ # Append current working dir to path @@ -30,30 +30,15 @@ def Main(PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer): # Import methods and classes contained in this package - from da.tools.initexit import ValidateOptsArgs - from da.tools.initexit import ParseOptions from da.tools.general import StoreData,RestoreData + msg = header+"Initializing current cycle"+footer ; logging.info(msg) -# Parse options from the command line - - opts, args = ParseOptions() - -# Validate Options and arguments passed - - opts,args = ValidateOptsArgs(opts,args) - -# Create the Cycle Control object for this job - - DaCycle = JobStart(opts,args, DaSystem, PlatForm) - - msg = header+"Initializing Da Cycle"+footer ; logging.info(msg) - - dummy = DaCycle.Initialize() + dummy = JobStart(DaCycle, DaSystem, PlatForm, StateVector) msg = header+"starting JobInput"+footer ; logging.info(msg) - StateVector = JobInput(DaCycle, StateVector) + dummy = JobInput(DaCycle, StateVector) msg = header+"starting SampleState"+footer ; logging.info(msg) @@ -80,38 +65,34 @@ def Main(PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer): #################################################################################################### -def JobStart(opts,args, DaSystem, DaPlatForm): +def JobStart(DaCycle, DaSystem, DaPlatForm, StateVector): """ Set up the job specific directory structure and create an expanded rc-file """ - from da.tools.initexit import CycleControl - - # First create a CycleControl object that handles all the details of the da cycle - DaCycle = CycleControl(opts,args) + dummy = DaSystem.Initialize() - # Next fill the DaSystem object that contains all the details related to the DA system + dummy = DaSystem.Validate() - dummy = DaSystem.Validate() + DaCycle.DaSystem = DaSystem - dummy = DaSystem.Initialize() + DaCycle.DaPlatForm = DaPlatForm - DaCycle.DaSystem = DaSystem + dummy = DaCycle.Initialize() - DaCycle.DaPlatForm = DaPlatForm + dims = ( int(DaCycle['time.nlag']), + int(DaCycle['forecast.nmembers']), + int(DaCycle.DaSystem['nparameters']), + ) - return DaCycle + dummy = StateVector.Initialize(dims) + StateVector.DaCycle = DaCycle -def JobInput(DaCycle, StateVector): - """ Set up the input data for the forward model: obs and parameters/fluxes""" - dims = ( int(DaCycle['time.nlag']), - int(DaCycle['forecast.nmembers']), - int(DaCycle.DaSystem['nparameters']), - ) + return None - dummy = StateVector.Initialize(dims) - nlag = dims[0] +def JobInput(DaCycle, StateVector): + """ Set up the input data for the forward model: obs and parameters/fluxes""" # We now have an empty StateVector object that we need to populate with data. If this is a continuation from a previous cycle, we can read # the previous StateVector values from a NetCDF file in the restart/current directory. If this is the first cycle, we need to populate the StateVector @@ -122,6 +103,7 @@ def JobInput(DaCycle, StateVector): # Fill each week from n=1 to n=nlag with a new ensemble + nlag = StateVector.nlag for n in range(0,nlag): date = DaCycle['time.start']+datetime.timedelta(days = (n+0.5)* int(DaCycle['time.cycle'])) cov = StateVector.GetCovariance(DaCycle.DaSystem,date) @@ -141,7 +123,7 @@ def JobInput(DaCycle, StateVector): dummy = StateVector.Propagate(DaCycle) - return StateVector + return None def SampleState(DaCycle,Samples,StateVector, ObservationOperator): """ Sample the filter state for the inversion """ @@ -217,10 +199,11 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag): dummy = RunForecastModel(DaCycle,ObservationOperator) - # Add the TM5 restart files to the general RestartFileList, only after 'Advance' + # Add the observation operator restart+output files to the general Output+RestartFileList, only after 'Advance' if lag == 0: DaCycle.RestartFileList.extend( ObservationOperator.RestartFileList ) + DaCycle.OutputFileList.extend( ObservationOperator.OutputFileList ) # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future?? @@ -322,6 +305,9 @@ def SaveAndSubmit( DaCycle, StateVector): filename = os.path.join(savedir,'savestate.nc') dummy = StateVector.WriteToFile(filename) + + DaCycle.RestartFileList.append( os.path.join(DaCycle['dir.output'],'savestate.nc') ) + dummy = DaCycle.Finalize() return None diff --git a/das.py b/das.py index 0ef2c395507d3367c38c37a207d97563350b229a..92ec0bf9ba03ac4a709b327e9d0844c703dcc924 100755 --- a/das.py +++ b/das.py @@ -10,14 +10,38 @@ #$ -j y #$ -r n +################################################################################################# +# First order of business is always to make all other python modules accessible through the path +################################################################################################# + import sys import os dummy = sys.path.append(os.getcwd()) +################################################################################################# +# Next, import the tools needed to initialize a data assimilation cycle, as well as the pipeline (Main()) +################################################################################################# + from da.tools.initexit import StartLogger +from da.tools.initexit import ValidateOptsArgs +from da.tools.initexit import ParseOptions from da.tools.pipeline import Main -dummy = StartLogger() +################################################################################################# +# Parse and validate the command line options +################################################################################################# + +dummy = StartLogger() +opts, args = ParseOptions() +opts,args = ValidateOptsArgs(opts,args) + +################################################################################################# +# Create the Cycle Control object for this job +################################################################################################# + +from da.tools.initexit import CycleControl + +DaCycle = CycleControl(opts,args) ########################################################################################### ### IMPORT THE APPLICATION SPECIFIC MODULES HERE, TO BE PASSED INTO THE MAIN PIPELINE!!! ## @@ -31,8 +55,8 @@ from da.tm5.observationoperator import TM5ObservationOperator from da.ct.optimizer import CtOptimizer PlatForm = MaunaloaPlatForm() -DaSystem = CtDaSystem('carbontracker.rc') -ObsOperator = TM5ObservationOperator('/Users/peters/Modeling/TM5/tm5-ctdas.rc') +DaSystem = CtDaSystem('da/rc/carbontracker.rc') +ObsOperator = TM5ObservationOperator('/Users/peters/Modeling/TM5/tm5-ctdas-inv-ei-6x4.rc') Samples = CtObservations() StateVector = CtStateVector() Optimizer = CtOptimizer() @@ -46,7 +70,10 @@ print "\n ******************************************************************* print " *************************************** Entering Pipeline ******************************************" print " ********************************************************************************************************\n" -Main(PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer) +Main(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer) +########################################################################################## +################### All done +##########################################################################################