Commit 1173cdcf authored by Florentie, Liesbeth

fixed links to analysis folder in analysis pipeline

parent 37e83b8f
@@ -165,14 +165,65 @@ class ctsfCycleControl(CycleControl):
logging.info('Copied extended regions file to the analysis directory: %s'%os.path.join(self.dasystem['extendedregionsfile']))
else:
shutil.copy(os.path.join(self['dir.da_source'],'da','analysis','olson_extended.nc'),os.path.join(self['dir.analysis'],'copied_regions_extended.nc'))
logging.info('Copied extended regions from the da/analysis directory: %s'%os.path.join(self['dir.da_source'],'da','analysis','olson_extended.nc'))
for filename in glob.glob(os.path.join(self['dir.analysis'],'*.pickle')):
logging.info('Copied extended regions file from the source to the analysis directory: %s'%os.path.join(self['dir.analysis'],'olson_extended.nc'))
for filename in glob.glob(os.path.join(self['dir.da_source'],'da','analysis','*.pickle')):
logging.info('Deleting pickle file %s to make sure the correct regions are used'%os.path.split(filename)[1])
os.remove(filename)
for filename in glob.glob(os.path.join(self['dir.exec'],'*.pickle')):
logging.info('Deleting pickle file %s to make sure the correct regions are used'%os.path.split(filename)[1])
os.remove(filename)
if os.path.exists(os.path.join(self['dir.exec'],'da','analysis')):
for filename in glob.glob(os.path.join(self['dir.exec'],'da','analysis','*.pickle')):
logging.info('Deleting pickle file %s to make sure the correct regions are used'%os.path.split(filename)[1])
os.remove(filename)
if self.dasystem.has_key('random.seed.init'):
self.read_random_seed(True)
self.parse_times()
def setup_file_structure(self):
"""
Create file structure needed for data assimilation system.
In principle this looks like:
* ``${da_rundir}``
* ``${da_rundir}/input``
* ``${da_rundir}/output``
* ``${da_rundir}/exec``
* ``${da_rundir}/analysis``
* ``${da_rundir}/jobs``
* ``${da_rundir}/restart/current``
* ``${da_rundir}/restart/one-ago``
.. note:: The exec dir will actually be a symlink to the directory where
the observation operator executable lives. This directory is passed through
the ``da.rc`` file.
.. note:: The observation input files will be placed in the exec dir,
and the resulting simulated values will be retrieved from there as well.
"""
# Create the run directory for this DA job, including I/O structure
filtertime = self['time.start'].strftime('%Y%m%d')
self['dir.exec'] = os.path.join(self['dir.da_run'], 'exec')
self['dir.input'] = os.path.join(self['dir.da_run'], 'input')
self['dir.output'] = os.path.join(self['dir.da_run'], 'output', filtertime)
self['dir.analysis'] = os.path.join(self['dir.da_run'], 'analysis')
self['dir.jobs'] = os.path.join(self['dir.da_run'], 'jobs')
self['dir.restart'] = os.path.join(self['dir.da_run'], 'restart')
create_dirs(self['dir.da_run'])
create_dirs(os.path.join(self['dir.exec']))
create_dirs(os.path.join(self['dir.input']))
create_dirs(os.path.join(self['dir.output']))
create_dirs(os.path.join(self['dir.analysis']))
create_dirs(os.path.join(self['dir.jobs']))
create_dirs(os.path.join(self['dir.restart']))
#create_dirs(os.path.join(self['dir.exec'], 'da', 'analysis'))
logging.info('Successfully created the file structure for the assimilation job')
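# Hedged sketch, not part of this commit: setup_file_structure relies on the
# create_dirs helper from da.tools.general. The actual implementation may differ,
# but the calls above only assume "create the directory (and any missing parents)
# if it does not exist yet, and log what happened".
import logging
import os

def create_dirs(dirname):
    """ Ensure that directory dirname exists; create it (with parents) if needed. """
    if not os.path.exists(dirname):
        os.makedirs(dirname)
        logging.info('Creating new directory %s' % dirname)
    else:
        logging.debug('Using existing directory %s' % dirname)
    return dirname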
@@ -51,115 +51,34 @@ def ctsf_pipeline(dacycle, platform, dasystem, samples, statevector, fluxmodel,
logging.info('Inversion cycle finished... exiting pipeline')
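# Hedged sketch, not part of this commit: the main body of ctsf_pipeline lies outside
# this hunk. Based on the signatures of start_job, sample_state, invert and
# write_optimized_state_and_fluxes defined in this file, and on the commented-out
# ensemble_smoother_pipeline below, the cycle is assumed to compose roughly like this:
def ctsf_pipeline_sketch(dacycle, platform, dasystem, samples, statevector, fluxmodel, obsoperator, optimizer):
    # initialize the cycle and all components
    start_job(dacycle, dasystem, platform, statevector, samples, fluxmodel, obsoperator)
    # run the observation operator over the filter state
    sample_state(dacycle, samples, statevector, fluxmodel, obsoperator)
    # optimize the statevector against the samples
    invert(dacycle, statevector, optimizer)
    # compute and store the optimized fluxes
    write_optimized_state_and_fluxes(dacycle, samples, statevector, fluxmodel, obsoperator)
    logging.info('Inversion cycle finished... exiting pipeline')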
# def ensemble_smoother_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator, optimizer):
# """ The main point of entry for the pipeline """
# sys.path.append(os.getcwd())
#
# logging.info(header + "Initializing current cycle" + footer)
# start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)
#
# prepare_state(dacycle, statevector)
# sample_state(dacycle, samples, statevector, obsoperator)
#
# invert(dacycle, statevector, optimizer)
#
# advance(dacycle, samples, statevector, obsoperator)
#
# save_and_submit(dacycle, statevector)
# logging.info("Cycle finished...exiting pipeline")
# def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator):
# """ The main point of entry for the pipeline """
# sys.path.append(os.getcwd())
#
# logging.info(header + "Initializing current cycle" + footer)
# start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)
#
# if dacycle.has_key('forward.savestate.exceptsam'):
# sam = (dacycle['forward.savestate.exceptsam'].upper() in ["TRUE","T","YES","Y"])
# else:
# sam = False
#
# if dacycle.has_key('forward.savestate.dir'):
# fwddir = dacycle['forward.savestate.dir']
# else:
# logging.debug("No forward.savestate.dir key found in rc-file, proceeding with self-constructed prior parameters")
# fwddir = False
#
# if dacycle.has_key('forward.savestate.legacy'):
# legacy = (dacycle['forward.savestate.legacy'].upper() in ["TRUE","T","YES","Y"])
# else:
# legacy = False
# logging.debug("No forward.savestate.legacy key found in rc-file")
#
# if not fwddir:
# # Simply make a prior statevector using the normal method
# prepare_state(dacycle, statevector) #LU here, instead of this, rather just create the new covariance and ensembles, since the remaining things are handled above and below.
#
# else:
# # Read prior information from another simulation into the statevector.
# # This loads the results from another assimilation experiment into the current statevector
#
# if sam:
# filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d'))
# #filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')
# statevector.read_from_file_exceptsam(filename, 'prior')
# elif not legacy:
# filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d'))
# statevector.read_from_file(filename, 'prior')
# else:
# filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')
# statevector.read_from_legacy_file(filename, 'prior')
#
#
# # We write this "prior" statevector to the restart directory, so we can later also populate it with the posterior statevector
# # Note that we could achieve the same by just copying the wanted forward savestate.nc file to the restart folder of our current
# # experiment, but then it would already contain a posterior field as well which we will try to write in save_and_submit.
# # This could cause problems. Moreover, this method allows us to read older formatted savestate.nc files (legacy) and write them into
# # the current format through the "write_to_file" method.
#
# savefilename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
# statevector.write_to_file(savefilename, 'prior')
#
# # Now read optimized fluxes which we will actually use to propagate through the system
#
# if not fwddir:
#
# # if there is no forward dir specified, we simply run forward with unoptimized prior fluxes in the statevector
# logging.info("Running forward with prior savestate from: %s"%savefilename)
#
# else:
#
# # Read posterior information from another simulation into the statevector.
# # This loads the results from another assimilation experiment into the current statevector
#
# if sam:
# statevector.read_from_file_exceptsam(filename, 'opt')
# elif not legacy:
# statevector.read_from_file(filename, 'opt')
# else:
# statevector.read_from_legacy_file(filename, 'opt')
#
# logging.info("Running forward with optimized savestate from: %s"%filename)
#
# # Finally, we run forward with these parameters
#
# advance(dacycle, samples, statevector, obsoperator)
#
# # In save_and_submit, the posterior statevector will be added to the savestate.nc file, and it is added to the copy list.
# # This way, we have both the prior and posterior data from another run copied into this assimilation, for later analysis.
#
# save_and_submit(dacycle, statevector)
#
# logging.info("Cycle finished...exiting pipeline")
####################################################################################################
def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
""" Main entry point for analysis of ctdas results """
# copy da/analysis folder to exec folder and copy copied_region files
if dacycle['dir.da_source'] != dacycle['dir.exec']:
import glob
import shutil
from da.tools.general import create_dirs
create_dirs(os.path.join(dacycle['dir.exec'],'da'))
try:
shutil.copytree(os.path.join(dacycle['dir.da_source'],'da','analysis'),os.path.join(dacycle['dir.exec'],'da','analysis'))
except:
try:
create_dirs(os.path.join(dacycle['dir.exec'],'da','analysis'))
# shutil.copy does not expand wildcards, so copy the analysis files one by one
for analysisfile in glob.glob(os.path.join(dacycle['dir.da_source'],'da','analysis','*')):
shutil.copy(analysisfile,os.path.join(dacycle['dir.exec'],'da','analysis'))
except:
pass
shutil.move(os.path.join(dacycle['dir.analysis'],'copied_regions.nc'),os.path.join(dacycle['dir.exec'],'da','analysis','copied_regions.nc'))
shutil.move(os.path.join(dacycle['dir.analysis'],'copied_regions_extended.nc'),os.path.join(dacycle['dir.exec'],'da','analysis','copied_regions_extended.nc'))
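# Hedged sanity check, not part of this commit: after the copy and move steps above,
# the analysis routines are assumed to expect both region files next to the analysis
# sources in the exec directory; a quick existence check catches a failed copy early.
for regionfile in ('copied_regions.nc', 'copied_regions_extended.nc'):
    regionpath = os.path.join(dacycle['dir.exec'], 'da', 'analysis', regionfile)
    if not os.path.exists(regionpath):
        logging.error('Expected region file missing after copy: %s' % regionpath)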
# ----------
from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data
from da.analysis.expand_molefractions import write_mole_fractions
from da.analysis.summarize_obs import summarize_obs
from da.analysis.time_avg_fluxes import time_avg
logging.info(header + "Starting analysis" + footer)
dasystem.validate()
@@ -168,11 +87,6 @@ def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
dacycle.setup()
statevector.setup(dacycle)
from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data
from da.analysis.expand_molefractions import write_mole_fractions
from da.analysis.summarize_obs import summarize_obs
from da.analysis.time_avg_fluxes import time_avg
logging.info(header + "Starting mole fractions" + footer)
write_mole_fractions(dacycle)
@@ -202,40 +116,7 @@ def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
logging.info(header + "Finished analysis" + footer)
# def archive_pipeline(dacycle, platform, dasystem):
# """ Main entry point for archiving of output from one disk/system to another """
#
# if not dacycle.has_key('task.rsync'):
# logging.info('rsync task not found, not starting automatic backup...')
# return
# else:
# logging.info('rsync task found, starting automatic backup...')
#
# for task in dacycle['task.rsync'].split():
# sourcedirs = dacycle['task.rsync.%s.sourcedirs'%task]
# destdir = dacycle['task.rsync.%s.destinationdir'%task]
#
# rsyncflags = dacycle['task.rsync.%s.flags'%task]
#
# # file ID and names
# jobid = dacycle['time.end'].strftime('%Y%m%d')
# targetdir = os.path.join(dacycle['dir.exec'])
# jobfile = os.path.join(targetdir, 'jb.rsync.%s.%s.jb' % (task,jobid) )
# logfile = os.path.join(targetdir, 'jb.rsync.%s.%s.log' % (task,jobid) )
# # Template and commands for job
# jobparams = {'jobname':"r.%s" % jobid, 'jobnodes': '1', 'jobtime': '1:00:00', 'joblog': logfile, 'errfile': logfile}
#
# if platform.ID == 'cartesius':
# jobparams['jobqueue'] = 'staging'
#
# template = platform.get_job_template(jobparams)
# for sourcedir in sourcedirs.split():
# execcommand = "\nrsync %s %s %s\n" % (rsyncflags, sourcedir,destdir,)
# template += execcommand
#
# # write and submit
# platform.write_job(jobfile, template, jobid)
# jobid = platform.submit_job(jobfile, joblog=logfile)
####################################################################################################
def start_job(dacycle, dasystem, platform, statevector, samples, fluxmodel, obsoperator):
@@ -245,46 +126,11 @@ def start_job(dacycle, dasystem, platform, statevector, samples, fluxmodel, obso
dacycle.dasystem = dasystem
dacycle.daplatform = platform
dacycle.setup()
#statevector.dacycle = dacycle # also embed object in statevector so it can access cycle information for I/O etc
#samples.dacycle = dacycle # also embed object in samples object so it can access cycle information for I/O etc
#obsoperator.dacycle = dacycle # also embed object in obsoperator object so it can access cycle information for I/O etc
statevector.setup(dacycle)
fluxmodel.setup(dacycle) # Settings for calculating flux from statistical fit
obsoperator.setup(dacycle) # Setup Observation Operator
# def prepare_state(dacycle, statevector):
# """ Set up the input data for the forward model: obs and parameters/fluxes"""
#
# # We now have an empty statevector object that we need to populate with data. If this is a continuation from a previous cycle, we can read
# # the previous statevector values from a NetCDF file in the restart directory. If this is the first cycle, we need to populate the statevector
# # with new values for each week. After we have constructed the statevector, it will be propagated by one cycle length so it is ready to be used
# # in the current cycle
#
# logging.info(header + "starting prepare_state" + footer)
#
# if not dacycle['time.restart']:
#
# # Fill each week from n=1 to n=nlag with a new ensemble
#
# for n in range(statevector.nlag):
# date = dacycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(dacycle['time.cycle']))
# cov = statevector.get_covariance(date, dacycle)
# statevector.make_new_ensemble(n, cov)
#
# else:
#
# # Read the statevector data from file
# saved_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['da.restart.tstamp'].strftime('%Y%m%d'))
# statevector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate
#
# # Now propagate the ensemble by one cycle to prepare for the current cycle
# statevector.propagate(dacycle)
#
# # Finally, also write the statevector to a file so that we can always access the a-priori information
# current_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
# statevector.write_to_file(current_sv, 'prior') # write prior info
def sample_state(dacycle, samples, statevector, fluxmodel, obsoperator):
""" Sample the filter state for the inversion """
@@ -415,42 +261,6 @@ def invert(dacycle, statevector, optimizer):
optimizer.write_diagnostics(diagnostics_file, 'optimized')
# def advance(dacycle, samples, statevector, obsoperator):
# """ Advance the filter state to the next step """
#
# # This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
#
# # Then, restore model state from the start of the filter
# logging.info(header + "starting advance" + footer)
# logging.info("Sampling model will be run over 1 cycle")
#
# obsoperator.get_initial_data()
#
# sample_step(dacycle, samples, statevector, obsoperator, 0, True)
#
# dacycle.restart_filelist.extend(obsoperator.restart_filelist)
# dacycle.output_filelist.extend(obsoperator.output_filelist)
# logging.debug("Appended ObsOperator restart and output file lists to dacycle for collection ")
#
# dacycle.output_filelist.append(dacycle['ObsOperator.inputfile'])
# logging.debug("Appended Observation filename to dacycle for collection: %s"%(dacycle['ObsOperator.inputfile']))
#
# sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
# if os.path.exists(sampling_coords_file):
# outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
# samples.write_sample_auxiliary(outfile)
# else: logging.warning("Sample auxiliary output not written, because input file does not exist (no samples found in obspack)")
# def save_and_submit(dacycle, statevector):
# """ Save the model state and submit the next job """
# logging.info(header + "starting save_and_submit" + footer)
#
# filename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
# statevector.write_to_file(filename, 'opt')
#
# dacycle.output_filelist.append(filename)
# dacycle.finalize()
def write_optimized_state_and_fluxes(dacycle, samples, statevector, fluxmodel, obsoperator):
""" Calculate the fluxes for the optimized statevector and write to output file """
@@ -463,7 +273,6 @@ def write_optimized_state_and_fluxes(dacycle, samples, statevector, fluxmodel, o
dacycle.output_filelist.append(filename)
statevector.write_members_to_file(0, dacycle['dir.output'],member=0)
#fluxmodel.calc_flux(postprocessing=True)
fluxmodel.calc_flux(obsoperator.timevec, member=0, updateano=True, postprocessing=True, write=True)
dacycle.finalize()