observationoperator.py 16.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. 
Users are recommended to contact the developers (wouter.peters@wur.nl) to receive
updates of the code. See also: http://www.carbontracker.eu. 

This program is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software Foundation, 
version 3. This program is distributed in the hope that it will be useful, but 
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 

You should have received a copy of the GNU General Public License along with this 
program. If not, see <http://www.gnu.org/licenses/>."""
weihe's avatar
weihe committed
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# stilt_tools.py

"""
Author : W. He

Revision History:
Based on Wouter's code, replaced the TM5 model with the STILT model, April 2015.

This module holds specific functions needed to use the STILT model within the data assimilation shell. It uses the information
from the DA system in combination with the generic stilt.rc files.

The STILT model is now controlled by a python subprocess. This subprocess consists of an MPI wrapper (written in C) that spawns
a large number ( N= nmembers) of STILT model instances under mpirun, and waits for them all to finish.

#The design of the system assumes that the stilt function are executed using R codes, and is residing in a
#directory specified by the ${RUNDIR} of a stilt rc-file. This stilt rc-file name is taken from the data assimilation rc-file. Thus,

"""
weihe's avatar
weihe committed
32
import shutil
weihe's avatar
weihe committed
33
34
35
36
37
38
import os
import sys
import logging
import shutil
import datetime
import subprocess    #use to get info from running status
39
import warnings
weihe's avatar
weihe committed
40
import numpy as np
41
42
import netCDF4 as nc
#from string import join
weihe's avatar
weihe committed
43
import glob
44
45
sys.path.append(os.getcwd())
sys.path.append("../../")
46
47
#import da.tools.rc as rc
#from da.tools.general import create_dirs, to_datetime
48
49
50
from da.baseclasses.observationoperator import ObservationOperator
import da.tools.rc as rc
import da.tools.io4 as io
weihe's avatar
weihe committed
51
52
53
54
55
56
57
58
59
60
61
# global constants, which will be used in the following classes
identifier = 'WRF-STILT'
version = '1.0'
#mpi_shell_filename = 'STILT_mpi_wrapper'   #for STILT, not use MPI
#mpi_shell_location = 'da/bin/'


################### Begin Class STILT ###################



62
class STILTObservationOperator(ObservationOperator):
weihe's avatar
weihe committed
63

64
    def __init__(self, dacycle=None):   #only the filename used to specify the location of the stavector file for wrf-stilt runs
weihe's avatar
weihe committed
65
66
67
68
69
70
71
72
73
        """ The instance of an STILTObservationOperator is application dependent """
        self.ID = identifier    # the identifier gives the model name
        self.version = version       # the model version used
        self.restart_filelist = []
        self.output_filelist = []
        self.outputdir = None # Needed for opening the samples.nc files created

        logging.info('Observation Operator initialized: %s (%s)' % (self.ID, self.version))

74
75
76
77
78
79
80
        if dacycle != None:
            self.dacycle = dacycle
        else:
            self.dacycle = {}

        self.startdate=None

weihe's avatar
weihe committed
81
    def setup(self, dacycle):
Woude, Auke van der's avatar
Woude, Auke van der committed
82
83
        """ Execute all steps needed to prepare the ObsOperator for use inside CTDAS, only done at the very first cycle normally
        Input: dacycle: dict with information on the data assimilatin cycle """
weihe's avatar
weihe committed
84
85
        self.dacycle = dacycle
        self.outputdir = dacycle['dir.output']
86
        self.rc = dacycle['da.obsoperator.rc']
87
        # Read the stilt.rc file for settings to the observationoperator 
88
        rc_options = rc.read(self.rc)
89
        # Also read in the tracer information
90
91
92
93
94
95
96
        rc_options['tracers'] = rc_options['tracers'].split(',')
        self.rc_options = rc_options
        self.tracer_info = {tracer: None for tracer in rc_options['tracers']}
        for tracer in self.tracer_info.keys():
            self.tracer_info[tracer] = rc.read(self.rc_options['tracerdir']+'/'+tracer)
        
    def parse_samplefile(self):
97
        """Function that reads the sample files created in obspack.py"""
98
99
        infile = nc.Dataset(self.dacycle['ObsOperator.inputfile'])
        with warnings.catch_warnings():
100
101
            warnings.simplefilter("ignore")
            # Add the observation ids, tracers and sites
102
103
104
105
106
107
108
109
            obs_ids = infile['obs_id'][:]
            obs_ids = [b''.join(obs_id).decode() for obs_id in obs_ids]
            self.tracer, self.site, self.obs_id = [], [], []
            for obs_id in obs_ids:
                tracer, site, *_ = obs_id.split('~')[1].split('_')
                self.tracer.append(tracer)
                self.site.append(site)
                self.obs_id.append(obs_id.split('~')[-1])
110
            # Times and latitude and longitudes are static; only need once
111
112
113
114
            self.times = infile['date_components'][:]
            
            self.lat = infile['latitude'][:]
            self.lon = infile['longitude'][:]
115
        # Set the start time
116
117
118
119
120
121
122
        starttime = self.dacycle['time.sample.start']
        self.starttime = starttime
        self.year = starttime.year
        self.month= starttime.month
        self.day  = starttime.day
        self.hour = starttime.hour
    
weihe's avatar
weihe committed
123
    def prepare_run(self,proc):
124
125
126
        """ Prepare the running of the actual forecast model.
        - Initialise the file that will hold the simulated values
        - Initialise the number of ensemble members
Woude, Auke van der's avatar
Woude, Auke van der committed
127
128
129
        - Update the rc file (stilt_X.rc)
        Input: 
            proc: int: number of process"""
weihe's avatar
weihe committed
130
131
132
        import os

        # Define the name of the file that will contain the modeled output of each observation
weihe's avatar
weihe committed
133
        self.simulated_file = os.path.join(self.outputdir, 'samples_simulated.%s-%s.nc' % (self.dacycle['time.sample.stamp'],proc))
weihe's avatar
weihe committed
134
        self.forecast_nmembers = int(self.dacycle['da.optimizer.nmembers'])
weihe's avatar
weihe committed
135
        self.update_rc(self.rc,proc)
weihe's avatar
weihe committed
136

weihe's avatar
weihe committed
137
    def update_rc(self,name,proc):
138
        """Function that updates the .rc files.
Woude, Auke van der's avatar
Woude, Auke van der committed
139
140
141
142
        The outputdir and time of the new rc file are adjusted
        Input: 
            name: str: rc filename
            proc: number of process"""
weihe's avatar
weihe committed
143
144
145
146
        if 'stilt.rc' in name:
            path,dummy= name.split('stilt.rc')
            shutil.copyfile(name,path+'stilt_%s.rc'%proc)
            name=os.path.join(path,'stilt_%s.rc'%proc)
weihe's avatar
weihe committed
147
148
        self.rc_filename = name

149
        new_rc = rc.read(name)
150
        # update the time and the outputdir
151
152
153
        new_rc['cycstadate'] = self.starttime
        new_rc['outdir'] = self.rc_options['outdir']
        rc.write(name, new_rc)
weihe's avatar
weihe committed
154
155
156

        logging.debug('STILT rc-file updated successfully')

157
    
158
159
160
    def get_time_index_nc(self, time=None):
        """Function that gets the time index from the flux files
        based on the cycletime and the first time in all the files (hardcoded in stilt.rc)
Woude, Auke van der's avatar
Woude, Auke van der committed
161
162
        Input:
            time: datetime.datetime: The time for which the index needs to be found. Default: current time cycle datetime
163
164
165
        """
        if time == None:
            # Get the time of the current cycle
Woude, Auke van der's avatar
Woude, Auke van der committed
166
            time = self.dacycle['time.start']
167
        # Get the start date of all cycles
168
169
        startdate = self.rc_options['files_startdate']
        startdate = datetime.datetime.strptime(startdate, '%Y-%m-%d %H:%M:%S')
170
171
        # Get the difference between the current and the start
        # Note that this is in hours, and thus assumes that the flux files are hourly as well
Woude, Auke van der's avatar
Woude, Auke van der committed
172
        timediff = time - startdate
173
174
175
176
177
        timediff_hours = int(timediff.total_seconds()/3600)
        time_index = int(timediff_hours)
        return time_index
    
    def get_time_indices(self):
178
179
180
        """Function that gets the time indices in the flux files
        Because if the footprint is for 24 hours back, we need the fluxes 24 hours back"""

181
182
183
184
185
186
        time_index = self.get_time_index_nc()
        
        return slice(time_index-self.numtimes, time_index)

    
    def get_latlon_index_nc(self, ncfile):
187
        """Function that gets the indices of a lat/lon point in a .nc file.
Woude, Auke van der's avatar
Woude, Auke van der committed
188
189
190
        This can be used for e.g. the background concentrations
        Input: 
            ncfile: netCDF4.Dataset with latitude and longitude dimensions"""
191
        # Initialise some names that could indicate latitude. Expand at will
192
        lat_opts = ['lat', 'latitude', 'Lat', 'LAT', 'LATITUDE', 'Latitude']
193
        # Same for longitude
194
195
        lon_opts = ['lon', 'longigude', 'Lon', 'LON', 'LONGITUDE', 'Longitude']
        
196
        # Get the index via the minimum of the difference.
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
        for latopt in lat_opts:
            try:
                lat_ind = np.argmin(abs(ncfile[latopt][:] - self.lat))
            except:
                pass
        
        for lonopt in lon_opts:
            try:
                lon_ind = np.argmin(abs(ncfile[lonopt][:] - self.lon))
            except:
                pass
        
        return lat_ind, lon_ind
    
    def get_foot(self, site):
212
        """Function that gets the footprint for the current time and site.
Woude, Auke van der's avatar
Woude, Auke van der committed
213
214
215
        Returns a 3D np.array with dims (time, lat, lon)
        Input:
            site: str of 3 letter sitename. Is converted to uppercase. Example: 'hei'"""
216
217
218
219
220
221
222
223
224
225
226
        path = self.rc_options['footdir'] + '/' + site.upper() + '24'
        fname = path + '/footprint_{0}_{1}x{2:02d}x{3:02d}x{4:02d}*.nc'.format(site.upper(),
                                                              self.year, self.month, self.day, self.hour)
        f = glob.glob(fname)[0]
        # Flip, because the times are negative and lats are read in upside down by python
        # flip with axis 1 are the lats
        # flipud are the times.
        footprint = nc.Dataset(f)['foot']
        self.numtimes = len(footprint)
        return np.flip(np.flipud(footprint), axis=1)
    
227
    def get_center_of_mass(fp):
Woude, Auke van der's avatar
Woude, Auke van der committed
228
229
230
231
        """Function that finds the center of mass of the first footprint and the time corresponding to it.
        Input: 
            fp: 3d np.array of the footprint
        time is taken from the observationoperator self."""
232
        from scipy import ndimage
233
234
235
236
237
        i = -1
        total_influence = 0
        while total_influence < 0.0000001:
            i+=1
            total_influence = fp[i].sum()
238
        
239
240
241
242
        center_of_mass = ndimage.measurements.center_of_mass(fp[i])
        center_of_mass = np.array(np.rint(center_of_mass), dtype=int)
        # Note that we could also calculate the index in the file with this i and the 'get_time_indices' func. 
        return center_of_mass, i
243
244
    
    def get_background(self, tracer_info):
Woude, Auke van der's avatar
Woude, Auke van der committed
245
246
        """Function that finds the background concentration, based on the tracer info
        Input: tracer info: Dict with infromation on the background concentration"""
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
#         data = nc.Dataset(file, 'r+')
#         # Set the data in 'days since 2016-1-1 00:00:00', similar to the footprints.
#         # Note that this should, preferably, not be hardcoded.
#         data['time'].units = 'days since 2016-1-1 00:00:00'
#         # Get the indices. This is also where the data['time'] units come in play
#         indices = self.get_time_indices(data['time'])
#         
#         # Now comes the tricky part:
#             # the atm concentration is increased wrt what?
#             # So, which time indices, and lat/lon indices should we take?
#             # Or, should we take the average of the centerpoint-time_index concentrations?
#             # Or, could we, much easier, just take the background concentration at the station?
#             
#         # For now, just return 0
        logging.warning('No background concentration specified, using 0')
        return 0

    def get_atm_increase(self, site, tracer_info):
Woude, Auke van der's avatar
Woude, Auke van der committed
265
266
267
268
269
270
271
        """Function that calculates the atmospheric increase due to the exchange of fluxes over the footprint
        Input: 
            site: str of 3 chars with the location. Example: 'hei'
            tracer_info: dict with information on the tracer:
                path to fluxes,
                half-life,
                recalculation factor"""
272
        # First, get the footprint
273
        foot = self.get_foot(site)
274
        # Get the tracer fluxes
275
276
277
        file = tracer_info['path']
        data = nc.Dataset(file, 'r')
        fluxmap = data[tracer_info['fluxname']]
278
279
280
281
        # Get the time indices for the tracer fluxes
        # It is assumed that the files all start at the same time
        # And therefore this is not tracer-dependent
        indices = self.get_time_indices()
282
283
        fluxes = fluxmap[indices]
        
284
        # Multiply the flux with the footprint. Take the half-life into account
285
286
287
288
289
290
291
292
293
294
295
296
297
        half_life = eval(tracer_info['half_life'])
        if half_life:
            atm_increase = 0 # Initialise the atmospheric increase
            # We have to loop, to take into account the fraction of decayed tracer
            for i, (footprint, flux) in enumerate(zip(foot, fluxes)):
                decayed_loss = np.exp(-half_life * (i+0.5))
                atm_increase += (footprint*flux).sum() * decayed_loss
        else:
            atm_increase = (fluxes* foot).sum()
        
        return atm_increase
    
    def get_concentration(self, site,  tracer_info):
298
        """Function that calculates the simulated concentration for a given site and tracer  as
Woude, Auke van der's avatar
Woude, Auke van der committed
299
300
301
302
303
304
305
        [flux * foot] + background
        Input:
            site: str of 3 chars with the location. Example: 'hei'
            tracer_info: dict with information on the tracer:
                path to fluxes,
                half-life,
                recalculation factor"""
306
307
308
        return self.get_atm_increase(site, tracer_info) + self.get_background(tracer_info)

    def run_forecast_model(self,proc,out_q):
309
        """Function that runs the forecast model parallel
Woude, Auke van der's avatar
Woude, Auke van der committed
310
311
312
313
        First prepares run, then runs
        Input: 
            proc: int of the current process
            out_q: instance of multiprocessing.Queue()"""
314
        self.parse_samplefile()
weihe's avatar
weihe committed
315
316
317
318
319
        self.prepare_run(proc)
        self.run(proc)
        outdict = {}
        outdict[proc] = self.simulated_file
        out_q.put(outdict)
weihe's avatar
weihe committed
320
321


322
    def run(self,proc):
323
324
        """Function that calculates the concentration for each site and tracer.
        Calculates the concentration based on 'get_concentration' and then saves the data
Woude, Auke van der's avatar
Woude, Auke van der committed
325
326
327
328
329
330
331
332
333
        Input:
            proc: int of process"""
        for mem in range(self.dacycle['da.optimizer.nmembers']):
            concentrations_member_i = []
            for tracer, site, id in zip(self.tracer, self.site, self.obs_id):
                logging.info('making concentration for tracer {}, site {} and obs_id {}'.format(tracer, site, id))
                conc = self.get_concentration(site, self.tracer_info[tracer])
                concentrations_member_i.append(conc)
            concentrations.append(concentrations_member_i)
334
335
        self.concentrations = concentrations
        self.save_data()
weihe's avatar
weihe committed
336
337

    def save_data(self):
338
339
340
341
342
343
344
         """Function that saves the simulated concentrations and metadata to a nc file"""
         # First, create the file 
         f = io.CT_CDF(self.simulated_file, method='create')
         logging.debug('Creating new simulated observation file in ObservationOperator (%s)' % self.simulated_file)
         # Add a dimension
         dimid = f.add_dim('obs_num', len(self.concentrations))
         dim_site = f.add_dim('charstring', 10)
345
346
         
        # Add the observation numbers
347
348
349
350
351
352
353
354
355
356
357
         savedict = io.std_savedict.copy()
         savedict['name'] = "obs_num"
         savedict['dtype'] = "int"
         savedict['long_name'] = "Unique_Dataset_observation_index_number"
         savedict['units'] = ""
         savedict['dims'] = dimid
         savedict['values'] = self.obs_id
         savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
         f.add_data(savedict)

         # Add the simulated concentrations
Woude, Auke van der's avatar
Woude, Auke van der committed
358
         dimmember = f.createDimension('nmembers', size=self.dacycle['da.optimizer.nmembers'])
359
360
361
362
363
364
         savedict = io.std_savedict.copy()
         savedict['name'] = "simulated"
         savedict['dtype'] = "float"
         savedict['long_name'] = "Simulated_concentration"
         savedict['units'] = "mol mol-1"
         savedict['values'] = self.concentrations
Woude, Auke van der's avatar
Woude, Auke van der committed
365
         savedict['dims'] = dimid + dimmember
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
         savedict['comment'] = "Simulated value by STILTObservationOperator"
         f.add_data(savedict)

         # Add the site
         savedict = io.std_savedict.copy()
         savedict['name'] = "site"
         savedict['dtype'] = "char"
         savedict['long_name'] = "Site_of_simulation"
         savedict['units'] = ""
         savedict['values'] = self.site
         savedict['dims'] = dimid + dim_site
         savedict['comment'] = "The site for which the simulation is run"
         f.add_data(savedict)

         # Add the tracer
         savedict = io.std_savedict.copy()
         savedict['name'] = "tracer"
         savedict['dtype'] = "char"
         savedict['long_name'] = "tracer"
         savedict['units'] = ""
         savedict['values'] = self.tracer
         savedict['dims'] = dimid + dim_site
         savedict['comment'] = "Tracer"
         f.add_data(savedict)
weihe's avatar
weihe committed
390
391
392
393
394

if __name__ == "__main__":
    pass