Commit d98a0d3f authored by Daalen, Tim van

AGENSO debugging and rewrite to store batches

parent d82e9389
@@ -41,30 +41,31 @@ class AGENSO:
if self.token == None:
self.login()
url = upload_url
(status_code, reply) = self.upload_helper(url, json_data)
json_string = json.dumps(json_data)
(status_code, reply) = self.upload_helper(url, json_string)
if status_code != 200:
#self.token = None
self.token = None
return (False, reply)
return (True, reply)
def bulkupload(self, json_data):
def batch_upload(self, json_string):
if self.token == None:
self.login()
url = bulk_upload_url
(status_code, reply) = self.upload_helper(url, json_data)
(status_code, reply) = self.upload_helper(url, json_string)
if status_code != 200:
#self.token = None
self.token = None
return (False, reply)
return (True, reply)
#handles both normal and bulk uploads
#for general message: status 200 if successful
#for bulk also: [[true,200],[true,200]] (for each subrequest)
def upload_helper(self, url, json_data):
def upload_helper(self, url, json_string):
params = {'api_token' : self.token}
headers = {'Content-Type': 'application/json'}
try:
r = requests.post(url, params= params, headers = headers, data=json_data)
r = requests.post(url, params= params, headers = headers, data=json_string)
if (r.status_code != 200): #HTTP error, return status: False
print("URL: %s, statuscode: %d, reply: %s" %(url, r.status_code, r.text))
status = "error"
@@ -72,6 +73,7 @@ class AGENSO:
status = json.loads(r.text)
return (r.status_code, status)
except Exception as e:
print("Agenso error" + str(e))
#if type(e).__name__ == "ConnectionError":
return (0, 0)
......
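For context on the reply shape described in the comment above upload_helper: a general message just returns HTTP 200, while a bulk/batch upload additionally returns one [ok, status] pair per subrequest. A minimal sketch of how such a reply could be split, assuming that shape (split_batch_reply is a hypothetical helper, not part of this commit):

def split_batch_reply(reply):
    #split a batch reply into succeeded and failed item indices,
    #assuming reply looks like [[True, 200], [False, 422], ...]
    succeeded, failed = [], []
    for i, (ok, status) in enumerate(reply):
        if ok and status == 200:
            succeeded.append(i)
        else:
            failed.append(i)
    return succeeded, failed

#e.g. split_batch_reply([[True, 200], [False, 422]]) returns ([0], [1])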
@@ -7,10 +7,6 @@ import os
from Agenso_api import *
from data_types import *
#API token, used to send the results
#API_token_lock = Lock()
#API_token_gl = AGENSO_login()
BATCHSIZE = 20 #number of results sent in a batch to Agenso
SEND_INTERVAL = 300 #sleep 300 secs after transmitting
@@ -22,7 +18,7 @@ class send_AGENSO_data(Thread):
self.agenso = AGENSO()
self.msg_folder = CONFIG['AGENSO-messages-folder']
self.error_folder = CONFIG['AGENSO-error-folder']
self.send_interval = CONFIG['AGENSO-send-interval']
# Initialize the thread
Thread.__init__(self)
print("send_data thread started")
@@ -31,43 +27,30 @@
while self._running:
files_to_upload = glob(os.path.join(self.msg_folder, '*.txt'))
if len(files_to_upload) == 0:
sleep(5) #if no files, sleep 5 sec to save CPU
bulk_data = bulk_data_class()
file_names = []
#send files to AGENSO in batches
for file_path in files_to_upload :
data_obj = data_fromfile(file_path)
bulk_data.items.append(data_obj)
file_names.append(file_path)
if len(bulk_data.items) == BATCHSIZE:
self.send_bulk(bulk_data, file_names)
bulk_data.items = []
file_names = []
data_batch = batch_fromfile(file_path)
self.send_batch(data_batch, file_path)
sleep(self.send_interval) #secs
#send last values
if len(bulk_data.items) > 0:
self.send_bulk(bulk_data, file_names)
sleep(SEND_INTERVAL) #secs
def send_batch(self, data_batch, file_path):
base_filename, ext = os.path.splitext(os.path.basename(file_path))
base_filepath = os.path.join(self.error_folder, base_filename)
def send_bulk(self, bulk_data, sending_files):
(upl_status, reply) = self.agenso.bulkupload(data_tojson(bulk_data))
data_json = json.dumps({"items":data_batch})
(upl_status, reply) = self.agenso.batch_upload(data_json)
if upl_status == True:
for i in range(len(reply)):#check all the request responses
if (reply[i][0] == True) and (reply[i][1] == 200):
os.remove(sending_files[i])
else: # bad request
if (reply[i][0] == False) or (reply[i][1] != 200):
#faulty message posts
print("bad request sent to Agenso:")
print(reply[i][0])
print(bulk_data.items[i])
if self.error_folder != None:#move to badreq folder
filename = sending_files[i].rsplit('/',1)[-1]
os.replace(sending_files[i], self.error_folder + "/" + filename)
else:
os.remove(sending_files[i])
print(data_batch[i])
faulty_message = data_batch[i]
f_file_path = base_filepath + "_" + str(i) + ext
data_tofile(faulty_message, f_file_path)
os.remove(file_path)
def terminate(self):
self._running = False
\ No newline at end of file
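The failure handling in the new send_batch above boils down to: every item whose [ok, status] pair is not [true, 200] is written to the error folder as <batch-name>_<index>.txt for later inspection, and the batch file itself is removed afterwards either way. A condensed sketch under those assumptions (quarantine_failures is a hypothetical name for the inlined logic; data_batch items are assumed to be JSON-serializable dicts, as produced by batch_fromfile):

import json
import os

def quarantine_failures(reply, data_batch, file_path, error_folder):
    #write every failed item to the error folder as <batch-name>_<index>.txt
    base_filename, ext = os.path.splitext(os.path.basename(file_path))
    for i, (ok, status) in enumerate(reply):
        if not ok or status != 200:
            f_file_path = os.path.join(error_folder, base_filename + "_" + str(i) + ext)
            with open(f_file_path, 'w') as file:
                json.dump(data_batch[i], file, indent=4)
    os.remove(file_path) #the batch file is consumed either way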
@@ -21,6 +21,10 @@ save-every-x: 10 #don't save all imgs & GPS coordinates, it's simply too much
min-movement-frame: 0.1 #m between frames
GPS-checks: False #enable/disable GPS moving checks to facilitate desk debugging
#AGENSO
AGENSO-batchsize: 20 #number of detections we send in one file
AGENSO-send-interval: 15 #300 #send stored results in a burst every x secs
#folders
weights-folder: weights
img-input-folder: /home/adlink/inference/data/images/apple_scab
......
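Taken together, the AGENSO-related CONFIG entries read across this commit could look like the sketch below, written as the dict the code consumes. Only the key names and the two values above come from the diff; the folder paths are illustrative:

CONFIG = {
    'AGENSO': True,                       #master switch checked in post_processing
    'AGENSO-batchsize': 20,               #detections per stored batch file
    'AGENSO-send-interval': 300,          #secs between upload bursts
    'AGENSO-messages-folder': 'messages', #hypothetical path
    'AGENSO-error-folder': 'errors',      #hypothetical path
}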
@@ -38,33 +38,43 @@ class data_class():#lat, lng, hdop, satnum, gps_quality, timestamp
self.gps_quality = "not_valid"
#bulk upload data class
class bulk_data_class():
def __init__(self):
self.items = []
# class batch_data_class():
# def __init__(self):
# self.items = []
def data_tojson(data):
#serialise the class as dicts
data_json = json.dumps(data.__dict__, default = lambda o: o.__dict__, indent = 4)
return data_json
def batch_tofile(data, filename):
#serialise the class instances as a list of dicts
with open(filename, 'w') as file:
json.dump([ob.__dict__ for ob in data], file, default = lambda o: o.__dict__, indent = 4)
# def data_tojson(data):
# #serialise the class as dicts
# data_json = json.dumps(data.__dict__, default = lambda o: o.__dict__, indent = 4)
# return data_json
def data_tofile(data, filename):
#serialise the class as dicts
with open(filename, 'w') as file:
json.dump(data.__dict__, file, default = lambda o: o.__dict__, indent = 4)
json.dump(data, file, default = lambda o: o.__dict__, indent = 4)
# reads the json to a dict, not to a class instance!
# Access as data_obj['timestamp'] instead of data_obj.timestamp
def data_fromjson(data_json):
data = json.loads(data_json)
return data
#reads the json into a general class object. You can use it as a normal class instance.
def data_fromfile(filename):
# def data_fromjson(data_json):
# data = json.loads(data_json)
# return data
# #reads the json into a general class object. You can use it as a normal class instance.
# def data_fromfile(filename):
# with open(filename) as file:
# data = json.load(file, object_hook= lambda d: Namespace(**d))
# return data
#read the file's json content (a list of dicts)
def batch_fromfile(filename):
with open(filename) as file:
data = json.load(file, object_hook= lambda d: Namespace(**d))
return data
data = json.load(file)
return data
##-----------------------------------------------------------------------------------------------------------
#GPS DATA class
......
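A round-trip sketch for the new batch helpers above: batch_tofile writes a list of objects as a JSON array of dicts, and batch_fromfile reads them back as plain dicts, so fields are accessed with item['timestamp'] rather than item.timestamp. The Detection class is a made-up stand-in; the real code stores detection/GPS data objects:

class Detection:
    def __init__(self, timestamp, lat, lng):
        self.timestamp = timestamp
        self.lat = lat
        self.lng = lng

batch = [Detection("21-06-01T12:00:00", 51.98, 5.66),
         Detection("21-06-01T12:00:05", 51.99, 5.67)]
batch_tofile(batch, "batch.txt")    #written as a json list of dicts
items = batch_fromfile("batch.txt") #read back as plain dicts
print(items[0]['timestamp'])        #dict access, not attribute access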
@@ -402,7 +402,8 @@ class post_processing(Thread):
self._running = True
self.freememory = free_gbs()
self.call_counter = 0
self.batch_size = CONFIG['AGENSO-batchsize']
self.batch = []
# Initialize the thread
Thread.__init__(self)
print("to_send_queue to file saver started")
@@ -412,16 +413,22 @@
while self._running:
item = to_send_queue.get() #blocking operation
if CONFIG['AGENSO'] == True:
#check free memory
self.call_counter = self.call_counter + 1
if self.call_counter >= CHECK_EVERY:
self.freememory = free_gbs()
#only store file if there is enough space
if self.freememory > MIN_FREE_SPACE:
file_path= os.path.join(CONFIG['AGENSO-messages-folder'], (item['file_name'] + ".txt"))
data = item['data']
data_tofile(data, file_path)
data = item['data']
self.batch.append(data)#items as dicts
if len(self.batch) >= self.batch_size:
#check free memory
self.call_counter = self.call_counter + 1
if self.call_counter >= CHECK_EVERY:
self.freememory = free_gbs()
#only store file if there is enough space
if self.freememory > MIN_FREE_SPACE:
#file_path= os.path.join(CONFIG['AGENSO-messages-folder'], (item['file_name'] + ".txt"))
file_path = os.path.join(CONFIG['AGENSO-messages-folder'], (datetime.utcnow().strftime("%y-%m-%dT%H:%M:%S") + ".txt"))
batch_tofile(self.batch, file_path)
self.batch = []
def terminate(self):
self._running = False
......
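The new batching flow in post_processing.run above boils down to: append each queue item's data to an in-memory batch and, once AGENSO-batchsize items have accumulated (and enough disk space is free), write them to a single timestamped file for the sender thread to pick up. A condensed sketch of the flush step (flush_batch is a hypothetical name for the inlined logic; batch_tofile comes from data_types). Note the ':' characters in the timestamp are fine on Linux, the target platform here, but would be rejected on Windows filesystems:

import os
from datetime import datetime

def flush_batch(batch, messages_folder):
    #one file per flush, named after the current UTC time as in the diff
    file_path = os.path.join(messages_folder,
                             datetime.utcnow().strftime("%y-%m-%dT%H:%M:%S") + ".txt")
    batch_tofile(batch, file_path) #from data_types
    return []                      #caller resets: self.batch = flush_batch(...)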