Skip to content
Snippets Groups Projects
Forked from CTDAS / CTDAS
97 commits behind the upstream repository.
expand_molefractions.py 14.63 KiB
"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. 
Users are recommended to contact the developers (wouter.peters@wur.nl) to receive
updates of the code. See also: http://www.carbontracker.eu. 

This program is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software Foundation, 
version 3. This program is distributed in the hope that it will be useful, but 
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 

You should have received a copy of the GNU General Public License along with this 
program. If not, see <http://www.gnu.org/licenses/>."""
#!/usr/bin/env python
# expand_fluxes.py
import sys
import os
import getopt
import shutil
import logging
import netCDF4
import numpy as np
from string import join
from datetime import datetime, timedelta
sys.path.append('../../')
from da.tools.general import date2num, num2date
from da.tools.general import create_dirs
import da.tools.io4 as io


"""
Author: Wouter Peters (Wouter.Peters@wur.nl)

Revision History:
File created on 11 May 2012.

"""

def write_mole_fractions(dacycle):
    """ 
    
    Write Sample information to NetCDF files. These files are organized by site and 
    have an unlimited time axis to which data is appended each cycle.

    The needed information is obtained from the sample_auxiliary.nc files and the original input data files from ObsPack. 

    The steps are:

    (1) Create a directory to hold timeseries output files
    (2) Read the sample_auxiliary.nc file for this cycle and get a list of original files they were obtained from
    (3) For each file, copy the original data file from ObsPack (if not yet present)
    (4) Open the copied file, find the index of each observation, fill in the simulated data
    
    """

    dirname = create_dirs(os.path.join(dacycle['dir.analysis'], 'data_molefractions'))
    #
    # Some help variables
    #
    dt = dacycle['cyclelength']
    startdate = dacycle['time.start'] 
    enddate = dacycle['time.end'] 

    logging.debug("DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M'))
    logging.debug("DA Cycle end   date is %s" % enddate.strftime('%Y-%m-%d %H:%M'))

    dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"),)

    # Step (1): Get the posterior sample output data file for this cycle

    infile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])

    ncf_in = io.ct_read(infile, 'read')

    obs_num = ncf_in.get_variable('obs_num')
    obs_val = ncf_in.get_variable('observed')
    simulated = ncf_in.get_variable('modelsamples')
    infilename = ncf_in.get_variable('inputfilename')
    infiles1 = netCDF4.chartostring(infilename).tolist()
    # In case of reanalysis on different platform, obspack-input-directory might have a different name. 
    # This is checked here, and the filenames are corrected
    dir_from_rc = dacycle.dasystem['obspack.input.dir']
    dir_from_output = infiles1[0]
    d1 = dir_from_rc[:dir_from_rc.find('obspacks')]
    d2 = dir_from_output[:dir_from_output.find('obspacks')]
    if d1 == d2:
        infiles = infiles1
    else:
        infiles = []
        for ff in infiles1:
            infiles.append(ff.replace(d2,d1))


    ncf_in.close()

    # Step (2): Get the prior sample output data file for this cycle

    infile = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % startdate.strftime('%Y%m%d'))

    if os.path.exists(infile): 
        optimized_present = True
    else:
        optimized_present = False

    if optimized_present:

        ncf_fc_in = io.ct_read(infile, 'read')

        fc_obs_num = ncf_fc_in.get_variable('obspack_num')
        fc_obs_val = ncf_fc_in.get_variable('observed')
        fc_simulated = ncf_fc_in.get_variable('modelsamplesmean_prior')
        fc_simulated_ens = ncf_fc_in.get_variable('modelsamplesdeviations_prior')
        fc_flag      = ncf_fc_in.get_variable('flag')
        if 'modeldatamismatchvariance' not in dacycle.dasystem:
            fc_r         = ncf_fc_in.get_variable('modeldatamismatchvariance')
            fc_hphtr     = ncf_fc_in.get_variable('totalmolefractionvariance')
        elif dacycle.dasystem['opt.algorithm'] == 'serial':
            fc_r         = ncf_fc_in.get_variable('modeldatamismatchvariance')
            fc_hphtr     = ncf_fc_in.get_variable('totalmolefractionvariance')
        elif dacycle.dasystem['opt.algorithm'] == 'bulk':
            fc_r         = ncf_fc_in.get_variable('modeldatamismatchvariance').diagonal()
            fc_hphtr     = ncf_fc_in.get_variable('totalmolefractionvariance').diagonal()
        filesitecode = ncf_fc_in.get_variable('sitecode')

        fc_sitecodes = netCDF4.chartostring(filesitecode).tolist()

        ncf_fc_in.close()

        # Expand the list of input files with those available from the forecast list

        infiles_rootdir = os.path.split(infiles[0])[0]
        infiles.extend(os.path.join(infiles_rootdir, f + '.nc') for f in fc_sitecodes)


    #Step (2): For each observation timeseries we now have data for, open it and fill with data

    for orig_file in set(infiles):

        if not os.path.exists(orig_file):
            logging.error("The original input file (%s) could not be found, continuing to next file..." % orig_file)
            continue


        copy_file = os.path.join(dirname, os.path.split(orig_file)[-1])
        if not os.path.exists(copy_file):
            shutil.copy(orig_file, copy_file)
            logging.debug("Copied a new original file (%s) to the analysis directory" % orig_file)

            ncf_out = io.CT_CDF(copy_file, 'write')

            # Modify the attributes of the file to reflect added data from CTDAS properly

	    try:
	       host=os.environ['HOSTNAME']
	    except:
	       host='unknown'
     

            ncf_out.Caution = '==================================================================================='
            try:
                ncf_out.History += '\nOriginal observation file modified by user %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),)
            except:
                ncf_out.History = '\nOriginal observation file modified by user %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),)
            ncf_out.CTDAS_info = 'Simulated values added from a CTDAS run by %s on %s\n' % (os.environ['USER'], datetime.today().strftime('%F'),)\
                               + '\nCTDAS was run on platform %s' % (host,)\
                               + '\nCTDAS job directory was %s' % (dacycle['dir.da_run'],)\
                               + '\nCTDAS Da System was %s' % (dacycle['da.system'],)\
                               + '\nCTDAS Da ObsOperator was %s' % (dacycle['da.obsoperator'],)
            ncf_out.CTDAS_startdate = dacycle['time.start'].strftime('%F')
            ncf_out.CTDAS_enddate = dacycle['time.finish'].strftime("%F")
            ncf_out.original_file = orig_file


            # get nobs dimension

            if 'id' in ncf_out.dimensions:
                dimidob = ncf_out.dimensions['id']
                dimid = ('id',)
            elif 'obs' in ncf_out.dimensions:
                dimidob = ncf_out.dimensions['obs']
                dimid = ('obs',)

            if dimidob.isunlimited:
                nobs = ncf_out.inq_unlimlen()
            else:
                nobs = len(dimid)


            # add nmembers dimension

            dimmembersob = ncf_out.createDimension('nmembers', size=simulated.shape[1])
            dimmembers = ('nmembers',)
            nmembers = len(dimmembers)

            # Create empty arrays for posterior samples, as well as for forecast sample statistics

            savedict = io.std_savedict.copy()
            savedict['name'] = "flag_forecast"
            savedict['long_name'] = "flag_for_obs_model in forecast"
            savedict['units'] = "None"
            savedict['dims'] = dimid
            savedict['comment'] = 'Flag (0/1/2/99) for observation value, 0 means okay, 1 means QC error, 2 means rejected, 99 means not sampled'
            ncf_out.add_variable(savedict)

            savedict = io.std_savedict.copy()
            savedict['name'] = "modeldatamismatch"
            savedict['long_name'] = "modeldatamismatch"
            savedict['units'] = "[mol mol-1]^2"
            savedict['dims'] = dimid
            savedict['comment'] = 'Variance of mole fractions resulting from model-data mismatch'
            ncf_out.add_variable(savedict)

            savedict = io.std_savedict.copy()
            savedict['name'] = "totalmolefractionvariance_forecast"
            savedict['long_name'] = "totalmolefractionvariance of forecast"
            savedict['units'] = "[mol mol-1]^2"
            savedict['dims'] = dimid
            savedict['comment'] = 'Variance of mole fractions resulting from prior state and model-data mismatch'
            ncf_out.add_variable(savedict)

            savedict = io.std_savedict.copy()
            savedict['name'] = "modelsamplesmean"
            savedict['long_name'] = "mean modelsamples"
            savedict['units'] = "mol mol-1"
            savedict['dims'] = dimid
            savedict['comment'] = 'simulated mole fractions based on optimized state vector'
            ncf_out.add_variable(savedict)

            savedict = io.std_savedict.copy()
            savedict['name'] = "modelsamplesmean_forecast"
            savedict['long_name'] = "mean modelsamples from forecast"
            savedict['units'] = "mol mol-1"
            savedict['dims'] = dimid
            savedict['comment'] = 'simulated mole fractions based on prior state vector'
            ncf_out.add_variable(savedict)

            savedict = io.std_savedict.copy()
            savedict['name'] = "modelsamplesstandarddeviation"
            savedict['long_name'] = "standard deviaton of modelsamples over all ensemble members"
            savedict['units'] = "mol mol-1"
            savedict['dims'] = dimid
            savedict['comment'] = 'std dev of simulated mole fractions based on optimized state vector'
            ncf_out.add_variable(savedict)

            savedict = io.std_savedict.copy()
            savedict['name'] = "modelsamplesstandarddeviation_forecast"
            savedict['long_name'] = "standard deviaton of modelsamples from forecast over all ensemble members"
            savedict['units'] = "mol mol-1"
            savedict['dims'] = dimid
            savedict['comment'] = 'std dev of simulated mole fractions based on prior state vector'
            ncf_out.add_variable(savedict)

            savedict = io.std_savedict.copy()
            savedict['name'] = "modelsamplesensemble"
            savedict['long_name'] = "modelsamples over all ensemble members"
            savedict['units'] = "mol mol-1"
            savedict['dims'] = dimid + dimmembers
            savedict['comment'] = 'ensemble of simulated mole fractions based on optimized state vector'
            ncf_out.add_variable(savedict)

            savedict = io.std_savedict.copy()
            savedict['name'] = "modelsamplesensemble_forecast"
            savedict['long_name'] = "modelsamples from forecast over all ensemble members"
            savedict['units'] = "mol mol-1"
            savedict['dims'] = dimid + dimmembers
            savedict['comment'] = 'ensemble of simulated mole fractions based on prior state vector'
            ncf_out.add_variable(savedict)

        else:
            logging.debug("Modifying existing file (%s) in the analysis directory" % copy_file)

            ncf_out = io.CT_CDF(copy_file, 'write')


        # Get existing file obs_nums to determine match to local obs_nums

        if 'merge_num' in ncf_out.variables:
            file_obs_nums = ncf_out.get_variable('merge_num')
        elif 'obspack_num' in ncf_out.variables:
            file_obs_nums = ncf_out.get_variable('obspack_num')
        elif 'id' in ncf_out.variables:
            file_obs_nums = ncf_out.get_variable('id')

        # Get all obs_nums related to this file, determine their indices in the local arrays

        selected_obs_nums = [num for infile, num in zip(infiles, obs_num) if infile == orig_file]


        # Optimized data 1st: For each index, get the data and add to the file in the proper file index location

        for num in selected_obs_nums:

            model_index = np.where(obs_num == num)[0][0]
            file_index = np.where(file_obs_nums == num)[0][0]

            var = ncf_out.variables['modelsamplesmean']
            var[file_index] = simulated[model_index, 0]

            var = ncf_out.variables['modelsamplesstandarddeviation']
            var[file_index] = simulated[model_index, 1:].std()

            var = ncf_out.variables['modelsamplesensemble']
            var[file_index] = simulated[model_index, :]

        # Now forecast data too: For each index, get the data and add to the file in the proper file index location

        if optimized_present:

            selected_fc_obs_nums = [num for sitecode, num in zip(fc_sitecodes, fc_obs_num) if sitecode in orig_file]

            for num in selected_fc_obs_nums:

                model_index = np.where(fc_obs_num == num)[0][0]
                file_index = np.where(file_obs_nums == num)[0][0]

                var = ncf_out.variables['modeldatamismatch']
                var[file_index] = np.sqrt(fc_r[model_index])

                var = ncf_out.variables['modelsamplesmean_forecast']
                var[file_index] = fc_simulated[model_index]

                var = ncf_out.variables['modelsamplesstandarddeviation_forecast']
                var[file_index] = fc_simulated_ens[model_index, 1:].std()

                var = ncf_out.variables['modelsamplesensemble_forecast']
                var[file_index] = fc_simulated_ens[model_index, :]

                var = ncf_out.variables['totalmolefractionvariance_forecast']
                var[file_index] = fc_hphtr[model_index]

                var = ncf_out.variables['flag_forecast']
                var[file_index] = fc_flag[model_index]


        # close the file

        status = ncf_out.close()

    return None



if __name__ == "__main__":
    import logging
    from da.tools.initexit import CycleControl
    from da.carbondioxide.dasystem import CO2DaSystem

    sys.path.append('../../')

    logging.root.setLevel(logging.DEBUG)

    dacycle = CycleControl(args={'rc':'../../ctdas-od-gfed2-glb6x4-obspack-full.rc'})
    dasystem = CO2DaSystem('../rc/carbontracker_ct09_opfnew.rc')
    dacycle.dasystem = dasystem
    dacycle.setup()
    dacycle.parse_times()



    while dacycle['time.start'] < dacycle['time.finish']:

        outfile = write_mole_fractions(dacycle)

        dacycle.advance_cycle_times()

    sys.exit(0)