Commit 4face3de authored by karolina's avatar karolina
Browse files

Minor cleanup: PEP 8 whitespace fixes, consolidated top-of-file imports, removal of redundant statements and dead assignments, and signature changes to pass startdate/cycle/outdir explicitly instead of reading them from DaCycle.

parent 56c61c3e
......@@ -74,8 +74,10 @@ class DaSystem(dict):
needed_rc_items = {}
for k, v in self.iteritems():
if v == 'True' : self[k] = True
if v == 'False': self[k] = False
if v == 'True' :
self[k] = True
if v == 'False':
self[k] = False
for key in needed_rc_items:
if not self.has_key(key):
......
......@@ -14,6 +14,10 @@ import os
import sys
import logging
import datetime
import numpy as np
import numpy.linalg as la
import da.tools.io4 as io
identifier = 'Optimizer baseclass'
version = '0.0'
......@@ -43,7 +47,6 @@ class Optimizer(object):
def create_matrices(self):
""" Create Matrix space needed in optimization routine """
import numpy as np
# mean state [X]
self.x = np.zeros((self.nlag * self.nparams,), float)
......@@ -63,7 +66,7 @@ class Optimizer(object):
self.R = np.zeros((self.nobs,), float)
self.HPHR = np.zeros((self.nobs,), float)
else:
self.R = np.zeros((self.nobs,self.nobs,), float)
self.R = np.zeros((self.nobs, self.nobs,), float)
self.HPHR = np.zeros((self.nobs, self.nobs,), float)
# localization of obs
self.may_localize = np.zeros(self.nobs, bool)
......@@ -83,8 +86,6 @@ class Optimizer(object):
self.KG = np.zeros((self.nlag * self.nparams, self.nobs,), float)
def state_to_matrix(self, StateVector):
import numpy as np
allsites = [] # collect all obs for n=1,..,nlag
allobs = [] # collect all obs for n=1,..,nlag
allmdm = [] # collect all mdm for n=1,..,nlag
......@@ -101,7 +102,7 @@ class Optimizer(object):
self.x[n * self.nparams:(n + 1) * self.nparams] = members[0].ParameterValues
self.X_prime[n * self.nparams:(n + 1) * self.nparams, :] = np.transpose(np.array([m.ParameterValues for m in members]))
if Samples != None:
if Samples != None: #LU jednoznaczne z if Samples
self.rejection_threshold = Samples.rejection_threshold
allreject.extend(Samples.getvalues('may_reject'))
......@@ -115,7 +116,7 @@ class Optimizer(object):
simulatedensemble = Samples.getvalues('simulated')
if allsimulated == None :
if allsimulated == None : #LU if allsimulated: np.concatenate; else: np.array
allsimulated = np.array(simulatedensemble)
else:
allsimulated = np.concatenate((allsimulated, np.array(simulatedensemble)), axis=0)
......@@ -139,7 +140,7 @@ class Optimizer(object):
self.R[i] = mdm ** 2
else:
for i, mdm in enumerate(allmdm):
self.R[i,i] = mdm ** 2
self.R[i, i] = mdm ** 2
def matrix_to_state(self, StateVector):
for n in range(self.nlag):
......@@ -164,8 +165,6 @@ class Optimizer(object):
The type designation refers to the writing of prior or posterior data and is used in naming the variables"
"""
import da.tools.io4 as io
#import da.tools.io as io
outdir = DaCycle['dir.diagnostics']
filename = os.path.join(outdir, 'optimizer.%s.nc' % DaCycle['time.start'].strftime('%Y%m%d'))
......@@ -309,8 +308,6 @@ class Optimizer(object):
def serial_minimum_least_squares(self):
""" Make minimum least squares solution by looping over obs"""
import numpy as np
for n in range(self.nobs):
# Screen for flagged observations (for instance site not found, or no sample written from model)
......@@ -336,10 +333,10 @@ class Optimizer(object):
self.KG[:, n] = PHt / self.HPHR[n]
if self.may_localize[n]:
logging.debug('Trying to localize observation %s, %i' % (self.sitecode[n],self.obs_ids[n]))
logging.debug('Trying to localize observation %s, %i' % (self.sitecode[n], self.obs_ids[n]))
self.localize(n)
else:
logging.debug('Not allowed to localize observation %s, %i' % (self.sitecode[n],self.obs_ids[n]))
logging.debug('Not allowed to localize observation %s, %i' % (self.sitecode[n], self.obs_ids[n]))
alpha = np.double(1.0) / (np.double(1.0) + np.sqrt((self.R[n]) / self.HPHR[n]))
......@@ -366,8 +363,7 @@ class Optimizer(object):
def bulk_minimum_least_squares(self):
""" Make minimum least squares solution by solving matrix equations"""
import numpy as np
import numpy.linalg as la
# Create full solution, first calculate the mean of the posterior analysis
......@@ -411,8 +407,7 @@ class Optimizer(object):
def localize(self, n):
""" localize the Kalman Gain matrix """
logging.debug('Not localized observation %s, %i' % (self.sitecode[n],self.obs_ids[n]))
pass
logging.debug('Not localized observation %s, %i' % (self.sitecode[n], self.obs_ids[n]))
def set_algorithm(self):
    """Select the solver used by the optimizer.

    The base class defaults to the serial solver, i.e. the
    serial_minimum_least_squares routine rather than the bulk
    matrix solution.
    """
    # 'Serial' is consumed elsewhere to dispatch to the per-observation loop.
    self.algorithm = 'Serial'
......@@ -429,7 +424,6 @@ if __name__ == "__main__":
from da.tools.initexit import start_logger
from da.tools.initexit import CycleControl
from da.ct.obs import CtObservations
import numpy as np
opts = ['-v']
args = {'rc':'../../da.rc', 'logfile':'da_initexit.log', 'jobrcfilename':'test.rc'}
......
......@@ -282,7 +282,7 @@ class StateVector(object):
logging.debug('%d new ensemble members were added to the state vector # %d' % (self.nmembers, lag))
def propagate(self):
def propagate(self, startdate, cycle):
"""
:rtype: None
......@@ -301,7 +301,7 @@ class StateVector(object):
self.EnsembleMembers.append([])
# And now create a new time step of mean + members for n=nlag
date = self.DaCycle['time.start'] + timedelta(days=(self.nlag - 0.5) * int(self.DaCycle['time.cycle']))
date = startdate + timedelta(days=(self.nlag - 0.5) * int(cycle))
cov = self.get_covariance(date)
self.make_new_ensemble(self.nlag, cov)
......@@ -401,7 +401,7 @@ class StateVector(object):
logging.info('Successfully read the State Vector from file (%s) ' % filename)
def write_members_to_file(self, lag):
def write_members_to_file(self, lag, outdir):
"""
:param: lag: Which lag step of the filter to write, must lie in range [1,...,nlag]
:rtype: None
......@@ -423,7 +423,6 @@ class StateVector(object):
#import da.tools.io as io
#import da.tools.io4 as io
outdir = self.DaCycle['dir.input']
members = self.EnsembleMembers[lag - 1]
for mem in members:
......
......@@ -318,7 +318,7 @@ class ObsPackObservations(Observation):
species, site, method, lab, nr = os.path.split(obs.fromfile)[-1].split('_')
identifier = "%s_%02d_%s" % (site, int(lab), method,)
identifier = "%s_%02d_%s" % (site, int(lab), method)
identifier = name_convert(name="%s_%s_%s" % (site.lower(), method.lower(), lab.lower(),), to='GV')
......
......@@ -118,12 +118,12 @@ class CtStateVector(StateVector):
for n in range(self.nlag):
if qual == 'opt':
MeanState = f.get_variable('xac_%02d'%(n+1))
EnsembleMembers = f.get_variable('adX_%02d'%(n+1))
MeanState = f.get_variable('xac_%02d' % (n + 1))
EnsembleMembers = f.get_variable('adX_%02d' % (n + 1))
elif qual == 'prior':
MeanState = f.get_variable('xpc_%02d'%(n+1))
EnsembleMembers = f.get_variable('pdX_%02d'%(n+1))
MeanState = f.get_variable('xpc_%02d' % (n + 1))
EnsembleMembers = f.get_variable('pdX_%02d' % (n + 1))
if not self.EnsembleMembers[n] == []:
self.EnsembleMembers[n] = []
......
......@@ -7,44 +7,39 @@
import sys
import os
import logging
dummy = sys.path.append(os.getcwd())
sys.path.append(os.getcwd())
#################################################################################################
# Next, import the tools needed to initialize a data assimilation cycle
#################################################################################################
from da.tools.initexit import start_logger
from da.tools.initexit import validate_opts_args
from da.tools.initexit import parse_options
from da.tools.initexit import start_logger, validate_opts_args, parse_options, CycleControl
from da.tools.pipeline import ensemble_smoother_pipeline
from da.platform.maunaloa import MaunaloaPlatForm
from da.ct.dasystem import CtDaSystem
from da.ct.statevector import CtStateVector
#from da.ct.obspack import ObsPackObservations
from da.ct.obs import CtObservations
from da.tm5.observationoperator import TM5ObservationOperator
from da.ct.optimizer import CtOptimizer
from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data
from da.analysis.expand_mixingratios import write_mixing_ratios
#################################################################################################
# Parse and validate the command line options, start logging
#################################################################################################
dummy = start_logger()
opts, args = parse_options()
opts,args = validate_opts_args(opts,args)
start_logger()
opts, args = validate_opts_args(parse_options())
#################################################################################################
# Create the Cycle Control object for this job
#################################################################################################
from da.tools.initexit import CycleControl
DaCycle = CycleControl(opts,args)
DaCycle = CycleControl(opts, args)
###########################################################################################
### IMPORT THE APPLICATION SPECIFIC MODULES HERE, TO BE PASSED INTO THE MAIN PIPELINE!!! ##
###########################################################################################
from da.tools.pipeline import ensemble_smoother_pipeline
from da.platform.maunaloa import MaunaloaPlatForm
from da.ct.dasystem import CtDaSystem
from da.ct.statevector import CtStateVector
#from da.ct.obspack import ObsPackObservations
from da.ct.obs import CtObservations
from da.tm5.observationoperator import TM5ObservationOperator
from da.ct.optimizer import CtOptimizer
PlatForm = MaunaloaPlatForm()
DaSystem = CtDaSystem(DaCycle['da.system.rc'])
......@@ -58,31 +53,24 @@ Optimizer = CtOptimizer()
################### ENTER THE PIPELINE WITH THE OBJECTS PASSED BY THE USER ###############
##########################################################################################
from da.tools.pipeline import header,footer
from da.tools.pipeline import header, footer
msg = header+"Entering Pipeline "+footer ; logging.info(msg)
logging.info(header + "Entering Pipeline " + footer)
ensemble_smoother_pipeline(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer)
ensemble_smoother_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer)
##########################################################################################
################### All done, extra stuff can be added next, such as analysis
##########################################################################################
msg = header+"Starting analysis"+footer ; logging.info(msg)
from da.analysis.expand_fluxes import save_weekly_avg_1x1_data
from da.analysis.expand_fluxes import save_weekly_avg_state_data
from da.analysis.expand_fluxes import save_weekly_avg_tc_data
from da.analysis.expand_fluxes import save_weekly_avg_ext_tc_data
from da.analysis.expand_mixingratios import write_mixing_ratios
logging.info(header + "Starting analysis" + footer)
savedas = save_weekly_avg_1x1_data(DaCycle, StateVector)
savedas = save_weekly_avg_state_data(DaCycle, StateVector)
savedas = save_weekly_avg_tc_data(DaCycle, StateVector)
savedas = save_weekly_avg_ext_tc_data(DaCycle)
savedas = write_mixing_ratios(DaCycle)
save_weekly_avg_1x1_data(DaCycle, StateVector)
save_weekly_avg_state_data(DaCycle, StateVector)
save_weekly_avg_tc_data(DaCycle, StateVector)
save_weekly_avg_ext_tc_data(DaCycle)
write_mixing_ratios(DaCycle)
sys.exit(0)
......
......@@ -7,44 +7,41 @@
import sys
import os
import logging
dummy = sys.path.append(os.getcwd())
sys.path.append(os.getcwd())
#################################################################################################
# Next, import the tools needed to initialize a data assimilation cycle
#################################################################################################
from da.tools.initexit import start_logger
from da.tools.initexit import validate_opts_args
from da.tools.initexit import parse_options
from da.tools.initexit import start_logger, parse_options, validate_opts_args, CycleControl
from da.tools.pipeline import ensemble_smoother_pipeline
from da.platform.maunaloa import MaunaloaPlatForm
from da.ctgridded.dasystem import CtGriddedDaSystem
from da.ctgridded.statevector import CtGriddedStateVector
from da.ct.obs import CtObservations
from da.tm5.observationoperator import TM5ObservationOperator
from da.ct.optimizer import CtOptimizer
from da.tools.pipeline import header, footer
from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data
from da.analysis.expand_mixingratios import write_mixing_ratios
#################################################################################################
# Parse and validate the command line options, start logging
#################################################################################################
dummy = start_logger()
opts, args = parse_options()
opts,args = validate_opts_args(opts,args)
start_logger()
opts, args = validate_opts_args(parse_options())
#################################################################################################
# Create the Cycle Control object for this job
#################################################################################################
from da.tools.initexit import CycleControl
DaCycle = CycleControl(opts,args)
###########################################################################################
### IMPORT THE APPLICATION SPECIFIC MODULES HERE, TO BE PASSED INTO THE MAIN PIPELINE!!! ##
###########################################################################################
from da.tools.pipeline import ensemble_smoother_pipeline
from da.platform.maunaloa import MaunaloaPlatForm
from da.ctgridded.dasystem import CtGriddedDaSystem
from da.ctgridded.statevector import CtGriddedStateVector
from da.ct.obs import CtObservations
from da.tm5.observationoperator import TM5ObservationOperator
from da.ct.optimizer import CtOptimizer
DaCycle = CycleControl(opts, args)
PlatForm = MaunaloaPlatForm()
DaSystem = CtGriddedDaSystem(DaCycle['da.system.rc'])
......@@ -57,30 +54,21 @@ Optimizer = CtOptimizer()
################### ENTER THE PIPELINE WITH THE OBJECTS PASSED BY THE USER ###############
##########################################################################################
from da.tools.pipeline import header,footer
msg = header+"Entering Pipeline "+footer ; logging.info(msg)
ensemble_smoother_pipeline(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer)
logging.info(header + "Entering Pipeline " + footer)
ensemble_smoother_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer)
##########################################################################################
################### All done, extra stuff can be added next, such as analysis
##########################################################################################
msg = header+"Starting analysis"+footer ; logging.info(msg)
from da.analysis.expand_fluxes import save_weekly_avg_1x1_data
from da.analysis.expand_fluxes import save_weekly_avg_state_data
from da.analysis.expand_fluxes import save_weekly_avg_tc_data
from da.analysis.expand_fluxes import save_weekly_avg_ext_tc_data
from da.analysis.expand_mixingratios import write_mixing_ratios
logging.info(header + "Starting analysis" + footer)
savedas = save_weekly_avg_1x1_data(DaCycle, StateVector)
savedas = save_weekly_avg_state_data(DaCycle, StateVector)
savedas = save_weekly_avg_tc_data(DaCycle, StateVector)
savedas = save_weekly_avg_ext_tc_data(DaCycle)
savedas = write_mixing_ratios(DaCycle)
save_weekly_avg_1x1_data(DaCycle, StateVector)
save_weekly_avg_state_data(DaCycle, StateVector)
save_weekly_avg_tc_data(DaCycle, StateVector)
save_weekly_avg_ext_tc_data(DaCycle)
write_mixing_ratios(DaCycle)
sys.exit(0)
......
......@@ -7,43 +7,35 @@
import sys
import os
import logging
dummy = sys.path.append(os.getcwd())
sys.path.append(os.getcwd())
#################################################################################################
# Next, import the tools needed to initialize a data assimilation cycle
#################################################################################################
from da.tools.initexit import start_logger
from da.tools.initexit import validate_opts_args
from da.tools.initexit import parse_options
from da.tools.initexit import start_logger, validate_opts_args, parse_options, CycleControl
from da.tools.pipeline import ensemble_smoother_pipeline
from da.platform.jet import JetPlatForm
from da.ct.dasystem import CtDaSystem
from da.ct.statevector import CtStateVector
from da.ct.obs import CtObservations
from da.tm5.observationoperator import TM5ObservationOperator
from da.ct.optimizer import CtOptimizer
from da.tools.pipeline import header, footer
#################################################################################################
# Parse and validate the command line options, start logging
#################################################################################################
dummy = start_logger()
opts, args = parse_options()
opts,args = validate_opts_args(opts,args)
start_logger()
opts, args = validate_opts_args(parse_options())
#################################################################################################
# Create the Cycle Control object for this job
#################################################################################################
from da.tools.initexit import CycleControl
DaCycle = CycleControl(opts,args)
###########################################################################################
### IMPORT THE APPLICATION SPECIFIC MODULES HERE, TO BE PASSED INTO THE MAIN PIPELINE!!! ##
###########################################################################################
from da.tools.pipeline import ensemble_smoother_pipeline
from da.platform.jet import JetPlatForm
from da.ct.dasystem import CtDaSystem
from da.ct.statevector import CtStateVector
from da.ct.obs import CtObservations
from da.tm5.observationoperator import TM5ObservationOperator
from da.ct.optimizer import CtOptimizer
DaCycle = CycleControl(opts, args)
PlatForm = JetPlatForm()
DaSystem = CtDaSystem(DaCycle['da.system.rc'])
......@@ -56,11 +48,10 @@ Optimizer = CtOptimizer()
################### ENTER THE PIPELINE WITH THE OBJECTS PASSED BY THE USER ###############
##########################################################################################
from da.tools.pipeline import header,footer
msg = header+"Entering Pipeline "+footer ; logging.info(msg)
logging.info(header + "Entering Pipeline " + footer)
ensemble_smoother_pipeline(DaCycle,PlatForm, DaSystem, Samples,StateVector,ObsOperator,Optimizer)
ensemble_smoother_pipeline(DaCycle, PlatForm, DaSystem, Samples, StateVector, ObsOperator, Optimizer)
##########################################################################################
......
......@@ -23,11 +23,9 @@ class CapeGrimPlatForm(PlatForm):
self.Version = '1.0' # the platform version used
def give_blocking_flag(self):
    """Return the scheduler flag used to block on job completion.

    This platform runs jobs directly (no batch queue), so there is
    no blocking flag and an empty string is returned.
    """
    return ""
def give_queue_type(self):
    """Return the queue type for this platform.

    'foreground' means jobs are executed directly in the calling
    shell rather than being submitted to a batch scheduler.
    """
    return "foreground"
def get_job_template(self,joboptions={},block=False):
......
......@@ -9,20 +9,16 @@ File created on 06 Sep 2010.
"""
import sys
import os
import logging
import subprocess
from da.baseclasses.platform import PlatForm, std_joboptions
from da.baseclasses.platform import PlatForm
std_joboptions={'jobname':'test','jobaccount':'co2','jobtype':'serial','jobshell':'/bin/sh','depends':'','jobtime':'24:00:00','jobinput':'/dev/null','jobnode':'','jobtasks':'','modulenetcdf':'netcdf/4.1.2','networkMPI':''}
std_joboptions = {'jobname':'test', 'jobaccount':'co2', 'jobtype':'serial', 'jobshell':'/bin/sh', 'depends':'', 'jobtime':'24:00:00', 'jobinput':'/dev/null', 'jobnode':'', 'jobtasks':'', 'modulenetcdf':'netcdf/4.1.2', 'networkMPI':''}
class HuygensPlatForm(PlatForm):
def __init__(self):
self.Identifier = 'huygens' # the identifier gives the platform name
self.Version = '1.0' # the platform version used
......@@ -46,13 +42,9 @@ class HuygensPlatForm(PlatForm):
-on Jet/Zeus: return
"""
return "queue"
def get_job_template(self,joboptions={},block=False):
def get_job_template(self, joboptions={}, block=False):
"""
Returns the job template for a given computing system, and fill it with options from the dictionary provided as argument.
The job template should return the preamble of a job that can be submitted to a queue on your platform,
......@@ -86,46 +78,46 @@ class HuygensPlatForm(PlatForm):
# """\n"""
template = """#!/bin/bash \n"""+\
"""## \n"""+ \
"""## This is a set of dummy names, to be replaced by values from the dictionary \n"""+ \
"""## Please make your own platform specific template with your own keys and place it in a subfolder of the da package.\n """+ \
"""## \n"""+ \
"""## @ node_usage = normal\n"""+\
"""jobnode \n"""+\
"""jobtasks \n"""+\
"""networkMPI \n"""+\
"""# @ notification = never\n"""+\
"""# @ input = jobinput\n"""+\
"""# @ output = logfile.$(jobid)\n"""+\
"""# @ error = logfile.$(jobid)\n"""+\
"""# @ wall_clock_limit = jobtime\n""" +\
"""# @ job_type = jobtype \n"""+\
"""# @ shell = /bin/bash\n"""+\
"""# @ queue \n"""+\
"""\n"""+\
"""module load ctdas\n"""+\
template = """#!/bin/bash \n""" + \
"""## \n""" + \
"""## This is a set of dummy names, to be replaced by values from the dictionary \n""" + \
"""## Please make your own platform specific template with your own keys and place it in a subfolder of the da package.\n """ + \
"""## \n""" + \
"""## @ node_usage = normal\n""" + \
"""jobnode \n""" + \
"""jobtasks \n""" + \
"""networkMPI \n""" + \
"""# @ notification = never\n""" + \
"""# @ input = jobinput\n""" + \
"""# @ output = logfile.$(jobid)\n""" + \
"""# @ error = logfile.$(jobid)\n""" + \
"""# @ wall_clock_limit = jobtime\n""" + \
"""# @ job_type = jobtype \n""" + \
"""# @ shell = /bin/bash\n""" + \
"""# @ queue \n""" + \
"""\n""" + \
"""module load ctdas\n""" + \
"""\n"""
if 'depends' in joboptions:
template += """#$ -hold_jid depends \n"""
# First replace from passed dictionary
for k,v in joboptions.iteritems():
for k, v in joboptions.iteritems():
while k in template:
template = template.replace(k,v)
template = template.replace(k, v)
# Fill remaining values with std_options
for k,v in std_joboptions.iteritems():
for k, v in std_joboptions.iteritems():
while k in template:
template = template.replace(k,v)
template = template.replace(k, v)
return template
msg1 = 'Platform initialized: %s'%self.Identifier ; logging.info(msg1)
msg1 = 'Platform initialized: %s' % self.Identifier ; logging.info(msg1)
# #msg2 = '%s version: %s'%(self.Identifier,self.Version) ; logging.info(msg2)
......@@ -154,7 +146,7 @@ class HuygensPlatForm(PlatForm):
def submit_job(self,jobfile,joblog=None,block=False):
def submit_job(self, jobfile, joblog=None, block=False):
""" This method submits a jobfile to the queue, and returns the queue ID """
......@@ -162,19 +154,18 @@ class HuygensPlatForm(PlatForm):
# msg = "A new task will be started (%s)"%cmd ; logging.info(msg)
if block:
cmd = ["llsubmit","-s",jobfile]
msg = "A new task will be started (%s)"%cmd ; logging.info(msg)
cmd = ["llsubmit", "-s", jobfile]
msg = "A new task will be started (%s)" % cmd ; logging.info(msg)
output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] ; logging.info(output)
print 'output',output