Commit 0a421b90 authored by Ingrid Luijkx's avatar Ingrid Luijkx
Browse files

Added additional options for South America fwd runs, including option to run...

Added additional options for South America fwd runs, including option to run forward when no observation is found in that cycle.
parent 526fb0cf
......@@ -99,7 +99,16 @@ def save_weekly_avg_1x1_data(dacycle, statevector):
fire = np.array(file.get_variable(dacycle.dasystem['background.co2.fires.flux']))
fossil = np.array(file.get_variable(dacycle.dasystem['background.co2.fossil.flux']))
#mapped_parameters = np.array(file.get_variable(dacycle.dasystem['final.param.mean.1x1']))
if dacycle.dasystem['background.co2.biosam.flux'] in file.variables.keys():
sam = True
biosam = np.array(file.get_variable(dacycle.dasystem['background.co2.biosam.flux']))
firesam = np.array(file.get_variable(dacycle.dasystem['background.co2.firesam.flux']))
else: sam = False
file.close()
if sam:
bio = bio + biosam
fire = fire + firesam
next = ncf.inq_unlimlen()[0]
......@@ -278,8 +287,17 @@ def save_weekly_avg_state_data(dacycle, statevector):
fire = np.array(file.get_variable(dacycle.dasystem['background.co2.fires.flux']))
fossil = np.array(file.get_variable(dacycle.dasystem['background.co2.fossil.flux']))
#mapped_parameters = np.array(file.get_variable(dacycle.dasystem['final.param.mean.1x1']))
if dacycle.dasystem['background.co2.biosam.flux'] in file.variables.keys():
sam = True
biosam = np.array(file.get_variable(dacycle.dasystem['background.co2.biosam.flux']))
firesam = np.array(file.get_variable(dacycle.dasystem['background.co2.firesam.flux']))
else: sam = False
file.close()
if sam:
bio = bio + biosam
fire = fire + firesam
next = ncf.inq_unlimlen()[0]
vectorbio = statevector.grid2vector(griddata=bio * area, method='sum')
......
......@@ -148,103 +148,104 @@ class ObsPackObservations(Observations):
"""
f = io.CT_CDF(obsinputfile, method='create')
logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile)
dimid = f.add_dim('obs', len(self.datalist))
dim200char = f.add_dim('string_of200chars', 200)
dim10char = f.add_dim('string_of10chars', 10)
dimcalcomp = f.add_dim('calendar_components', 6)
if len(self.datalist) == 0:
f.close()
#f.close()
#return obsinputfile
logging.debug("No observations found for this time period, nothing written to obs file")
else:
f = io.CT_CDF(obsinputfile, method='create')
logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile)
dimid = f.add_dim('obs', len(self.datalist))
dim200char = f.add_dim('string_of200chars', 200)
dim10char = f.add_dim('string_of10chars', 10)
dimcalcomp = f.add_dim('calendar_components', 6)
for key, value in self.site_move.iteritems():
msg = "Site is moved by %3.2f degrees latitude and %3.2f degrees longitude" % value
f.add_attribute(key, msg)
data = self.getvalues('id')
savedict = io.std_savedict.copy()
savedict['name'] = "obs_num"
savedict['dtype'] = "int"
savedict['long_name'] = "Unique_Dataset_observation_index_number"
savedict['units'] = ""
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
f.add_data(savedict)
data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate') ]
savedict = io.std_savedict.copy()
savedict['dtype'] = "int"
savedict['name'] = "date_components"
savedict['units'] = "integer components of UTC date/time"
savedict['dims'] = dimid + dimcalcomp
savedict['values'] = data
savedict['missing_value'] = -9
savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
savedict['order'] = "year, month, day, hour, minute, second"
f.add_data(savedict)
data = self.getvalues('lat')
savedict = io.std_savedict.copy()
savedict['name'] = "latitude"
savedict['units'] = "degrees_north"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = self.getvalues('lon')
savedict = io.std_savedict.copy()
savedict['name'] = "longitude"
savedict['units'] = "degrees_east"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = self.getvalues('height')
savedict = io.std_savedict.copy()
savedict['name'] = "altitude"
savedict['units'] = "meters_above_sea_level"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = self.getvalues('samplingstrategy')
savedict = io.std_savedict.copy()
savedict['dtype'] = "int"
savedict['name'] = "sampling_strategy"
savedict['units'] = "NA"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -9
f.add_data(savedict)
data = self.getvalues('evn')
savedict = io.std_savedict.copy()
savedict['dtype'] = "char"
savedict['name'] = "obs_id"
savedict['units'] = "ObsPack datapoint identifier"
savedict['dims'] = dimid + dim200char
savedict['values'] = data
savedict['missing_value'] = '!'
f.add_data(savedict)
for key, value in self.site_move.iteritems():
msg = "Site is moved by %3.2f degrees latitude and %3.2f degrees longitude" % value
f.add_attribute(key, msg)
data = self.getvalues('id')
savedict = io.std_savedict.copy()
savedict['name'] = "obs_num"
savedict['dtype'] = "int"
savedict['long_name'] = "Unique_Dataset_observation_index_number"
savedict['units'] = ""
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
f.add_data(savedict)
data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate') ]
savedict = io.std_savedict.copy()
savedict['dtype'] = "int"
savedict['name'] = "date_components"
savedict['units'] = "integer components of UTC date/time"
savedict['dims'] = dimid + dimcalcomp
savedict['values'] = data
savedict['missing_value'] = -9
savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
savedict['order'] = "year, month, day, hour, minute, second"
f.add_data(savedict)
data = self.getvalues('lat')
savedict = io.std_savedict.copy()
savedict['name'] = "latitude"
savedict['units'] = "degrees_north"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = self.getvalues('lon')
savedict = io.std_savedict.copy()
savedict['name'] = "longitude"
savedict['units'] = "degrees_east"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = self.getvalues('height')
savedict = io.std_savedict.copy()
savedict['name'] = "altitude"
savedict['units'] = "meters_above_sea_level"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = self.getvalues('samplingstrategy')
savedict = io.std_savedict.copy()
savedict['dtype'] = "int"
savedict['name'] = "sampling_strategy"
savedict['units'] = "NA"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -9
f.add_data(savedict)
data = self.getvalues('evn')
savedict = io.std_savedict.copy()
savedict['dtype'] = "char"
savedict['name'] = "obs_id"
savedict['units'] = "ObsPack datapoint identifier"
savedict['dims'] = dimid + dim200char
savedict['values'] = data
savedict['missing_value'] = '!'
f.add_data(savedict)
f.close()
f.close()
logging.debug("Successfully wrote data to obs file")
logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile)
logging.debug("Successfully wrote data to obs file")
logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile)
......
......@@ -134,6 +134,45 @@ class CO2StateVector(StateVector):
f.close()
logging.info('Successfully read the State Vector from file (%s) ' % filename)
def read_from_file_exceptsam(self, filename, qual='opt'):
"""
:param filename: the full filename for the input NetCDF file
:param qual: a string indicating whether to read the 'prior' or 'opt'(imized) StateVector from file
:rtype: None
Read the StateVector information from a NetCDF file and put in a StateVector object
In principle the input file will have only one four datasets inside
called:
* `meanstate_prior`, dimensions [nlag, nparamaters]
* `ensemblestate_prior`, dimensions [nlag,nmembers, nparameters]
* `meanstate_opt`, dimensions [nlag, nparamaters]
* `ensemblestate_opt`, dimensions [nlag,nmembers, nparameters]
This NetCDF information can be written to file using
:meth:`~da.baseclasses.statevector.StateVector.write_to_file`
"""
f = io.ct_read(filename, 'read')
meanstate = f.get_variable('statevectormean_' + qual)
meanstate[:,39:77] = 1
ensmembers = f.get_variable('statevectorensemble_' + qual)
f.close()
for n in range(self.nlag):
if not self.ensemble_members[n] == []:
self.ensemble_members[n] = []
logging.warning('Existing ensemble for lag=%d was removed to make place for newly read data' % (n + 1))
for m in range(self.nmembers):
newmember = EnsembleMember(m)
newmember.param_values = ensmembers[n, m, :].flatten() + meanstate[n] # add the mean to the deviations to hold the full parameter values
self.ensemble_members[n].append(newmember)
logging.info('Successfully read the State Vector from file (%s) ' % filename)
logging.info('State Vector set to 1 for South American regions')
################### End Class CO2StateVector ###################
......
......@@ -46,7 +46,9 @@ final.co2_mixingratio.ensemble.simulated : dF_f
background.co2.fossil.flux : flux_ff_prior_mean
background.co2.fires.flux : flux_fires_prior_mean
background.co2.firesam.flux : flux_firesam_prior_mean
background.co2.bio.flux : flux_bio_prior_mean
background.co2.biosam.flux : flux_biosam_prior_mean
background.co2.ocean.flux : flux_ocean_prior_mean
background.co2.res.flux : flux_res_prior_mean
background.co2.gpp.flux : flux_gpp_prior_mean
......
......@@ -44,6 +44,9 @@ def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsopera
logging.info(header + "Initializing current cycle" + footer)
start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)
if dacycle.has_key('forward.savestate.exceptsam'):
sam = (dacycle['forward.savestate.exceptsam'].upper() in ["TRUE","T","YES","Y"])
if dacycle.has_key('forward.savestate.dir'):
fwddir = dacycle['forward.savestate.dir']
else:
......@@ -66,11 +69,14 @@ def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsopera
# Read prior information from another simulation into the statevector.
# This loads the results from another assimilation experiment into the current statevector
if not legacy:
if sam:
filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')
statevector.read_from_file_exceptsam(filename, 'prior')
elif not legacy:
filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc'%dacycle['time.start'].strftime('%Y%m%d'))
statevector.read_from_file(filename, 'prior')
else:
filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc')
filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')
statevector.read_from_legacy_file(filename, 'prior')
......@@ -95,7 +101,9 @@ def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsopera
# Read posterior information from another simulation into the statevector.
# This loads the results from another assimilation experiment into the current statevector
if not legacy:
if sam:
statevector.read_from_file_exceptsam(filename, 'opt')
elif not legacy:
statevector.read_from_file(filename, 'opt')
else:
statevector.read_from_legacy_file(filename, 'opt')
......@@ -318,7 +326,9 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
# We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
# one file per member, some logic needs to be included to merge all files!!!
simulated_file = os.path.join(obsoperator.outputdir, 'flask_output.%s.nc' % dacycle['time.sample.stamp'])
samples.add_simulations(simulated_file)
if os.path.exists(sampling_coords_file):
samples.add_simulations(simulated_file)
else: logging.warning("No simulations added, because input file does not exist (no samples found in obspack)")
# Now write a small file that holds for each observation a number of values that we need in postprocessing
......@@ -398,9 +408,11 @@ def advance(dacycle, samples, statevector, obsoperator):
dacycle.output_filelist.append(dacycle['ObsOperator.inputfile'])
logging.debug("Appended Observation filename to dacycle for collection ")
outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
samples.write_sample_auxiliary(outfile)
sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
if os.path.exists(sampling_coords_file):
outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
samples.write_sample_auxiliary(outfile)
else: logging.warning("Sample auxiliary output not written, because input file does not exist (no samples found in obspack)")
def save_and_submit(dacycle, statevector):
""" Save the model state and submit the next job """
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment