time_avg_fluxes.py 8.79 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
"""CarbonTracker Data Assimilation Shell (CTDAS) Copyright (C) 2017 Wouter Peters. 
Users are recommended to contact the developers (wouter.peters@wur.nl) to receive
updates of the code. See also: http://www.carbontracker.eu. 

This program is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software Foundation, 
version 3. This program is distributed in the hope that it will be useful, but 
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 

You should have received a copy of the GNU General Public License along with this 
program. If not, see <http://www.gnu.org/licenses/>."""
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/usr/bin/env python
# time_avg_fluxes.py

"""
Author : peters 

Revision History:
File created on 20 Dec 2012.

"""
import sys
sys.path.append('../../')
import os
import sys
import shutil
28
from dateutil.relativedelta import relativedelta
29
import datetime
30
import subprocess
31
32
33
34

def time_avg(dacycle,avg='transcom'):
    """ Function to create a set of averaged files in a folder, needed to make longer term means """
    
35
    if avg not in ['transcom','transcom_extended','olson','olson_extended','country','flux1x1']:
36
        raise IOError('Choice of averaging invalid')
37
38
39
40

    analysisdir = dacycle['dir.analysis']

    if not os.path.exists(analysisdir):
41
        raise IOError('analysis dir requested (%s) does not exist, exiting...'%analysisdir)
42

43
    daily_avg(dacycle,avg)
44

45
    monthly_avg(dacycle,avg)
46

47
    yearly_avg(dacycle,avg)
48

49
    longterm_avg(dacycle,avg)
50

51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def new_month(dacycle):
    """ check whether we just entered a new month"""

    this_month = dacycle['time.start'].month
    prev_month = (dacycle['time.start']-dacycle['cyclelength']).month

    return (this_month != prev_month)

def new_year(dacycle):
    """ check whether we just entered a new year"""

    this_year = dacycle['time.start'].year
    prev_year = (dacycle['time.start']-dacycle['cyclelength']).year

    return (this_year != prev_year)

67
68
69
def daily_avg(dacycle,avg):
    """ Function to create a set of daily files in a folder, needed to make longer term means """
    
70
    if avg not in ['transcom','transcom_extended','olson','olson_extended','country','flux1x1']:
71
        raise IOError('Choice of averaging invalid')
72
73
74
75
76
77

    analysisdir = dacycle['dir.analysis']
    weekdir = os.path.join(analysisdir , 'data_%s_weekly'%avg)
    daydir = os.path.join(analysisdir , 'data_%s_daily'%avg)

    if not os.path.exists(daydir):
78
        print("Creating new output directory " + daydir)
79
80
81
        os.makedirs(daydir)

    files  = os.listdir(weekdir)
82
    files = [f for f in files if '-' in f and f.endswith('.nc')]
83
84
85
86
87
88
89
90

    fileinfo = {}
    for filename in files:
        date=datetime.datetime.strptime(filename.split('.')[-2],'%Y-%m-%d')
        fileinfo[filename] = date
    
    dt = dacycle['cyclelength']

91
    for k,v in fileinfo.items():
92
93
94
95
96
97
98
99
100
101
        cycle_file = os.path.join(weekdir,k)
        for i in range(abs(dt.days)):
            daily_file = os.path.join(daydir,'%s_fluxes.%s.nc'%(avg,(v+datetime.timedelta(days=i)).strftime('%Y-%m-%d')))
            if not os.path.lexists(daily_file):
                os.symlink(cycle_file,daily_file)
                #print daily_file,cycle_file

def monthly_avg(dacycle,avg):
    """ Function to average a set of files in a folder from daily to monthly means """
    
102
    if avg not in ['transcom','transcom_extended','olson','olson_extended','country','flux1x1']:
103
        raise IOError('Choice of averaging invalid')
104
105
106
107
108
109
110

    analysisdir = dacycle['dir.analysis']

    daydir = os.path.join(analysisdir , 'data_%s_daily'%avg)
    monthdir = os.path.join(analysisdir,'data_%s_monthly'%avg)

    if not os.path.exists(monthdir):
111
        print("Creating new output directory " + monthdir)
112
113
114
115
        os.makedirs(monthdir)


    files  = os.listdir(daydir)  # get daily files
116
    files = [f for f in files if '-' in f and f.endswith('.nc')]
117

118
    if len(files) < 28:
119
        print('No month is yet complete, skipping monthly average')
120
121
        return

122
123
124
125
126
    fileinfo = {}
    for filename in files:  # parse date from each of them
        date=datetime.datetime.strptime(filename.split('.')[-2],'%Y-%m-%d')
        fileinfo[filename] = date

127
128
    years = [d.year for d in list(fileinfo.values())]   # get actual years
    months = set([d.month for d in list(fileinfo.values())])  # get actual months
129
130
131
132
133
134
135
   
    sd = datetime.datetime(min(years),1,1)
    ed = datetime.datetime(max(years)+1,1,1)

    while sd < ed: 

        nd = sd + relativedelta(months=+1)
136
137

        ndays_in_month = (nd-sd).days
138
        
139
        avg_files = [os.path.join(daydir,k) for k,v in fileinfo.items() if v < nd and v >= sd]
140
       
141
142
143
144
        if len(avg_files) != ndays_in_month: # only once month complete 
            #print 'New month (%02d) is not yet complete, skipping monthly average'%(sd.month)
            pass
        else:
145
146
            targetfile = os.path.join(monthdir,'%s_fluxes.%s.nc'%(avg,sd.strftime('%Y-%m')))
            if not os.path.exists(targetfile):
147
                print("New month (%02d) is complete, I have %d days for the next file"%(sd.month,ndays_in_month))
148
149
150
151
152
153
154
155
156
157
                command = ['ncra','-O']+ avg_files + [targetfile]
                status = subprocess.check_call(command)
            else:
                pass

        sd = nd

def yearly_avg(dacycle,avg):
    """ Function to average a set of files in a folder from monthly to yearly means """

158
    if avg not in ['transcom','transcom_extended','olson','olson_extended','country','flux1x1']:
159
        raise IOError('Choice of averaging invalid')
160
161
162
163
164
165

    analysisdir = dacycle['dir.analysis']
    monthdir = os.path.join(analysisdir , 'data_%s_monthly'%avg )
    yeardir = os.path.join(analysisdir,'data_%s_yearly'%avg)

    if not os.path.exists(yeardir):
166
        print("Creating new output directory " + yeardir)
167
168
169
        os.makedirs(yeardir)

    files  = os.listdir(monthdir)  # get monthly files
170
    files = [f for f in files if '-' in f and f.endswith('.nc')]
171

172
    if not files:
173
        print("No full year finished yet, skipping yearly average...")
174
175
        return

176
177
178
179
180
    fileinfo = {}
    for filename in files:
        date=datetime.datetime.strptime(filename.split('.')[-2],'%Y-%m')
        fileinfo[filename] = date

181
    years = set([d.year for d in list(fileinfo.values())])
182

183
184
185
186
187
188
189
    sd = datetime.datetime(min(years),1,1)
    ed = datetime.datetime(max(years)+1,1,1)

    while sd < ed: 

        nd = sd + relativedelta(years=+1)
        
190
        avg_files = [os.path.join(monthdir,k) for k,v in fileinfo.items() if v < nd and v >= sd]
191
       
192
        if not len(avg_files) == 12 : 
193
            print("Year %04d not finished yet, skipping yearly average..."%sd.year)
194
        else:
195
196
197
            targetfile = os.path.join(yeardir,'%s_fluxes.%s.nc'%(avg,sd.strftime('%Y')))
        
            if not os.path.exists(targetfile):
198
                print("Year %04d is complete, I have 12 months for the next file"%sd.year)
199
200
201
202
203
204
205
206
                command = ['ncra','-O']+ avg_files + [targetfile]
                status = subprocess.check_call(command)

        sd = nd

def longterm_avg(dacycle,avg):
    """ Function to average a set of files in a folder from monthly to yearly means """

207
    if avg not in ['transcom','transcom_extended','olson','olson_extended','country','flux1x1']:
208
        raise IOError('Choice of averaging invalid')
209
210
211
212
213
214
215

    analysisdir = dacycle['dir.analysis']

    yeardir = os.path.join(analysisdir , 'data_%s_yearly'%avg )
    longtermdir = os.path.join(analysisdir,'data_%s_longterm'%avg)

    if not os.path.exists(longtermdir):
216
        print("Creating new output directory " + longtermdir)
217
218
219
        os.makedirs(longtermdir)

    files  = os.listdir(yeardir)
220
    files = [f for f in files if '-' in f and f.endswith('.nc')]
221

222
    if not files:
223
        print("No full year finished yet, skipping longterm average...")
224
225
        return

226
227
228
229
230
231
232
233
234
235
236
    dates = []
    for filename in files:
        date=datetime.datetime.strptime(filename.split('.')[-2],'%Y')
        dates.append( date )

    avg_files = [os.path.join(yeardir,k) for k in files]
   
    if len(avg_files) > 0 : 
        command = ['ncra','-O']+ avg_files + [os.path.join(longtermdir,'%s_fluxes.%04d-%04d.nc'%(avg,min(dates).year, max(dates).year))]
        status = subprocess.check_call(command)

237
238
239
240
241
242
if __name__ == "__main__":

    from da.tools.initexit import CycleControl

    sys.path.append('../../')

243
    dacycle = CycleControl(args={'rc':'../../ctdas-ei-nobcb-zoom-ecoregions.rc'})
244
245
246
    dacycle.setup()
    dacycle.parse_times()

247
248
249
250
251
252
253
254
    while dacycle['time.end'] < dacycle['time.finish']:
        time_avg(dacycle,avg='flux1x1')
        time_avg(dacycle,avg='transcom')
        time_avg(dacycle,avg='transcom_extended')
        time_avg(dacycle,avg='olson')
        time_avg(dacycle,avg='olson_extended')
        time_avg(dacycle,avg='country')
        dacycle.advance_cycle_times()
255