#!/usr/bin/env python
# pipeline.py

"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. 
Users are recommended to contact the developers (wouter.peters@wur.nl) to receive
updates of the code. See also: http://www.carbontracker.eu. 

This program is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software Foundation, 
version 3. This program is distributed in the hope that it will be useful, but 
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 

You should have received a copy of the GNU General Public License along with this 
program. If not, see <http://www.gnu.org/licenses/>."""

"""
.. module:: pipeline
.. moduleauthor:: Wouter Peters

Revision History:
File created on 06 Sep 2010.

The pipeline module holds methods that execute consecutive tasks with each of the objects of the DA system.

"""
import logging
import os
import sys
import datetime
import copy

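# Banner strings used to make the start and end of each pipeline stage stand out in the log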
header = '\n\n    ***************************************   '
footer = '    *************************************** \n  '

def ensemble_smoother_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator, optimizer):
    """ The main point of entry for the ensemble smoother pipeline """
    sys.path.append(os.getcwd())

    logging.info(header + "Initializing current cycle" + footer)
    start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)

    prepare_state(dacycle, statevector)
    sample_state(dacycle, samples, statevector, obsoperator)

    invert(dacycle, statevector, optimizer)

    advance(dacycle, samples, statevector, obsoperator)

    save_and_submit(dacycle, statevector)
    logging.info("Cycle finished...exiting pipeline")

def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator):
    """ The main point of entry for the forward pipeline, which runs the transport model without optimization """
    sys.path.append(os.getcwd())

    logging.info(header + "Initializing current cycle" + footer)
    start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)

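    # The forward run is steered by three optional rc-file keys, all read below:
    # 'forward.savestate.dir' (directory of a previous experiment to read the
    # statevector from), 'forward.savestate.legacy' (boolean; that experiment wrote
    # savestate files in the older format), and 'forward.savestate.exceptsam'
    # (boolean; read the state through the read_from_file_exceptsam variant).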
    if 'forward.savestate.exceptsam' in dacycle:
        sam = (dacycle['forward.savestate.exceptsam'].upper() in ["TRUE", "T", "YES", "Y"])
    else:
        sam = False

    if 'forward.savestate.dir' in dacycle:
        fwddir = dacycle['forward.savestate.dir']
    else:
        logging.debug("No forward.savestate.dir key found in rc-file, proceeding with self-constructed prior parameters")
        fwddir = False

    if 'forward.savestate.legacy' in dacycle:
        legacy = (dacycle['forward.savestate.legacy'].upper() in ["TRUE", "T", "YES", "Y"])
    else:
        legacy = False
        logging.debug("No forward.savestate.legacy key found in rc-file")

    if not fwddir:
        # Simply make a prior statevector using the normal method
        prepare_state(dacycle, statevector)  # LU: rather just create the new covariance and ensemble here, since the remaining steps are handled above and below.

    else: 
        # Read prior information from another simulation into the statevector.
        # This loads the results from another assimilation experiment into the current statevector

        if sam:
            filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
            #filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')
            statevector.read_from_file_exceptsam(filename, 'prior')
        elif not legacy:
            filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
            statevector.read_from_file(filename, 'prior')
        else:
            filename = os.path.join(fwddir, dacycle['time.start'].strftime('%Y%m%d'), 'savestate.nc')
            statevector.read_from_legacy_file(filename, 'prior')


    # We write this "prior" statevector to the restart directory, so we can later also populate it with the posterior statevector.
    # Note that we could achieve the same by just copying the desired forward savestate.nc file to the restart folder of our current
    # experiment, but then it would already contain a posterior field as well, which we will try to write again in save_and_submit.
    # This could cause problems. Moreover, this method allows us to read older formatted savestate.nc files (legacy) and write them into
    # the current format through the "write_to_file" method.

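    # e.g. <dir.restart>/savestate_20100101.nc for a cycle starting on 1 Jan 2010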
    savefilename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
    statevector.write_to_file(savefilename, 'prior')

    # Now read optimized fluxes which we will actually use to propagate through the system

    if not fwddir:

        # if there is no forward dir specified, we simply run forward with unoptimized prior fluxes in the statevector
        logging.info("Running forward with prior savestate from: %s" % savefilename)

    else:

        # Read posterior information from another simulation into the statevector.
        # This loads the results from another assimilation experiment into the current statevector

        if sam:
            statevector.read_from_file_exceptsam(filename, 'opt')
        elif not legacy:
            statevector.read_from_file(filename, 'opt')
        else:
            statevector.read_from_legacy_file(filename, 'opt')

        logging.info("Running forward with optimized savestate from: %s" % filename)

    # Finally, we run forward with these parameters
    advance(dacycle, samples, statevector, obsoperator)

    # In save_and_submit, the posterior statevector will be added to the savestate.nc file, and it is added to the copy list.
    # This way, we have both the prior and posterior data from another run copied into this assimilation, for later analysis.
    save_and_submit(dacycle, statevector)

    logging.info("Cycle finished...exiting pipeline")
####################################################################################################

def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
    """ Main entry point for analysis of ctdas results """

    from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data, save_weekly_avg_agg_data
    from da.analysis.expand_molefractions import write_mole_fractions
    from da.analysis.summarize_obs import summarize_obs
    from da.analysis.time_avg_fluxes import time_avg

    logging.info(header + "Starting analysis" + footer) 

    dasystem.validate()                               
    dacycle.dasystem = dasystem                       
    dacycle.daplatform = platform                    
    dacycle.setup()                              
    statevector.setup(dacycle)   

    logging.info(header + "Starting mole fractions" + footer) 

    write_mole_fractions(dacycle)
    summarize_obs(dacycle['dir.analysis'])

    logging.info(header + "Starting weekly averages" + footer) 

    save_weekly_avg_1x1_data(dacycle, statevector)
    save_weekly_avg_state_data(dacycle, statevector)
    save_weekly_avg_tc_data(dacycle, statevector)
    save_weekly_avg_ext_tc_data(dacycle)
    save_weekly_avg_agg_data(dacycle, region_aggregate='transcom')
    save_weekly_avg_agg_data(dacycle, region_aggregate='transcom_extended')
    save_weekly_avg_agg_data(dacycle, region_aggregate='olson')
    save_weekly_avg_agg_data(dacycle, region_aggregate='olson_extended')
    save_weekly_avg_agg_data(dacycle, region_aggregate='country')

    logging.info(header + "Starting monthly and yearly averages" + footer) 

    time_avg(dacycle, 'flux1x1')
    time_avg(dacycle, 'transcom')
    time_avg(dacycle, 'transcom_extended')
    time_avg(dacycle, 'olson')
    time_avg(dacycle, 'olson_extended')
    time_avg(dacycle, 'country')

    logging.info(header + "Finished analysis" + footer) 

def archive_pipeline(dacycle, platform, dasystem):
    """ Main entry point for archiving of output from one disk/system to another """

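    # This task is driven by rc-file keys of the form read below; the key names
    # are the ones used here, the values are purely illustrative:
    #   task.rsync                        : alldata
    #   task.rsync.alldata.sourcedirs     : /scratch/ctdas/output
    #   task.rsync.alldata.destinationdir : user@archive:/backup/ctdas/
    #   task.rsync.alldata.flags          : -auv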
    if 'task.rsync' not in dacycle:
        logging.info('rsync task not found, not starting automatic backup...')
        return
    else:
        logging.info('rsync task found, starting automatic backup...')

    for task in dacycle['task.rsync'].split():
        sourcedirs = dacycle['task.rsync.%s.sourcedirs'%task]
        destdir = dacycle['task.rsync.%s.destinationdir'%task]
        rsyncflags = dacycle['task.rsync.%s.flags'%task]

        # file ID and names
        jobid = dacycle['time.end'].strftime('%Y%m%d') 
        targetdir = os.path.join(dacycle['dir.exec'])
        jobfile = os.path.join(targetdir, 'jb.rsync.%s.%s.jb' % (task,jobid) )
        logfile = os.path.join(targetdir, 'jb.rsync.%s.%s.log' % (task,jobid) )
        # Template and commands for job
        jobparams = {'jobname':"r.%s" % jobid, 'jobnodes': '1', 'jobtime': '1:00:00', 'joblog': logfile, 'errfile': logfile}

        if platform.ID == 'cartesius':
            jobparams['jobqueue'] = 'staging'

        template = platform.get_job_template(jobparams)
        for sourcedir in sourcedirs.split():
            execcommand = "\nrsync %s %s %s\n" % (rsyncflags, sourcedir, destdir)
            template += execcommand

        # write and submit 
        platform.write_job(jobfile, template, jobid)
        jobid = platform.submit_job(jobfile, joblog=logfile)


def start_job(dacycle, dasystem, platform, statevector, samples, obsoperator):
    """ Set up the job specific directory structure and create an expanded rc-file """

    dasystem.validate()
    dacycle.dasystem = dasystem
    dacycle.daplatform = platform
    dacycle.setup()
    #statevector.dacycle = dacycle # also embed object in statevector so it can access cycle information for I/O etc
    #samples.dacycle = dacycle # also embed object in samples object so it can access cycle information for I/O etc
    #obsoperator.dacycle = dacycle # also embed object in obsoperator object so it can access cycle information for I/O etc
    obsoperator.setup(dacycle)  # Setup Observation Operator
    statevector.setup(dacycle)

def prepare_state(dacycle, statevector):
    """ Set up the input data for the forward model: obs and parameters/fluxes"""

    # We now have an empty statevector object that we need to populate with data. If this is a continuation from a previous cycle, we can read
    # the previous statevector values from a NetCDF file in the restart directory. If this is the first cycle, we need to populate the statevector
    # with new values for each week. After we have constructed the statevector, it will be propagated by one cycle length so it is ready to be used
    # in the current cycle.

    logging.info(header + "starting prepare_state" + footer)

    if not dacycle['time.restart']:

        # Fill each lag, n = 0 .. nlag-1, with a new ensemble

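        # Each new ensemble is centered mid-way through its lag period: with e.g.
        # time.cycle = 7 (days), lag n is sampled at time.start + (n + 0.5) * 7 days.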
        for n in range(statevector.nlag):
            date = dacycle['time.start'] + datetime.timedelta(days=(n + 0.5) * int(dacycle['time.cycle']))
            cov = statevector.get_covariance(date, dacycle)
            statevector.make_new_ensemble(n, cov)

    else:

        # Read the statevector data from file

        #saved_sv = os.path.join(dacycle['dir.restart.current'], 'savestate.nc')
        saved_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['da.restart.tstamp'].strftime('%Y%m%d'))
        statevector.read_from_file(saved_sv) # by default will read "opt"(imized) variables, and then propagate

        # Now propagate the ensemble by one cycle to prepare for the current cycle
        statevector.propagate(dacycle)

    # Finally, also write the statevector to a file so that we can always access the a-priori information
    current_sv = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
    statevector.write_to_file(current_sv, 'prior')  # write prior info

def sample_state(dacycle, samples, statevector, obsoperator):
    """ Sample the filter state for the inversion """

    # Before a forecast step, save all the data to a save/tmp directory so we can later recover it before the propagation step.
    # This is especially important for:
    #  (i) The transport model restart data which holds the background mole fractions. This is needed to run the model one cycle forward
    #  (ii) The random numbers (or the seed for the random number generator) so we can recreate the ensembles if needed

    #status   = dacycle.MoveSaveData(io_option='store',save_option='partial',filter=[])
    #msg     = "All restart data have been copied to the save/tmp directory for future use"    ; logging.debug(msg)
    logging.info(header + "starting sample_state" + footer)
    nlag = int(dacycle['time.nlag'])
    logging.info("Sampling model will be run over %d cycles" % nlag)

    obsoperator.get_initial_data()

    for lag in range(nlag):
        logging.info(header + ".....Ensemble Kalman Filter at lag %d" % (lag + 1))

        ############# Perform the actual sampling loop #####################

        sample_step(dacycle, samples, statevector, obsoperator, lag)

        logging.debug("statevector now carries %d samples" % statevector.nobs)

def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
    """ Perform all actions needed to sample one cycle """

    # First set up the information for time start and time end of this sample
    dacycle.set_sample_times(lag)

    startdate = dacycle['time.sample.start']
    enddate = dacycle['time.sample.end']
    dacycle['time.sample.window'] = lag
    dacycle['time.sample.stamp'] = "%s_%s" % (startdate.strftime("%Y%m%d%H"), enddate.strftime("%Y%m%d%H"))
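    # e.g. time.sample.stamp = '2010010100_2010010800' for a weekly sample starting 1 Jan 2010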
    logging.info("New simulation interval set : ")
    logging.info("                  start date : %s " % startdate.strftime('%F %H:%M'))
    logging.info("                  end   date : %s " % enddate.strftime('%F %H:%M'))
    logging.info("                  file  stamp: %s " % dacycle['time.sample.stamp'])

    # Implement something that writes the ensemble member parameter info to file, or manipulates them further into the
    # type of info needed in your transport model
    statevector.write_members_to_file(lag, dacycle['dir.input']) 

    samples.setup(dacycle)
    samples.add_observations()

    # Add the model-data mismatch to all samples; this might use output from the ensemble in the future.

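    # The mismatch sets the observation error used to weight each sample in the
    # optimization; per-site settings come from the obs.sites.rc file passed below.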
    samples.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'])

    sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
    samples.write_sample_coords(sampling_coords_file)
    # Write filename to dacycle, and to output collection list
    dacycle['ObsOperator.inputfile'] = sampling_coords_file

    # Run the observation operator

    obsoperator.run_forecast_model()

    # Read forecast model samples that were written to NetCDF files by each member. Add them to the existing
    # Observation object for each sample loop. This data will be written to file in the output folder for each sample cycle.
    
    # We retrieve all model samples from one output file written by the ObsOperator. If the ObsOperator creates
    # one file per member, some logic needs to be included to merge all files!!!
    if os.path.exists(sampling_coords_file):
        samples.add_simulations(obsoperator.simulated_file)
        logging.debug("Simulated values read, continuing pipeline")
    else: 
        logging.warning("No simulations added, because input file does not exist (no samples found in obspack)")

    # Now write a small file that holds for each observation a number of values that we need in postprocessing

    #filename = samples.write_sample_coords()

    # Now add the observations that need to be assimilated to the statevector.
    # Note that obs will only be added to the statevector if either this is the first step (restart=False), or lag==nlag.
    # This is to make sure that the first step of the system uses all observations available, while the subsequent
    # steps only optimize against the data at the front (lag==nlag) of the filter. This way, each observation is used
    # exactly once in the assimilation.

    if not advance:
        if not dacycle['time.restart'] or lag == int(dacycle['time.nlag']) - 1:
            statevector.obs_to_assimilate += (copy.deepcopy(samples),)
            statevector.nobs += samples.getlength()
            logging.debug("Added samples from the observation operator to the assimilated obs list in the statevector")

        else:
            statevector.obs_to_assimilate += (None,)

def invert(dacycle, statevector, optimizer):
    """ Perform the inverse calculation """
    logging.info(header + "starting invert" + footer)

    dims = (int(dacycle['time.nlag']),
            int(dacycle['da.optimizer.nmembers']),
            int(dacycle.dasystem['nparameters']),
            statevector.nobs)
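    # dims = (nlag, nmembers, nparameters, nobs): the array dimensions the optimizer will allocate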

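    # Both algorithms solve the same minimum least squares problem; 'serial' treats
    # the observations one at a time while 'bulk' processes them all at once.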
    if 'opt.algorithm' not in dacycle.dasystem:
        logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)")
        logging.info("...using serial algorithm as default...")
        optimizer.set_algorithm('Serial')
    elif dacycle.dasystem['opt.algorithm'] == 'serial':
        logging.info("Using the serial minimum least squares algorithm to solve ENKF equations")
        optimizer.set_algorithm('Serial')
    elif dacycle.dasystem['opt.algorithm'] == 'bulk':
        logging.info("Using the bulk minimum least squares algorithm to solve ENKF equations")
        optimizer.set_algorithm('Bulk')

    optimizer.setup(dims)
    optimizer.state_to_matrix(statevector)

    diagnostics_file = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))

    optimizer.write_diagnostics(diagnostics_file, 'prior')
    optimizer.set_localization(dacycle['da.system.localization'])

    if optimizer.algorithm == 'Serial':
        optimizer.serial_minimum_least_squares()
    else:
        optimizer.bulk_minimum_least_squares()

    optimizer.matrix_to_state(statevector)
    optimizer.write_diagnostics(diagnostics_file, 'optimized')

def advance(dacycle, samples, statevector, obsoperator):
    """ Advance the filter state to the next step """

    # This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)

    # Then, restore model state from the start of the filter
    logging.info(header + "starting advance" + footer)
    logging.info("Sampling model will be run over 1 cycle")

    obsoperator.get_initial_data()

    sample_step(dacycle, samples, statevector, obsoperator, 0, True)

    dacycle.restart_filelist.extend(obsoperator.restart_filelist)
    dacycle.output_filelist.extend(obsoperator.output_filelist)
    logging.debug("Appended ObsOperator restart and output file lists to dacycle for collection ")
    
    dacycle.output_filelist.append(dacycle['ObsOperator.inputfile'])
    logging.debug("Appended Observation filename to dacycle for collection: %s"%(dacycle['ObsOperator.inputfile']))

    sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
    if os.path.exists(sampling_coords_file):
        outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
        samples.write_sample_auxiliary(outfile)
    else:
        logging.warning("Sample auxiliary output not written, because input file does not exist (no samples found in obspack)")

def save_and_submit(dacycle, statevector):
    """ Save the model state and submit the next job """
    logging.info(header + "starting save_and_submit" + footer)
    
    filename = os.path.join(dacycle['dir.restart'], 'savestate_%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
    statevector.write_to_file(filename, 'opt')
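    # dacycle.finalize() collects the output/restart file lists for archiving and,
    # per this function's purpose, takes care of submitting the next cycle's job.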
    dacycle.output_filelist.append(filename)
    dacycle.finalize()