diff --git a/da/ct/obs.py b/da/ct/obs.py index d95486c9d82d1a13c7812508f81d6e4d2abdee87..f9f758c18c408196d3b9bc11f56f5f1c9e486a9e 100755 --- a/da/ct/obs.py +++ b/da/ct/obs.py @@ -164,7 +164,7 @@ class CtObservations(Observation): import da.tools.io4 as io from da.tools.general import ToDectime - obsinputfile = os.path.join(self.DaCycle['dir.input'],'observations.nc') + obsinputfile = os.path.join(self.DaCycle['dir.input'],'observations_%s.nc'% self.DaCycle['time.sample.stamp']) f = io.CT_CDF(obsinputfile,method='create') msg = 'Creating new observations file for ObservationOperator (%s)' % obsinputfile ; logging.debug(msg) diff --git a/da/tm5/observationoperator.py b/da/tm5/observationoperator.py index 732f5f1101b028ce4c2f813250d7479204923198..f47b951fff80d924152733fee4b22150c0240b57 100755 --- a/da/tm5/observationoperator.py +++ b/da/tm5/observationoperator.py @@ -573,7 +573,7 @@ class TM5ObservationOperator(ObservationOperator): msg = " [added to restart list] .... %s " % file ; logging.debug(msg) sourcedir = os.path.join(self.tm_settings[self.outputdirkey]) - sd_ed = '%s_%s'%(self.tm_settings[self.timestartkey].strftime('%Y%m%d00'),self.tm_settings[self.timefinalkey].strftime('%Y%m%d00'),) + sd_ed = self.DaCycle['time.sample.stamp'] filter = ['flask_%s'%sd_ed,'flux1x1_%s'%sd_ed] msg = "Creating a new list of TM5 output data to collect" ; logging.debug(msg) diff --git a/da/tools/initexit.py b/da/tools/initexit.py index e6b663b1c074d7c010c4beded5ccfc4239491eec..85097bd40df9b161d7a463dfab643055656b9bdc 100755 --- a/da/tools/initexit.py +++ b/da/tools/initexit.py @@ -240,7 +240,7 @@ class CycleControl(dict): logging.info(msg) - dummy = self.RestartFileList.append(filename) + dummy = self.RestartFileList.extend([filename]) msg = "Added the randomseed.pickle file to the RestartFileList" ; logging.debug(msg) @@ -445,7 +445,7 @@ class CycleControl(dict): dummy = self.SubmitNextCycle() def CollectOutput(self): - """ Collect files that are vpart of the requested output for this cycle. This function allows users to add files + """ Collect files that are part of the requested output for this cycle. This function allows users to add files to a list, and then the system will copy these to the current cycle's output directory. The list of files included is read from the attribute "OutputFileList" which is a simple list of files that can be appended by other objects/methods that @@ -509,14 +509,15 @@ class CycleControl(dict): msg = "Collecting the required restart data" ; logging.info(msg) msg = " to directory: %s " % targetdir ; logging.debug(msg) - for file in set(self.RestartFileList): if os.path.isdir(file): # skip dirs continue - - msg = " [copy] .... %s " % file ; logging.debug(msg) - dummy = shutil.copy(file,file.replace(os.path.split(file)[0],targetdir) ) + if not os.path.exists(file): # skip dirs + msg = " [not found] .... %s " % file ; logging.warning(msg) + else: + msg = " [copy] .... %s " % file ; logging.debug(msg) + dummy = shutil.copy(file,file.replace(os.path.split(file)[0],targetdir) ) def MoveRestartData(self, io_option='restore'): @@ -601,7 +602,7 @@ class CycleControl(dict): self['da.restart.fname'] = fname dummy = rc.write(fname,self) - dummy = self.RestartFileList.append(fname) + dummy = self.RestartFileList.extend([fname]) msg = 'Added da_runtime.rc to the RestartFileList for later collection' ; logging.debug(msg) msg = 'Wrote new da_runtime.rc (%s) to exec dir'%fname ; logging.debug(msg) diff --git a/da/tools/pipeline.py b/da/tools/pipeline.py index 5e8e82ea2ff7e8de3fd054f46b5521b7bdaf7165..0fe6cf0549fa23d5cf3869f1e7706095afbc9067 100755 --- a/da/tools/pipeline.py +++ b/da/tools/pipeline.py @@ -172,10 +172,12 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag): DaCycle['time.sample.start'] = startdate DaCycle['time.sample.end'] = enddate DaCycle['time.sample.window'] = lag + DaCycle['time.sample.stamp'] = "%s_%s"%(startdate.strftime("%Y%m%d%H"),enddate.strftime("%Y%m%d%H"),) msg = "New simulation interval set : " ; logging.info(msg) msg = " start date : %s " % startdate.strftime('%F %H:%M') ; logging.info(msg) msg = " end date : %s " % enddate.strftime('%F %H:%M') ; logging.info(msg) + msg = " file stamp: %s " % DaCycle['time.sample.stamp'] ; logging.info(msg) # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the @@ -191,8 +193,13 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag): filename = Samples.WriteSampleInfo() + # Write filename to DaCycle, and to output collection list + DaCycle['ObsOperator.inputfile'] = filename + DaCycle.OutputFileList.extend([filename]) + msg = "Appended Obs filename to DaCycle for collection " ; logging.debug(msg) + # Run the observation operator dummy = RunForecastModel(DaCycle,ObservationOperator) @@ -202,7 +209,7 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag): if lag == 0: DaCycle.RestartFileList.extend( ObservationOperator.RestartFileList ) DaCycle.OutputFileList.extend(ObservationOperator.OutputFileList ) - msg = "Extended DaCycle restart and output file lists " ; logging.debug(msg) + msg = "Appended ObsOperator restart and output file lists to DaCycle for collection " ; logging.debug(msg) # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future?? @@ -218,8 +225,7 @@ def SampleOneCycle(DaCycle,Samples,StateVector, ObservationOperator,lag): # We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates # one file per member, some logic needs to be included to merge all files!!! - base_date = "%s_%s"%(DaCycle['time.sample.start'].strftime('%Y%m%d%H'),DaCycle['time.sample.end'].strftime('%Y%m%d%H'),) - filename = os.path.join(ObservationOperator.outputdir,'flask_%s.nc'%base_date) + filename = os.path.join(ObservationOperator.outputdir,'flask_%s.nc'%DaCycle['time.sample.stamp']) dummy = Samples.AddSimulations(filename) # Give each member a model sample by first copying all samples, and then selecting the data for Member #n @@ -305,7 +311,7 @@ def SaveAndSubmit( DaCycle, StateVector): dummy = StateVector.WriteToFile(filename) - DaCycle.RestartFileList.append( os.path.join(DaCycle['dir.output'],'savestate.nc') ) + DaCycle.RestartFileList.extend( [os.path.join(DaCycle['dir.output'],'savestate.nc') ] ) dummy = DaCycle.Finalize()