time_avg_fluxes.py 8.77 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. 
Users are recommended to contact the developers (wouter.peters@wur.nl) to receive
updates of the code. See also: http://www.carbontracker.eu. 

This program is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software Foundation, 
version 3. This program is distributed in the hope that it will be useful, but 
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 

You should have received a copy of the GNU General Public License along with this 
program. If not, see <http://www.gnu.org/licenses/>."""
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/usr/bin/env python
# time_avg_fluxes.py

"""
Author : peters 

Revision History:
File created on 20 Dec 2012.

"""
import sys
sys.path.append('../../')
import os
import sys
import shutil
28
from dateutil.relativedelta import relativedelta
29
import datetime
30
import subprocess
31
32
33
34

def time_avg(dacycle,avg='transcom'):
    """ Function to create a set of averaged files in a folder, needed to make longer term means """
    
35
    if avg not in ['transcom','transcom_extended','olson','olson_extended','country','flux1x1']:
36
37
38
39
40
41
42
        raise IOError,'Choice of averaging invalid'

    analysisdir = dacycle['dir.analysis']

    if not os.path.exists(analysisdir):
        raise IOError,'analysis dir requested (%s) does not exist, exiting...'%analysisdir

43
    daily_avg(dacycle,avg)
44

45
    monthly_avg(dacycle,avg)
46

47
    yearly_avg(dacycle,avg)
48

49
    longterm_avg(dacycle,avg)
50

51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def new_month(dacycle):
    """ check whether we just entered a new month"""

    this_month = dacycle['time.start'].month
    prev_month = (dacycle['time.start']-dacycle['cyclelength']).month

    return (this_month != prev_month)

def new_year(dacycle):
    """ check whether we just entered a new year"""

    this_year = dacycle['time.start'].year
    prev_year = (dacycle['time.start']-dacycle['cyclelength']).year

    return (this_year != prev_year)

67
68
69
def daily_avg(dacycle,avg):
    """ Function to create a set of daily files in a folder, needed to make longer term means """
    
70
    if avg not in ['transcom','transcom_extended','olson','olson_extended','country','flux1x1']:
71
72
73
74
75
76
77
78
79
80
81
        raise IOError,'Choice of averaging invalid'

    analysisdir = dacycle['dir.analysis']
    weekdir = os.path.join(analysisdir , 'data_%s_weekly'%avg)
    daydir = os.path.join(analysisdir , 'data_%s_daily'%avg)

    if not os.path.exists(daydir):
        print "Creating new output directory " + daydir
        os.makedirs(daydir)

    files  = os.listdir(weekdir)
82
    files = [f for f in files if '-' in f and f.endswith('.nc')]
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

    fileinfo = {}
    for filename in files:
        date=datetime.datetime.strptime(filename.split('.')[-2],'%Y-%m-%d')
        fileinfo[filename] = date
    
    dt = dacycle['cyclelength']

    for k,v in fileinfo.iteritems():
        cycle_file = os.path.join(weekdir,k)
        for i in range(abs(dt.days)):
            daily_file = os.path.join(daydir,'%s_fluxes.%s.nc'%(avg,(v+datetime.timedelta(days=i)).strftime('%Y-%m-%d')))
            if not os.path.lexists(daily_file):
                os.symlink(cycle_file,daily_file)
                #print daily_file,cycle_file

def monthly_avg(dacycle,avg):
    """ Function to average a set of files in a folder from daily to monthly means """
    
102
    if avg not in ['transcom','transcom_extended','olson','olson_extended','country','flux1x1']:
103
104
105
106
107
108
109
110
111
112
113
114
115
        raise IOError,'Choice of averaging invalid'

    analysisdir = dacycle['dir.analysis']

    daydir = os.path.join(analysisdir , 'data_%s_daily'%avg)
    monthdir = os.path.join(analysisdir,'data_%s_monthly'%avg)

    if not os.path.exists(monthdir):
        print "Creating new output directory " + monthdir
        os.makedirs(monthdir)


    files  = os.listdir(daydir)  # get daily files
116
    files = [f for f in files if '-' in f and f.endswith('.nc')]
117

118
119
120
121
    if len(files) < 28:
        print 'No month is yet complete, skipping monthly average'
        return

122
123
124
125
126
127
128
129
130
131
132
133
134
135
    fileinfo = {}
    for filename in files:  # parse date from each of them
        date=datetime.datetime.strptime(filename.split('.')[-2],'%Y-%m-%d')
        fileinfo[filename] = date

    years = [d.year for d in fileinfo.values()]   # get actual years
    months = set([d.month for d in fileinfo.values()])  # get actual months
   
    sd = datetime.datetime(min(years),1,1)
    ed = datetime.datetime(max(years)+1,1,1)

    while sd < ed: 

        nd = sd + relativedelta(months=+1)
136
137

        ndays_in_month = (nd-sd).days
138
139
140
        
        avg_files = [os.path.join(daydir,k) for k,v in fileinfo.iteritems() if v < nd and v >= sd]
       
141
142
143
144
        if len(avg_files) != ndays_in_month: # only once month complete 
            #print 'New month (%02d) is not yet complete, skipping monthly average'%(sd.month)
            pass
        else:
145
146
            targetfile = os.path.join(monthdir,'%s_fluxes.%s.nc'%(avg,sd.strftime('%Y-%m')))
            if not os.path.exists(targetfile):
147
                print "New month (%02d) is complete, I have %d days for the next file"%(sd.month,ndays_in_month)
148
149
150
151
152
153
154
155
156
157
                command = ['ncra','-O']+ avg_files + [targetfile]
                status = subprocess.check_call(command)
            else:
                pass

        sd = nd

def yearly_avg(dacycle,avg):
    """ Function to average a set of files in a folder from monthly to yearly means """

158
    if avg not in ['transcom','transcom_extended','olson','olson_extended','country','flux1x1']:
159
160
161
162
163
164
165
166
167
168
169
        raise IOError,'Choice of averaging invalid'

    analysisdir = dacycle['dir.analysis']
    monthdir = os.path.join(analysisdir , 'data_%s_monthly'%avg )
    yeardir = os.path.join(analysisdir,'data_%s_yearly'%avg)

    if not os.path.exists(yeardir):
        print "Creating new output directory " + yeardir
        os.makedirs(yeardir)

    files  = os.listdir(monthdir)  # get monthly files
170
    files = [f for f in files if '-' in f and f.endswith('.nc')]
171

172
173
174
175
    if not files:
        print "No full year finished yet, skipping yearly average..."
        return

176
177
178
179
180
181
    fileinfo = {}
    for filename in files:
        date=datetime.datetime.strptime(filename.split('.')[-2],'%Y-%m')
        fileinfo[filename] = date

    years = set([d.year for d in fileinfo.values()])
182

183
184
185
186
187
188
189
190
191
    sd = datetime.datetime(min(years),1,1)
    ed = datetime.datetime(max(years)+1,1,1)

    while sd < ed: 

        nd = sd + relativedelta(years=+1)
        
        avg_files = [os.path.join(monthdir,k) for k,v in fileinfo.iteritems() if v < nd and v >= sd]
       
192
193
194
        if not len(avg_files) == 12 : 
            print "Year %04d not finished yet, skipping yearly average..."%sd.year
        else:
195
196
197
            targetfile = os.path.join(yeardir,'%s_fluxes.%s.nc'%(avg,sd.strftime('%Y')))
        
            if not os.path.exists(targetfile):
198
                print "Year %04d is complete, I have 12 months for the next file"%sd.year
199
200
201
202
203
204
205
206
                command = ['ncra','-O']+ avg_files + [targetfile]
                status = subprocess.check_call(command)

        sd = nd

def longterm_avg(dacycle,avg):
    """ Function to average a set of files in a folder from monthly to yearly means """

207
    if avg not in ['transcom','transcom_extended','olson','olson_extended','country','flux1x1']:
208
209
210
211
212
213
214
215
216
217
218
219
        raise IOError,'Choice of averaging invalid'

    analysisdir = dacycle['dir.analysis']

    yeardir = os.path.join(analysisdir , 'data_%s_yearly'%avg )
    longtermdir = os.path.join(analysisdir,'data_%s_longterm'%avg)

    if not os.path.exists(longtermdir):
        print "Creating new output directory " + longtermdir
        os.makedirs(longtermdir)

    files  = os.listdir(yeardir)
220
    files = [f for f in files if '-' in f and f.endswith('.nc')]
221

222
223
224
225
    if not files:
        print "No full year finished yet, skipping longterm average..."
        return

226
227
228
229
230
231
232
233
234
235
236
    dates = []
    for filename in files:
        date=datetime.datetime.strptime(filename.split('.')[-2],'%Y')
        dates.append( date )

    avg_files = [os.path.join(yeardir,k) for k in files]
   
    if len(avg_files) > 0 : 
        command = ['ncra','-O']+ avg_files + [os.path.join(longtermdir,'%s_fluxes.%04d-%04d.nc'%(avg,min(dates).year, max(dates).year))]
        status = subprocess.check_call(command)

237
238
239
240
241
242
if __name__ == "__main__":

    from da.tools.initexit import CycleControl

    sys.path.append('../../')

243
    dacycle = CycleControl(args={'rc':'../../ctdas-ei-nobcb-zoom-ecoregions.rc'})
244
245
246
    dacycle.setup()
    dacycle.parse_times()

247
248
249
250
251
252
253
254
    while dacycle['time.end'] < dacycle['time.finish']:
        time_avg(dacycle,avg='flux1x1')
        time_avg(dacycle,avg='transcom')
        time_avg(dacycle,avg='transcom_extended')
        time_avg(dacycle,avg='olson')
        time_avg(dacycle,avg='olson_extended')
        time_avg(dacycle,avg='country')
        dacycle.advance_cycle_times()
255