Commit 7b8f70f0 authored by Peters, Wouter's avatar Peters, Wouter
Browse files

modifications to archive pipeline to allow multiple rsyncs with custom...

modifications to archive pipeline to allow multiple rsyncs with custom selection of folders to sync, see template.rc for example
parent fe1f943f
......@@ -158,19 +158,21 @@ def archive_pipeline(dacycle, platform, dasystem):
if not dacycle.has_key('task.rsync'):
logging.info('rsync task not found, not starting automatic backup...')
return
else:
logging.info('rsync task found, starting automatic backup...')
sourcedir = dacycle['dir.da_run']
destdir = dacycle['task.rsync.destination']
for task in dacycle['task.rsync'].split():
sourcedirs = dacycle['task.rsync.%s.sourcedirs'%task]
destdir = dacycle['task.rsync.%s.destinationdir'%task]
rsyncflags = dacycle['task.rsync.flags']
rsyncflags = dacycle['task.rsync.%s.flags'%task]
# file ID and names
jobid = dacycle['time.end'].strftime('%Y%m%d')
targetdir = os.path.join(dacycle['dir.exec'])
jobfile = os.path.join(targetdir, 'jb.rsync.%s.jb' % jobid)
logfile = os.path.join(targetdir, 'jb.rsync.%s.log' % jobid)
jobfile = os.path.join(targetdir, 'jb.rsync.%s.%s.jb' % (task,jobid) )
logfile = os.path.join(targetdir, 'jb.rsync.%s.%s.log' % (task,jobid) )
# Template and commands for job
jobparams = {'jobname':"r.%s" % jobid, 'jobnodes': '1', 'jobtime': '1:00:00', 'joblog': logfile, 'errfile': logfile}
......@@ -178,8 +180,10 @@ def archive_pipeline(dacycle, platform, dasystem):
jobparams['jobqueue'] = 'staging'
template = platform.get_job_template(jobparams)
execcommand = "\nrsync %s %s %s\n" % (rsyncflags, sourcedir,destdir,)
template += execcommand
for sourcedir in sourcedirs.split():
execcommand = "\nrsync %s %s %s\n" % (rsyncflags, sourcedir,destdir,)
template += execcommand
# write and submit
platform.write_job(jobfile, template, jobid)
jobid = platform.submit_job(jobfile, joblog=logfile)
......@@ -324,12 +328,12 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
if not advance:
if dacycle['time.restart'] == False or lag == int(dacycle['time.nlag']) - 1:
statevector.obs_to_assimmilate += (copy.deepcopy(samples),)
statevector.obs_to_assimilate += (copy.deepcopy(samples),)
statevector.nobs += samples.getlength()
logging.debug("Added samples from the observation operator to the assimilated obs list in the statevector")
else:
statevector.obs_to_assimmilate += (None,)
statevector.obs_to_assimilate += (None,)
def invert(dacycle, statevector, optimizer):
......
......@@ -31,6 +31,12 @@ da.optimizer.nmembers : 150
! info on the archive task, if any
task.rsync : True
task.rsync.destination : peters@maunaloa.wur.nl:/Storage/CO2/peters/
task.rsync.flags : -auvz -e ssh
task.rsync : alldata onlyresults
task.rsync.alldata.sourcedirs : ${dir.da_run}
task.rsync.alldata.destinationdir : you@yourserver.com:/yourfolder/
task.rsync.alldata.flags : -auv -e ssh
task.rsync.onlyresults.sourcedirs : ${dir.da_run}/analysis ${dir.da_run}/output
task.rsync.onlyresults.destinationdir : you@yourserver.com:/yourfolder/
task.rsync.onlyresults.flags : -auv -e ssh
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment