From 429acb4797861354f68d153568a56e1e78715ec5 Mon Sep 17 00:00:00 2001
From: "Woude, Auke van der" <auke.vanderwoude@wur.nl>
Date: Wed, 3 Nov 2021 15:05:03 +0100
Subject: [PATCH] Small bugfixes and improved logging

---
 da/pipelines/nrtpipeline.py     |  9 +++++++
 da/preprocessing/classes.py     | 42 +++++++++++++++++++++++++------
 da/preprocessing/era5.py        |  5 ++--
 da/preprocessing/inputs.py      | 44 +++++++++++++++++++--------------
 da/preprocessing/merra.py       |  3 ++-
 da/preprocessing/poormans.py    | 15 +++-------
 da/preprocessing/upscale_bio.py |  4 +--
 7 files changed, 79 insertions(+), 43 deletions(-)

diff --git a/da/pipelines/nrtpipeline.py b/da/pipelines/nrtpipeline.py
index 5f9e637b..05fe6fc7 100755
--- a/da/pipelines/nrtpipeline.py
+++ b/da/pipelines/nrtpipeline.py
@@ -56,6 +56,9 @@ def prepare_global_inputs(dacycle, dasystem, platform):
     """Prepare inputs on a global scale, enhanced with inputs on a regional scale"""
     base_pipeline(dacycle, dasystem, platform, function=global_regional_inputs)
 
+def statistical_fit_pipeline(dacycle, dasystem, platform):
+    base_pipeline(dacycle, dasystem, platform, function=statistical_fit_inputs)
+
 def regional_inputs(dacycle):
     """ Download and process input data sets with no additional information"""
     import da.preprocessing.inputs as inputs
@@ -66,8 +69,14 @@ def poormans_inputs(dacycle):
     import da.preprocessing.inputs as inputs
     inputs.poormans_inputs(dacycle)
 
+def statistical_fit_inputs(dacycle):
+    """ Only do the statistical fit (speedup)"""
+    import da.preprocessing.inputs as inputs
+    inputs.statistical_fit_fluxes(dacycle)
+
 def global_regional_inputs(dacycle):
     """ Download and process input data sets using data from global, enhanced with regional scale data"""
     import da.preprocessing.inputs as inputs
     inputs.regional_inputs(dacycle)
     inputs.global_inputs(dacycle, combine_with_regional=True)
+
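The two new entry points reuse the dispatch pattern already present in this file: `base_pipeline` performs the generic cycle setup and then calls whatever preprocessing function it is handed. A minimal sketch of that pattern, using hypothetical stand-in names (`base_pipeline_sketch`, `statistical_fit_inputs_sketch`); the real `base_pipeline` also initialises the platform and the dasystem:

```python
# Hypothetical stand-ins illustrating the function-dispatch pattern used by
# base_pipeline; the real implementation also sets up the platform and the
# dasystem before invoking the hook.
def base_pipeline_sketch(dacycle, function):
    # ... generic cycle setup would happen here ...
    function(dacycle)

def statistical_fit_inputs_sketch(dacycle):
    print('Statistical fit only, for cycle starting', dacycle['time.start'])

base_pipeline_sketch({'time.start': '2021-11-01'}, statistical_fit_inputs_sketch)
```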
diff --git a/da/preprocessing/classes.py b/da/preprocessing/classes.py
index 8b54145d..f9e957d4 100644
--- a/da/preprocessing/classes.py
+++ b/da/preprocessing/classes.py
@@ -31,7 +31,7 @@ class Regional(object):
         self.inputdir = dacycle['dir.input']
         self.tmpdir = dacycle.dasystem['dir.tmp']
 
-        self.recalc_factor = 1e-6 # For recalculating umol/m2/s to TM5 units: mol/m2/s
+        self.recalc_factor = 1. # Fluxes are kept in umol/m2/s; no conversion to TM5 units (mol/m2/s)
 
         self.name = 'Here goes the flux name' # Overwrite in the subclasses
 
@@ -48,17 +48,35 @@ class Regional(object):
         return filenames
 
     def average2d(self, arr, new_shape, newlats=None, newlons=None):
+        """Average area-weighted values"""
+        if newlats is not None:
+            lat_start = newlats[0]
+            lat_end = newlats[-1]
+        else:
+            lat_start = self.lat_start
+            lat_end = self.lat_end
+            newlats = self.lats
+        if newlons is not None:
+            lon_start = newlons[0]
+            lon_end = newlons[-1]
+        else:
+            lon_start = self.lon_start
+            lon_end = self.lon_end
+            newlons = self.lons
+
+        logging.debug('Averaging from old shape [{}] to new shape [{}]'.format(arr.shape, new_shape))
+
         # Calculate the lats and lons of the input shape
-        lats_inputshape = np.linspace(self.lat_start, self.lat_end, arr.shape[-2])
-        lons_inputshape = np.linspace(self.lon_start, self.lon_end, arr.shape[-1])
+        lats_inputshape = np.linspace(lat_start, lat_end, arr.shape[-2])
+        lons_inputshape = np.linspace(lon_start, lon_end, arr.shape[-1])
 
         # Get the shape of the axis of which to average to
         shape = (new_shape[0], arr.shape[-2] // new_shape[-2], new_shape[1], arr.shape[-1] // new_shape[-1])
         # Get the area of the input lats and lons.
-        if newlats is None: newlats = self.lats
-        if newlons is None: newlons = self.lons
         new_area = self.calc_area(newlats, newlons)
         area = self.calc_area(lats_inputshape, lons_inputshape)
+        logging.debug('new area sum / area sum: {}'.format(new_area.sum() / area.sum()))
+        logging.debug('In lats: {} {} ; Out lats: {} {}'.format(lats_inputshape[0], lats_inputshape[-1], newlats[0], newlats[-1]))
 
         # If time is included: add the time dimension to the shape
         if len(arr.shape) == 3:
@@ -122,7 +140,7 @@ class Regional(object):
         savedict = io.std_savedict.copy()
         savedict['name'] = self.name
         savedict['long_name'] = '{} CO2 emissions'.format(self.name)
-        savedict['units'] = "mole/m2/s"
+        savedict['units'] = "umole/m2/s"
         dims = dimtime + dimlat + dimlon
         savedict['dims'] = dims
         savedict['values'] = values * self.recalc_factor
@@ -151,6 +169,10 @@ class Global(Regional):
         self.regional_lons = self.lons
         self.lats = np.linspace(-89.5, 89.5, 180)
         self.lons = np.linspace(-179.5, 179.5, 360)
+        self.lat_start = self.lats[0]
+        self.lon_start = self.lons[0]
+        self.lat_end = self.lats[-1]
+        self.lon_end = self.lons[-1]
         self.startdate = dtm.datetime(self.startdate.year, self.startdate.month, 1)
         enddate = self.startdate + dtm.timedelta(days=45) # Be sure to go to the next month
         self.enddate = dtm.datetime(enddate.year, enddate.month, 1) - dtm.timedelta(hours=1)
@@ -177,9 +199,13 @@ class Global(Regional):
             return
 
         logging.info('Combining high-res with global data...')
+        lat_start = float(self.dacycle.dasystem['domain.lat.start'])
+        lat_end = float(self.dacycle.dasystem['domain.lat.end'])
+        lon_start = float(self.dacycle.dasystem['domain.lon.start'])
+        lon_end = float(self.dacycle.dasystem['domain.lon.end'])
 
-        lat_indices = np.argmin(abs(self.lats - self.lat_end)) + 1, np.argmin(abs(self.lats - self.lat_start))
-        lon_indices = np.argmin(abs(self.lons - self.lon_start)), np.argmin(abs(self.lons - self.lon_end)) + 1
+        lat_indices = np.argmin(abs(self.lats - lat_end)) + 1, np.argmin(abs(self.lats - lat_start))
+        lon_indices = np.argmin(abs(self.lons - lon_start)), np.argmin(abs(self.lons - lon_end)) + 1
         new_shape = (lat_indices[0] - lat_indices[1], lon_indices[1] - lon_indices[0])
         # Include the indices at the end
         new_lats = self.lats[lat_indices[1]: lat_indices[0]]
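The enlarged `average2d` above still centres on the same reshape trick: split both grid axes into (coarse cell, fine cells within it) and take an area-weighted mean over the fine axes. A self-contained sketch of that technique, with a hypothetical `block_average` helper and uniform cell areas for brevity (the real code derives the areas from the latitude/longitude grids via `calc_area`):

```python
import numpy as np

def block_average(arr, new_shape, areas):
    """Area-weighted block average of a 2-D field onto a coarser grid."""
    fy = arr.shape[0] // new_shape[0]  # coarsening factor along latitude
    fx = arr.shape[1] // new_shape[1]  # coarsening factor along longitude
    # Split each axis into (coarse index, position within the block) ...
    blocks = (new_shape[0], fy, new_shape[1], fx)
    a = arr.reshape(blocks)
    w = areas.reshape(blocks)
    # ... and take the weighted mean over the within-block axes (1 and 3).
    return (a * w).sum(axis=(1, 3)) / w.sum(axis=(1, 3))

# Example: coarsen a 180x360 field to 18x36 (a factor 10 in both directions).
field = np.random.rand(180, 360)
areas = np.ones((180, 360))  # uniform areas for the sketch; real ones vary with latitude
coarse = block_average(field, (18, 36), areas)
assert coarse.shape == (18, 36)
```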
diff --git a/da/preprocessing/era5.py b/da/preprocessing/era5.py
index e5750f02..5a073b3e 100644
--- a/da/preprocessing/era5.py
+++ b/da/preprocessing/era5.py
@@ -12,6 +12,7 @@ from da.tools.temploglevel import TempLogLevel
 from cdo import Cdo
 
 RANAME = 'era5'
+logging.info('Using ERA data')
 
 #@print_decorator
 def start_download_era(dacycle, starttime=None, endtime=None):
@@ -134,7 +135,7 @@ def check_era_present(dacycle):
     # SiB runs start 'NRUNYEARS_SPINUP' years beforehand, on the first of the month
     sib_starttime = dtm.datetime(starttime.year - NRUNYEARS_SPINUP, 1, 1)
     # SiB ends on the final day of the last month in the dasystem
-    sib_endtime = dtm.datetime(endtime.year, endtime.month + 1, 1) - dtm.timedelta(days=1)
+    sib_endtime = dtm.datetime(endtime.year + endtime.month // 12, endtime.month % 12 + 1, 1) - dtm.timedelta(days=1)
 
     # Check which files are already present and create a symlink
     dst = dacycle['dir.input']
@@ -273,8 +274,6 @@ def process_era_emismodel(dacycle):
     outfile = cdo.select('name=time,ssrd,t2m,v10,u10,sst', input=outf, output=outfile) #'{}merged.ERA.selected.nc'.format(tmpdir))
 
     # Interpolate bilinear to the domain size.
-#    logging.debug('Remapping the ERA data to new shape following gridfile: {}'.format(dacycle.dasystem['domain.grid.file']))
-#    cdo.remapbil(dacycle.dasystem['domain.grid.file'], input=infile, output=outfile)
     logging.debug('Written processed, emission model-ready ERA data to {}'.format(outfile))
     return 0
 
diff --git a/da/preprocessing/inputs.py b/da/preprocessing/inputs.py
index d0744b8d..9d6265f3 100644
--- a/da/preprocessing/inputs.py
+++ b/da/preprocessing/inputs.py
@@ -24,24 +24,17 @@ from da.tools.post_slack import print_decorator, slack_decorator
 import logging
 #logging.root.setLevel(logging.DEBUG)
 
-def test_inputs(dacycle):
-    """Testing function"""
-    from da.preprocessing.merra import start_download_merra, process_merra_emismodel
-    start_download_merra(dacycle)
-    process_merra_emismodel(dacycle)
-    return
-
-
 def regional_inputs(dacycle):
     """Make fluxes for the European domain"""
     #get_fire_fluxes(dacycle)
-    #make_bio_fluxes(dacycle)
-    make_ff_fluxes(dacycle)
-    upscale_ocean(dacycle)
+    make_bio_fluxes(dacycle)
+    #make_ff_fluxes(dacycle)
+    #upscale_ocean(dacycle)
 
 def global_inputs(dacycle, combine_with_regional=False):
     """Make fluxes for the globe"""
-    if not dacycle['time.start'].day == 1:
+    if dacycle['time.start'].day != 1 and not combine_with_regional:
+        logging.debug('Nothing to do for this day')
         return
     get_global_fire(dacycle, combine_with_regional)
     get_global_bio(dacycle, combine_with_regional)
@@ -68,17 +61,30 @@ def poormans_inputs(dacycle):
         dacycle['time.start'] = month
         dacycle['time.end'] = month + relativedelta(months=1) # - dtm.timedelta(days=1)
         dacycle['dir.output'] = dacycle['dir.output'][:-8] + dtm.datetime.strftime(month, '%Y%m%d')
-        #get_global_fire(dacycle)
-        #get_global_ff(dacycle)
-        #get_global_ocean(dacycle)
+        get_global_fire(dacycle)
+        get_global_ff(dacycle)
+        get_global_ocean(dacycle)
         if month in months_fluxes:
             fluxmaker = PoorMans(dacycle)
             fluxmaker.get_bio()
 
+def statistical_fit_fluxes(dacycle):
+    from dateutil.relativedelta import relativedelta
+    months = list(rrule.rrule(rrule.MONTHLY,
+                              dtstart=dacycle['time.start'],
+                              until=dacycle['time.finish']))
+    # First, prepare the fluxes to calculate the growth rate from
+    for month in months:
+        logging.info('Getting global fluxes for {}'.format(month))
+        dacycle['time.start'] = month
+        dacycle['time.end'] = month + relativedelta(months=1) # - dtm.timedelta(days=1)
+        dacycle['dir.output'] = dacycle['dir.output'][:-8] + dtm.datetime.strftime(month, '%Y%m%d')
+        global_inputs(dacycle)
+
 def get_global_bio(dacycle, combine_with_regional=False):
     """Get the biosphere fluxes outside the smaller domain"""
-    from da.preprocessing.upscale_bio import GlobalBio
-    fluxmaker = GlobalBio(dacycle)
+    from da.preprocessing.upscale_bio import StatisticalFluxes
+    fluxmaker = StatisticalFluxes(dacycle)
     fluxmaker.get_fluxes()
     fluxmaker.write_total_emissions()
     if combine_with_regional:
@@ -113,6 +119,7 @@ def make_ff_fluxes(dacycle):
     Writes the emissions to outputdir
     Needs the MERRA data for this timestep to be downloaded already"""
     from da.preprocessing.emissionmodel import EmisModel
+    #from da.preprocessing.emissionmodel import EmisModelERA as EmisModel
     emismodel = EmisModel(dacycle)
     emismodel.setup()
     outfile, varname = emismodel.get_emis()
@@ -127,7 +134,8 @@ def make_bio_fluxes(dacycle):
     outfile, variablename = upscaler.run()
 
 def upscale_ocean(dacycle):
-    from da.preprocessing.ocean import Ocean
+#    from da.preprocessing.ocean import Ocean
+    from da.preprocessing.ocean import OceanERA as Ocean
     upscaler = Ocean(dacycle)
     outfile, variablename = upscaler.run()
 
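Both `poormans_inputs` and the new `statistical_fit_fluxes` walk through the cycle month by month with `rrule`, rewriting `time.start`, `time.end` and `dir.output` on each pass. The enumeration itself behaves like this (standalone example with made-up dates):

```python
import datetime as dtm
from dateutil import rrule
from dateutil.relativedelta import relativedelta

# rrule.MONTHLY yields every month start from dtstart up to and including
# `until` when it falls on an occurrence, so each iteration sees one
# (month start, next month start) window.
start, finish = dtm.datetime(2021, 1, 1), dtm.datetime(2021, 3, 1)
for month in rrule.rrule(rrule.MONTHLY, dtstart=start, until=finish):
    window_end = month + relativedelta(months=1)
    print(month.date(), '->', window_end.date())
# 2021-01-01 -> 2021-02-01
# 2021-02-01 -> 2021-03-01
# 2021-03-01 -> 2021-04-01
```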
diff --git a/da/preprocessing/merra.py b/da/preprocessing/merra.py
index 8139eb16..329fca28 100644
--- a/da/preprocessing/merra.py
+++ b/da/preprocessing/merra.py
@@ -10,6 +10,7 @@ from da.tools.tempworkingdir import TempWorkingDirectory
 from cdo import Cdo
 
 RANAME = 'merra2'
+logging.info('Using MERRA data')
 
 #@print_decorator
 def start_download_merra(dacycle, starttime=None, endtime=None):
@@ -113,7 +114,7 @@ def check_merra_present(dacycle):
     # SiB runs start 'NRUNYEARS_SPINUP' years beforehand, on the first of the month
     sib_starttime = dtm.datetime(starttime.year - NRUNYEARS_SPINUP, starttime.month, 1)
     # SiB ends on the final day of the last month in the dasystem
-    sib_endtime = dtm.datetime(endtime.year, endtime.month + 1, 1) - dtm.timedelta(days=1)
+    sib_endtime = dtm.datetime(endtime.year + endtime.month // 12, endtime.month % 12 + 1, 1) - dtm.timedelta(days=1)
 
     # Check which files are already present and create a symlink
     dst = dacycle['dir.input']
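The reworked `sib_endtime` in both `check_era_present` and `check_merra_present` handles the December rollover that the old `endtime.month + 1` could not (month 13 raises a `ValueError`). The integer arithmetic can be checked in isolation (hypothetical `last_day_of_month` helper):

```python
import datetime as dtm

def last_day_of_month(t):
    # t.month // 12 bumps the year only for December (12 // 12 == 1);
    # t.month % 12 + 1 maps 1..11 to 2..12 and December to January.
    return dtm.datetime(t.year + t.month // 12, t.month % 12 + 1, 1) - dtm.timedelta(days=1)

print(last_day_of_month(dtm.datetime(2021, 11, 15)))  # 2021-11-30 00:00:00
print(last_day_of_month(dtm.datetime(2021, 12, 15)))  # 2021-12-31 00:00:00
```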
diff --git a/da/preprocessing/poormans.py b/da/preprocessing/poormans.py
index b9d43f05..f7f6e42f 100644
--- a/da/preprocessing/poormans.py
+++ b/da/preprocessing/poormans.py
@@ -84,17 +84,14 @@ class PoorMans(SiBUpscaler, Global):
         fname = self.dacycle.dasystem['file.sib.clim.month']
 
         with nc.Dataset(fname) as ds:
-            times = nc.num2date(ds['time'][:], ds['time'].units)
-            startind = np.argmin(abs(times - starttime)) + 1 # dtm rounds months down
-            endind = np.argmin(abs(times - endtime)) + 1 # dtm rounds months down
-            monthly_gpp = ds['gpp'][startind: endind, :]
-            monthly_ter = ds['reco'][startind: endind, :]
+            monthly_gpp = ds['gpp'][self.dacycle['time.start'].month - 1]
+            monthly_ter = ds['reco'][self.dacycle['time.start'].month - 1]
             latsib = ds['latsib'][:]
             lonsib = ds['lonsib'][:]
 
         # Average to monthly values
-        gpp_avg = monthly_gpp.reshape(-1, 12, monthly_gpp.shape[-1]).mean(axis=0)
-        ter_avg = monthly_ter.reshape(-1, 12, monthly_ter.shape[-1]).mean(axis=0)
+        gpp_avg = monthly_gpp
+        ter_avg = monthly_ter
 
         # Make into global (2d) fields
         clim_gpp = st.convert_2d(gpp_avg, latsib, lonsib, empty_val=0)
@@ -102,7 +99,6 @@ class PoorMans(SiBUpscaler, Global):
         # Average to 1x1 degree
         gpp = self.average_to_global_grid(clim_gpp)
         ter = self.average_to_global_grid(clim_ter)
-
         return gpp, ter
 
     def get_pmi_nee(self, months):
@@ -110,7 +106,6 @@ class PoorMans(SiBUpscaler, Global):
         # Get GPP and TER for the months that we're looking at the growth rates
         month_indices = np.array([m.month - 1 for m in months])
         gpp, ter = self.calculate_prior_bio()
-        gpp, ter = gpp[month_indices].sum(axis=0), ter[month_indices].sum(axis=0)
         gpp, ter = self.add_daily_cycle(gpp, ter)
         # Get the other emissions
         other_emissions = self.get_emissions(months) # FF, fire, ocean
@@ -135,8 +130,6 @@ class PoorMans(SiBUpscaler, Global):
         # Now that we have a scaling factor, get the GPP and TER for the current month
         # And apply the scaling factor to that GPP
         gpp, ter = self.calculate_prior_bio()
-        month_idx = self.dacycle['time.start'].month - 1
-        gpp, ter = gpp[month_idx], ter[month_idx]
         gpp, ter = self.add_daily_cycle(gpp, ter)
         gpp = gpp * scaling_factor
         logging.debug('New GPP calculated: {0:.2} PgC'.format(self.calculate_total_emissions(emissions=gpp) / 1e15))
diff --git a/da/preprocessing/upscale_bio.py b/da/preprocessing/upscale_bio.py
index b631d74e..429b5daf 100644
--- a/da/preprocessing/upscale_bio.py
+++ b/da/preprocessing/upscale_bio.py
@@ -152,10 +152,10 @@ class StatisticalFluxes(SiBUpscaler, Global):
 
     def get_fluxes(self):
         logging.debug('Getting global biosphere fluxes from the SF')
-        file = '/projects/0/ctdas/input/cte-hr/ctsf/bio/global.{0}_2018_{1:02}.nc'.format('bio', self.dacycle['time.start'].month)
+        file = '/projects/0/ctdas/input/cte-hr/ctsf/bio/global.{0}_2018{1:02}.nc'.format('bio', self.dacycle['time.start'].month)
         with nc.Dataset(file) as ds:
             nee = ds['bio'][:]
-#            nee = self.average_to_global_grid(nee)
+            #nee = self.average_to_global_grid(nee)
         # Write to file
         filename = self.write(nee * 1e6)
 
-- 
GitLab