Commit 6554e142 authored by Tsurata, Aki's avatar Tsurata, Aki
Browse files

No commit message

No commit message
parent d52f0294
This diff is collapsed.
#!/usr/bin/env python
import cPickle
import logging

import numpy as np
def state_to_grid(values, regionmap, reverse=False, avg=False, mapname=None):
    """
    Convert parameters between a CarbonTracker StateVector vector and a gridded
    (180 x 360) map of linear multiplication values.  The gridded maps can
    subsequently be used in the transport model code to multiply/manipulate fluxes.

    :param values: ecoregion vector (forward) or 1x1-degree field (reverse)
    :param regionmap: integer map assigning each 1x1-degree cell to a region (1..N)
    :param reverse: if True, aggregate the gridded *values* onto ecoregions;
                    otherwise project the ecoregion *values* onto the grid
    :param avg: in reverse mode, average over each region instead of summing
    :param mapname: optional basename used to cache the region->cell lookup
                    in '<mapname>_regiondict.pickle'
    """
    nregions = regionmap.max()
    try:
        if not mapname:
            raise Exception
        with open('%s_regiondict.pickle' % mapname, 'rb') as fh:
            regionselect = cPickle.load(fh)
    except Exception:
        # Cache miss (or no mapname given): build the region -> flat-index lookup.
        regs = {}
        for r in np.arange(1, nregions + 1):
            sel = (regionmap.flat == r).nonzero()
            if len(sel[0]) > 0:
                regs[r] = sel
        regionselect = regs
        # Only write the cache when a basename was supplied; the original code
        # also dumped a bogus 'None_regiondict.pickle' file for unnamed maps.
        if mapname:
            with open('%s_regiondict.pickle' % mapname, 'wb') as fh:
                cPickle.dump(regionselect, fh, -1)
            logging.info('Pickling region map')

    if reverse:
        # Project the 1x1-degree map onto ecoregions.
        result = np.zeros(nregions, float)
        for region, cells in regionselect.items():
            if avg:
                result[region - 1] = values.ravel().take(cells).mean()
            else:
                result[region - 1] = values.ravel().take(cells).sum()
        return result
    else:
        # Project ecoregion properties onto the 1x1-degree map.
        result = np.zeros((180, 360,), float)
        for region, cells in regionselect.items():
            result.put(cells, values[region - 1])
        return result
def globarea(im=360, jm=180, silent=True):
    """
    Return the surface area (m2) of every cell on a jm x im global grid,
    following TM5 definitions.

    :param im: number of longitudes
    :param jm: number of latitudes
    :param silent: when False, log the integrated areas as a sanity check
    :return: (jm, im) numpy array of cell areas in m2
    """
    radius = 6.371e6  # the earth radius in meters
    deg2rad = np.pi / 180.
    dxx = 360.0 / im * deg2rad
    dyy = 180.0 / jm * deg2rad
    lat = np.arange(-90 * deg2rad, 90 * deg2rad, dyy)
    # Area of a cell in a latitude band: R^2 * dlon * (sin(lat+dlat) - sin(lat)).
    dxy = dxx * (np.sin(lat + dyy) - np.sin(lat)) * radius ** 2
    # Repeat each band area across all longitudes and shape to (jm, im).
    area = np.resize(np.repeat(dxy, im, axis=0), [jm, im])
    if not silent:
        # Diagnostics via logging (consistent with the rest of this package);
        # the integrated field should equal the sphere surface 4*pi*R^2.
        logging.debug('total area of field = %s' % np.sum(area.flat))
        logging.debug('total earth area    = %s' % (4 * np.pi * radius ** 2))
    return area
#!/usr/bin/env python
# control.py
"""
Author : aki
Revision History:
File created on 16 Nov 2013.
"""
import logging
################### Begin Class MethaneDaSystem ###################
from da.baseclasses.dasystem import DaSystem
class MethaneDaSystem(DaSystem):
    """Information on the data assimilation system used.

    This is normally an rc-file with settings.
    """

    def validate(self):
        """Validate the rc-file contents against the dictionary of required keys."""
        needed_rc_items = ['obs.input.dir',
                           'obs.input.fname']

        # Normalize boolean-valued strings to real booleans.
        for key, value in self.items():
            if value == 'True':
                self[key] = True
            elif value == 'False':
                self[key] = False

        # Warn (but do not fail) for any missing required key.
        for required in needed_rc_items:
            if required not in self:
                logging.warning('Missing a required value in rc-file : %s' % required)

        logging.debug('DA System Info settings have been validated succesfully')
################### End Class MethaneDaSystem ###################
if __name__ == "__main__":
    # Import-only module; nothing to do when run as a script.
    pass
#!/usr/bin/env python
# da_initexit.py
"""
modified for module initext
Revision History:
File created on Apr. 2016 by Aki
File revised on Feb. 2017 by Aki
"""
import logging
import os
import sys
import numpy as np
from string import join
import da.tools.rc as rc
from da.tools.initexit import *
# rc-file keys that must be present before a DA cycle can be started
# (checked by the CycleControl validation code).
needed_da_items = [
    'time.start',
    'time.finish',
    'time.nlag',
    'time.cycle',
    'dir.da_run',
    'da.system',
    'da.system.rc',
    'da.obsoperator',
    'da.obsoperator.rc',
    'da.optimizer.nmembers',
    'da.suffixname']
#def validate_rc2(self):
# """
# Validate the contents of the rc-file given a dictionary of required keys.
# Currently required keys are :attr:`~da.tools.initexit.needed_da_items`
# """
#
# for k, v in self.iteritems():
# if v in ['True', 'true', 't', 'T', 'y', 'yes']:
# self[k] = True
# if v in ['False', 'false', 'f', 'F', 'n', 'no']:
# self[k] = False
# if 'date' in k :
# self[k] = to_datetime(v)
# if k in ['time.start', 'time.end', 'time.finish', 'da.restart.tstamp']:
# self[k] = to_datetime(v)
# for key in needed_da_items:
# if not self.has_key(key):
# msg = 'Missing a required value in rc-file : %s' % key
# logging.error(msg)
# logging.error('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ')
# logging.error('Please note the update on Dec 02 2011 where rc-file names for DaSystem and ')
# logging.error('are from now on specified in the main rc-file (see da/rc/da.rc for example)')
# logging.error('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ')
# raise IOError, msg
# logging.debug('DA Cycle settings have been validated succesfully')
#CycleControl.validate_rc = validate_rc2
def submit_next_cycle_fmi(self):
    """
    Submit the next job of a DA cycle, this consists of
        * Changing to the working directory from which the job was started initially
        * create a line to start the master script again with a newly created rc-file
        * Submitting the jobfile

    If the end of the cycle series is reached, no new job is submitted.
    """

    if self['time.end'] < self['time.finish']:

        # file ID and names, derived from the end date of the current cycle
        jobid = self['time.end'].strftime('%Y%m%d')
        targetdir = os.path.join(self['dir.exec'])
        jobfile = os.path.join(targetdir, 'jb.%s.jb' % jobid)
        logfile = os.path.join(targetdir, 'jb.%s.log' % jobid)

        # Template and commands for job
        jobparams = {'jobname':"ctdas", 'jobtime':'05:00:00', 'logfile': logfile, 'errfile': logfile,
                     'jobpes':'120','jobnodes':'20'}
        template = self.daplatform.get_job_template(jobparams)
        # NOTE(review): execcommand is unused since the submit line using it
        # was commented out below -- left in place pending confirmation.
        execcommand = os.path.join(self['dir.da_submit'], sys.argv[0])
        # Drop the transition flag so the next cycle starts normally.
        if '-t' in self.opts:
            (self.opts).remove('-t')

        #template += 'python %s rc=%s %s' % (execcommand, self['da.restart.fname'], join(self.opts, ''))
        # NOTE(review): string.join(self.opts, '') concatenates the remaining
        # options with NO separator; a ' ' separator looks intended -- confirm.
        template += 'cd %s \n' %self['dir.da_submit']
        template += '%s rc=%s %s' % (sys.argv[0], self['da.restart.fname'], join(self.opts, ''))

        # write the job file and hand it to the platform's batch system
        self.daplatform.write_job(jobfile, template, jobid)
        jobid = self.daplatform.submit_job(jobfile, joblog=logfile)
        # FMI batch system: derive the stdout filename from the returned job id.
        job_file = 'ctdas.o%s' %jobid[0:-4]
        logging.info('Jobfile %s' %job_file )
        logging.info('DA cycle has been submitted.')
    else:
        logging.info('Final date reached, no new cycle started')
def setup_file_structure_fmi(self):
    """
    Create the FMI run directory structure under dir.da_run.

    At FMI, job files are stored in the execution folders and their location
    cannot be chosen, so no separate 'jobs' directory is created.  The input,
    output, analysis and restart directories carry the da.suffixname suffix
    so several experiments can share one dir.da_run.
    """
    suf = self['da.suffixname']
    self['dir.exec'] = os.path.join(self['dir.da_run'], 'exec')
    self['dir.input'] = os.path.join(self['dir.da_run'], 'input_%s' % suf)
    self['dir.output'] = os.path.join(self['dir.da_run'], 'output_%s' % suf)
    self['dir.analysis'] = os.path.join(self['dir.da_run'], 'analysis_%s' % suf)
    self['dir.restart'] = os.path.join(self['dir.da_run'], 'restart_%s' % suf)

    logging.info("setup_file_structure %s" % self['dir.output'])

    # create_dirs comes from da.tools.initexit (star import at the top of this file)
    create_dirs(self['dir.da_run'])
    create_dirs(self['dir.exec'])
    create_dirs(self['dir.input'])
    create_dirs(self['dir.output'])
    create_dirs(self['dir.analysis'])
    create_dirs(self['dir.restart'])

    logging.info('Succesfully created the file structure for the assimilation job')
def setup_fmi(self):
    """
    Prepare a DA run: build the directory structure and resolve the job
    rc-filename, branching on whether this is a meteo transition, a restart,
    or a fresh start.
    """
    if self['transition']:
        # Transition run: continue the filter with the new (34-level) meteo.
        logging.info("Transition of filter from previous step with od meteo from 25 to 34 levels")
        self.setup_file_structure()
        strippedname = os.path.split(self['jobrcfilename'])[-1]
        self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname)
        self.read_random_seed(False)

    elif self['time.restart']:
        # Restart: reuse the existing structure and random seed.
        logging.info("Restarting filter from previous step")
        self.setup_file_structure()
        strippedname = os.path.split(self['jobrcfilename'])[-1]
        self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname)
        self.read_random_seed(False)

    else: #assume that it is a fresh start, change this condition to more specific if crash recover added
        logging.info("First time step in filter sequence")
        self.setup_file_structure()

        # expand jobrcfilename to include exec dir from now on.
        # First strip current leading path from filename
        strippedname = os.path.split(self['jobrcfilename'])[-1]
        self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname)
        # NOTE(review): shutil and glob are not imported in this file's visible
        # header; presumably they arrive via "from da.tools.initexit import *"
        # -- confirm before relying on this branch.
        if self.dasystem.has_key('copyregionsfile'):
            shutil.copy(os.path.join(self.dasystem['regionsfile']),os.path.join(self['dir.exec'],'da','methane','analysis','copied_regions.nc'))
            logging.info('Copied regions file to the analysis directory: %s'%os.path.join(self.dasystem['regionsfile']))
        if self.dasystem.has_key('extendedregionsfile'):
            shutil.copy(os.path.join(self.dasystem['extendedregionsfile']),os.path.join(self['dir.exec'],'da','analysis','copied_regions_extended.nc'))
            logging.info('Copied extended regions file to the analysis directory: %s'%os.path.join(self.dasystem['extendedregionsfile']))
        # Remove stale region-lookup caches so the freshly copied regions are used.
        for filename in glob.glob(os.path.join(self['dir.exec'],'da','analysis','*.pickle')):
            logging.info('Deleting pickle file %s to make sure the correct regions are used'%os.path.split(filename)[1])
            os.remove(filename)
        for filename in glob.glob(os.path.join(self['dir.exec'],'*.pickle')):
            logging.info('Deleting pickle file %s to make sure the correct regions are used'%os.path.split(filename)[1])
            os.remove(filename)
        if self.dasystem.has_key('random.seed.init'):
            # Initialize the random seed from the rc-supplied seed file.
            self.read_random_seed(True)

    self.parse_times()
    #self.write_rc(self['jobrcfilename'])
# Replace the stock CycleControl methods (from da.tools.initexit) with the
# FMI-specific variants defined above.
CycleControl.submit_next_cycle = submit_next_cycle_fmi
CycleControl.setup_file_structure = setup_file_structure_fmi
CycleControl.setup = setup_fmi

if __name__ == "__main__":
    # Import-only module; nothing to do when run as a script.
    pass
#!/usr/bin/env python
# io.py
"""
Author : aki
Revision History:
File created on Apr 2016.
"""
import standardvariables
import datetime as dt
from numpy import array, arange
import os
import logging
import sys
sys.path.append('/stornext/store/carbon/CarbonTracker/netcdf4/python_packages_install_dir/lib/python2.7/site-packages/')
import netCDF4
sys.path.append('/stornext/store/carbon/CarbonTracker/pyhdf/pyhdf-0.8.3/lib/python2.7/site-packages/')
import pyhdf.SD as hdf
sys.path.append('../')
from da.tools.io4 import *
# Global attribute values stamped onto every CarbonTracker output file
# (see add_tc_header2 below).
disclaimer = "This data belongs to the CarbonTracker project"
email = "aki.tsuruta@fmi.fi"
url = "http://en.ilmatieteenlaitos.fi/carbon-cycle-modelling"
institution = "Finnish Meteorological Institute, Climate research"
source = "CarbonTracker release 1.0"
conventions = "CF-1.1"
# Creation stamp, e.g. "created on April 01, 2016 by <user>".
historytext = 'created on '+dt.datetime.now().strftime('%B %d, %Y')+' by %s'%os.environ['USER']
def add_tc_header2(self):
    """Stamp the standard CarbonTracker global attributes onto this file."""
    header = [('Institution', institution),
              ('Contact', email),
              ('URL', url),
              ('Source', source),
              ('Convention', conventions),
              ('Disclaimer', disclaimer),
              ('History', historytext)]
    # Same attributes in the same order as before, driven by a table.
    for name, value in header:
        self.setncattr(name, value)
def standard_var2(self, varname):
    """Return the property dictionary of a standard variable, falling back
    to the 'unknown' entry when *varname* is not a known variable."""
    import standardvariables
    table = standardvariables.standard_variables
    return table.get(varname, table['unknown'])
# Attach the CarbonTracker-specific helpers to the io4 file classes
# (CT_CDF / CT_HDF come from the da.tools.io4 star import above).
CT_CDF.add_tc_header = add_tc_header2
CT_CDF.standard_var = standard_var2
CT_HDF.standard_var = standard_var2
#!/usr/bin/env python
# obs.py
"""
Author : aki
Revision History:
File revised on Feb 2017.
"""
import os
import sys
import logging
#from da.baseclasses.statevector import filename
import datetime as dtm
from string import strip
from numpy import array, logical_and
sys.path.append(os.getcwd())
sys.path.append('../../')
identifier = 'CarbonTracker CH4 mole fractions'
version = '0.0'
from da.baseclasses.obs import Observations
import da.methane.io4 as io
import da.tools.rc as rc
################### Begin Class CH4Observations ###################
class MethaneObservations(Observations):
""" an object that holds data + methods and attributes needed to manipulate mole fraction values """
def setup(self, dacycle):
self.startdate = dacycle['time.sample.start']
self.enddate = dacycle['time.sample.end']
sfname = dacycle.dasystem['obs.input.fname']
if sfname.endswith('.nc'):
filename = os.path.join(dacycle.dasystem['obs.input.dir'], sfname)
else:
filename = os.path.join(dacycle.dasystem['obs.input.dir'], sfname + '.' + self.startdate.strftime('%Y%m%d') + '.nc')
if not os.path.exists(filename):
msg = 'Could not find the required observation input file (%s) ' % filename
logging.error(msg)
raise IOError, msg
else:
self.obs_filename = filename
self.datalist = []
def add_observations(self):
    """ Returns a MoleFractionList holding individual MoleFractionSample objects for all obs in a file

    The CarbonTracker mole fraction files are provided as one long list of obs for all possible dates. So we can
    either:
    (1) read all, and the subselect the data we will use in the rest of this cycle
    (2) Use nco to make a subset of the data
    For now, we will stick with option (1)
    """
    ncf = io.ct_read(self.obs_filename, 'read')
    idates = ncf.get_variable('date_components')
    dates = array([dtm.datetime(*d) for d in idates])

    # Indices of the samples that fall inside this cycle's sampling window.
    subselect = logical_and(dates >= self.startdate, dates <= self.enddate).nonzero()[0]

    dates = dates.take(subselect, axis=0)
    ids = ncf.get_variable('obs_num').take(subselect, axis=0)
    #evn = ncf.get_variable('eventnumber').take(subselect, axis=0)
    #evn = [s.tostring().lower() for s in evn]
    #evn = map(strip, evn)
    sites = ncf.get_variable('obs_id').take(subselect, axis=0)
    # Character arrays -> lowercase, whitespace-stripped python strings.
    sites = [s.tostring().lower() for s in sites]
    sites = map(strip, sites)
    lats = ncf.get_variable('latitude').take(subselect, axis=0)
    lons = ncf.get_variable('longitude').take(subselect, axis=0)
    alts = ncf.get_variable('altitude').take(subselect, axis=0)
    # Input file stores ppb; scale to mol/mol for the assimilation.
    obs = ncf.get_variable('obs').take(subselect, axis=0) * 1.e-9
    logging.info("Converting observed values from ppb to mol/mol!!!!")
    #species = ncf.get_variable('species').take(subselect, axis=0)
    #species = [s.tostring().lower() for s in species]
    #species = map(strip, species)
    #strategy = ncf.get_variable('sampling_strategy').take(subselect, axis=0)
    #flags = ncf.get_variable('NOAA_QC_flags').take(subselect, axis=0)
    #flags = [s.tostring().lower() for s in flags]
    #flags = map(strip, flags)
    #flags = [int(f == '...') for f in flags]
    ncf.close()

    logging.debug("Successfully read data from obs file (%s)" % self.obs_filename)

    for n in range(len(dates)):
        # NOTE(review): no-op assignment; looks like a leftover from an earlier
        # per-sample unit conversion -- confirm before removing.
        obs[n] = obs[n]
        #self.datalist.append(MoleFractionSample(ids[n], dates[n], sites[n], obs[n], 0.0, 0.0, 0.0, 0.0, flags[n], alts[n], lats[n], lons[n], evn[n], species[n], strategy[n], 0.0, self.obs_filename))
        # NOTE(review): MoleFractionSample is not imported in this file's visible
        # header; presumably it is provided by da.baseclasses.obs -- confirm.
        self.datalist.append( MoleFractionSample(ids[n], dates[n], sites[n], obs[n], 0.0, 0.0, 0.0, 0.0, 0.0, alts[n], lats[n], lons[n], '0000', 'ch4', 1.0, 0.0, self.obs_filename) )

    logging.debug("Added %d observations to the Data list" % len(dates))
def add_simulations(self, filename, silent=True):
    """
    Add model-simulated mole fraction values to the observation objects.

    :param filename: sample output file written by the observation operator
    :param silent: when False, warn about samples that match no observation ID
    :raises IOError: when *filename* does not exist
    """
    if not os.path.exists(filename):
        msg = "Sample output filename for observations could not be found : %s" % filename
        logging.error(msg)
        logging.error("Did the sampling step succeed?")
        logging.error("...exiting")
        # Call form of raise: same behavior as the Python-2-only
        # "raise IOError, msg" statement, but also valid Python 3 syntax.
        raise IOError(msg)

    ncf = io.ct_read(filename, method='read')
    ids = ncf.get_variable('obs_num')
    simulated = ncf.get_variable('flask')
    ncf.close()
    logging.info("Successfully read data from model sample file (%s)" % filename)

    obs_ids = self.getvalues('id').tolist()
    # List comprehension instead of map(): identical result in Python 2,
    # and keeps len(ids) working below under Python 3 as well.
    ids = [int(i) for i in ids]

    # Match each simulated sample to its observation by ID; remember the
    # samples that have no matching observation.
    missing_samples = []
    for idx, val in zip(ids, simulated):
        if idx in obs_ids:
            index = obs_ids.index(idx)
            self.datalist[index].simulated = val
        else:
            missing_samples.append(idx)

    if not silent and missing_samples != []:
        logging.warning('Model samples were found that did not match any ID in the observation list. Skipping them...')
        #msg = '%s'%missing_samples ; logging.warning(msg)

    logging.info("Added %d simulated values to the Data list" % (len(ids) - len(missing_samples)))
def write_sample_coords(self, obsinputfile):
    """
    Write the information needed by the observation operator to a file.

    :param obsinputfile: path of the (netCDF) file to create
    :return: the filename that was written, for later use

    When the data list is empty, a file containing only the dimensions is
    written and the method returns immediately.  (The original code closed
    the file but fell through and tried to add variables to the already
    closed file -- the intended early return was commented out.)
    """
    f = io.CT_CDF(obsinputfile, method='create')
    logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile)
    dimid = f.add_dim('obs', len(self.datalist))
    dim200char = f.add_dim('string_of200chars', 200)
    dimcalcomp = f.add_dim('calendar_components', 6)

    if len(self.datalist) == 0:
        f.close()
        return obsinputfile

    data = self.getvalues('id')
    savedict = io.std_savedict.copy()
    savedict['name'] = "obs_num"
    savedict['dtype'] = "int"
    savedict['long_name'] = "Unique_Dataset_observation_index_number"
    savedict['units'] = ""
    savedict['dims'] = dimid
    savedict['values'] = data.tolist()
    savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
    f.add_data(savedict)

    # Dates are stored as [year, month, day, hour, minute, second] integers.
    data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate')]
    savedict = io.std_savedict.copy()
    savedict['dtype'] = "int"
    savedict['name'] = "date_components"
    savedict['units'] = "integer components of UTC date/time"
    savedict['dims'] = dimid + dimcalcomp
    savedict['values'] = data
    savedict['missing_value'] = -9
    savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
    savedict['order'] = "year, month, day, hour, minute, second"
    f.add_data(savedict)

    data = self.getvalues('lat')
    savedict = io.std_savedict.copy()
    savedict['name'] = "latitude"
    savedict['units'] = "degrees_north"
    savedict['dims'] = dimid
    savedict['values'] = data.tolist()
    savedict['missing_value'] = -999.9
    f.add_data(savedict)

    data = self.getvalues('lon')
    savedict = io.std_savedict.copy()
    savedict['name'] = "longitude"
    savedict['units'] = "degrees_east"
    savedict['dims'] = dimid
    savedict['values'] = data.tolist()
    savedict['missing_value'] = -999.9
    f.add_data(savedict)

    data = self.getvalues('height')
    savedict = io.std_savedict.copy()
    savedict['name'] = "altitude"
    savedict['units'] = "meters_above_sea_level"
    savedict['dims'] = dimid
    savedict['values'] = data.tolist()
    savedict['missing_value'] = -999.9
    f.add_data(savedict)

    data = self.getvalues('samplingstrategy')
    savedict = io.std_savedict.copy()
    savedict['dtype'] = "int"
    savedict['name'] = "sampling_strategy"
    savedict['units'] = "NA"
    savedict['dims'] = dimid
    savedict['values'] = data.tolist()
    savedict['missing_value'] = -9
    f.add_data(savedict)

    data = self.getvalues('evn')
    savedict = io.std_savedict.copy()
    savedict['dtype'] = "char"
    savedict['name'] = "obs_id"
    savedict['units'] = "NOAA database identifier"
    savedict['dims'] = dimid + dim200char
    savedict['values'] = data
    savedict['missing_value'] = '!'
    f.add_data(savedict)

    f.close()
    logging.debug("Successfully wrote data to obs file")
    logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile)
    return obsinputfile
def add_model_data_mismatch(self, filename):
"""
Get the model-data mismatch values for this cycle.
(1) Open a sites_weights file
(2) Parse the data
(3) Compare site list against data