From 88afbc6c248ce79bfb16b24873cc1580311e4743 Mon Sep 17 00:00:00 2001 From: Wouter Peters <wouter.peters@wur.nl> Date: Fri, 4 Feb 2011 09:53:12 +0000 Subject: [PATCH] started interface for pycasso --- da/tm5/observationoperator.py | 52 +++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/da/tm5/observationoperator.py b/da/tm5/observationoperator.py index bb98774..328457a 100755 --- a/da/tm5/observationoperator.py +++ b/da/tm5/observationoperator.py @@ -152,6 +152,11 @@ class TM5ObservationOperator(ObservationOperator): self.RcFileName = RcFileName self.Tm5RcLoaded = True + if 'my.source.dirs' in self.tm_settings.keys(): + self.RcFileType = 'pycasso' + else: + self.RcfIleType = 'pre-pycasso' + msg = 'TM5 rc-file loaded successfully' ; logging.debug(msg) return True @@ -368,7 +373,7 @@ class TM5ObservationOperator(ObservationOperator): # Code for Option (1) - code = self.TM5_under_mpirun(DaCycle) + code = self.TM5_With_N_tracers(DaCycle) if code == 0: logging.info('Finished model executable succesfully (%s)'%code) @@ -429,18 +434,49 @@ class TM5ObservationOperator(ObservationOperator): return code - def TM5_With_N_tracers(self, nprocesses): + def TM5_With_N_tracers(self, DaCycle): """ Method handles the case where one TM5 model instance with N tracers does the sampling of all ensemble members""" from string import join + import datetime + + DaPlatForm = DaCycle.DaPlatForm + + targetdir = os.path.join(self.tm_settings['rundir']) + + # Go to executable directory and start the subprocess, using a new logfile + + dummy = os.chdir(targetdir) + msg = 'Changing directory to %s ' % targetdir ;logging.debug(msg) + + # Remove the tm5.ok file from a previous run, placed back only if a successful TM5 run is executed + + okfile = 'tm5.ok' + if os.path.exists(okfile): + dummy = os.remove(okfile) - modellogfile = open(self.ModelLogFilename,'w') - #cmd = ['openmpirun','-np', '10', mpi_shell_filename,'./tm5.x'] - cmd = ['mpirun','-np',str(nprocesses),'./tm5.x'] - msg = 'Starting model executable as subprocess (%s)'%join(cmd,' ') ; logging.info(msg) + nprocesses = int(DaCycle['forecast.nmembers'])/5 # Note that we assign 5 tracers to each processor, this seems good for TM5 + jobparams = {'jobname':'tm5', + 'jobnodes':'ncomp %d'%int(nprocesses), + 'jobtime':'02:00:00', + 'joblog':os.path.join(DaCycle['dir.jobs']) + } - #code = subprocess.call(cmd,stdout=modellogfile,stderr=modellogfile) + template = DaPlatForm.GetJobTemplate(jobparams,block=True) + template += 'cd %s\n'%targetdir + template += 'mpirun -np %d ./tm5.x\n'%(int(nprocesses),) + + jobfile = DaPlatForm.WriteJob(DaCycle,template,'tm5') + + msg = 'Submitting job at %s'%datetime.datetime.now() ; logging.debug(msg) + code = DaPlatForm.SubmitJob(jobfile) + msg = 'Resuming job at %s'%datetime.datetime.now() ; logging.debug(msg) + + if not os.path.exists(okfile): + code = -1 + else: + code = 0 - modellogfile.close() + return code def SaveData(self): """ Copy the TM5 recovery data from the outputdir to the TM5 savedir, also add the restart files to a list of names -- GitLab