From 0762482b2e67cfbae971926ba1c6a56d3d6e57cf Mon Sep 17 00:00:00 2001 From: Tim <tim.vandaalen@wur.nl> Date: Tue, 30 Mar 2021 15:42:40 +0200 Subject: [PATCH] wip: USB disk space check, gps write values --- Agenso_api.py | 29 +++++++++++------ Agenso_thread.py | 80 +++++++++++++++++++++------------------------- camera.py | 2 +- cfg/detection.yaml | 2 +- data_types.py | 2 +- inference.py | 21 ++++-------- usb_Filestorage.py | 63 +++++++++++++++++++++++++++++------- 7 files changed, 119 insertions(+), 80 deletions(-) diff --git a/Agenso_api.py b/Agenso_api.py index 3eeed15..df73626 100644 --- a/Agenso_api.py +++ b/Agenso_api.py @@ -38,31 +38,42 @@ class AGENSO: #send the json to AGENSO def upload(self, json_data): + if self.token == None: + self.login() url = upload_url - return self.upload_helper(url, json_data) + (status_code, reply) = self.upload_helper(url, json_data) + if status_code != 200: + #self.token = None + return (False, reply) + return (True, reply) def bulkupload(self, json_data): + if self.token == None: + self.login() url = bulk_upload_url - return self.upload_helper(url, json_data) + (status_code, reply) = self.upload_helper(url, json_data) + if status_code != 200: + #self.token = None + return (False, reply) + return (True, reply) #handles both normal and bluk uploads - #Returns a False if the upload failed - #for normal upload: True if succesful - #for bulk [[true,200],[true,200]] (for each subrequest) + #for genral message: status 200 if succesful + #for bulk also: [[true,200],[true,200]] (for each subrequest) def upload_helper(self, url, json_data): - status = False params = {'api_token' : self.token} headers = {'Content-Type': 'application/json'} try: r = requests.post(url, params= params, headers = headers, data=json_data) if (r.status_code != 200): #Http Error, resturn status: False print("URL: %s, statuscode: %d, reply: %s" %(url, r.status_code, r.text)) + status = "error" else: status = json.loads(r.text) - return status + return (r.status_code, status) except Exception as e: - if type(e).__name__ == "ConnectionError": - return status + #if type(e).__name__ == "ConnectionError": + return (0, 0) #get data from AGENSO for a specific location/timestamp diff --git a/Agenso_thread.py b/Agenso_thread.py index d5d2f0d..f9622dc 100644 --- a/Agenso_thread.py +++ b/Agenso_thread.py @@ -1,67 +1,61 @@ +from threading import Thread +from time import sleep +from glob import glob +import os + + +from Agenso_api import * +from data_types import * + #API token, used to send the results #API_token_lock = Lock() #API_token_gl = AGENSO_login() -batchsize = 10 #number of results send in a batch to Agenso - -#API token is used in 2 threads -def get_API_token(API_token): - global API_token_lock, API_token_gl - with API_token_lock: - if API_token_gl == None: - API_token = AGENSO_login() - API_token_gl = API_token - else: - API_token = API_token_gl - return API_token +BATCHSIZE = 20 #number of results send in a batch to Agenso +SEND_INTERVAL = 300 #sleep 300secs after transmitting # only sends the data in processed folder -class send_data(Thread): - def __init__(self, opt): +class send_AGENSO_data(Thread): + def __init__(self, CONFIG): self._running = True - global API_token_lock, API_token_gl - self.tosend_folder = opt.tosend_folder - self.badreq_folder = opt.badreq_folder - with API_token_lock: - self.API_token = API_token_gl + self.agenso = AGENSO() + self.msg_folder = CONFIG['AGENSO-messages-folder'] + self.error_folder = CONFIG['AGENSO-error-folder'] + # Initialize the thread Thread.__init__(self) print("send_data thread started") def run(self): while self._running: - files_to_upload = glob(os.path.join(self.tosend_folder, '*.txt')) + files_to_upload = glob(os.path.join(self.msg_folder, '*.txt')) + if len(files_to_upload) == 0: - sleep(5) #if no files sleep 5 sec to save cpu + sleep(5) #if no files sleep 5 sec to save cpu*- bulk_data = bulk_data_class() - sending_files = [] - for file_prospath in files_to_upload : - data_obj = data_fromfile(file_prospath) + file_names = [] + + #send files to AGENSO in batches + for file_path in files_to_upload : + data_obj = data_fromfile(file_path) bulk_data.items.append(data_obj) - sending_files.append(file_prospath) + file_names.append(file_path) - if len(bulk_data.items) == batchsize: - self.send_bulk(bulk_data, sending_files) + if len(bulk_data.items) == BATCHSIZE: + self.send_bulk(bulk_data, file_names) bulk_data.items = [] - sending_files = [] - if len(bulk_data.items) > 0: - self.send_bulk(bulk_data, sending_files) + file_names = [] + #send last values + if len(bulk_data.items) > 0: + self.send_bulk(bulk_data, file_names) + sleep(SEND_INTERVAL) #secs def send_bulk(self, bulk_data, sending_files): - global API_token_lock, API_token_gl - if self.API_token == None: - self.API_token = get_API_token(self.API_token) - upl_status = AGENSO_bulkupload(self.API_token, data_tojson(bulk_data)) + upl_status = self.agenso.bulkupload(data_tojson(bulk_data)) - #move file to the send folder. - if upl_status == False: - with API_token_lock: - if API_token_gl == self.API_token: - API_token_gl = None - self.API_token = None - else: + if upl_status == True: for i in range(len(upl_status)):#check all the request responces if (upl_status[i][0] == True) and (upl_status[i][1] == 200): os.remove(sending_files[i]) @@ -69,9 +63,9 @@ class send_data(Thread): print("bad request send to Agenso:") print(upl_status[i][0]) print(bulk_data.items[i]) - if self.badreq_folder != None:#move to badreq folder + if self.error_folder != None:#move to badreq folder filename = sending_files[i].rsplit('/',1)[-1] - os.replace(sending_files[i], self.badreq_folder + "/" + filename) + os.replace(sending_files[i], self.error_folder + "/" + filename) else: os.remove(sending_files[i]) diff --git a/camera.py b/camera.py index c4b72ee..6692dae 100644 --- a/camera.py +++ b/camera.py @@ -44,7 +44,7 @@ class take_img(Thread): to_process_deque.append({'file_name': filename, 'img': img,'data': data_obj}) if self.save_GPS == True: gps_filename = timestamp.strftime("%y_%m_%d") #one file/day - self.usb.write_line(GPS_ringbuf.curitem_as_string(filename), gps_filename) + self.usb.write_line(GPS_ringbuf.curitem_as_string(timestamp), gps_filename) if self.save_IMGS == True: self.usb.save_img(img, timestamp.strftime("%y_%m_%dT%H"), filename) diff --git a/cfg/detection.yaml b/cfg/detection.yaml index ea6c269..40b88eb 100644 --- a/cfg/detection.yaml +++ b/cfg/detection.yaml @@ -15,7 +15,7 @@ logging: False GPS: True Camera: True AGENSO: True -save-GPS: False +save-GPS: True save-IMGS: False #folders diff --git a/data_types.py b/data_types.py index 65a955a..19d7f52 100644 --- a/data_types.py +++ b/data_types.py @@ -174,7 +174,7 @@ class GPS_Ringbuffer: item = self.data[self.point] GPS_string = str(item.lat) + "," + str(item.lng) + "," + str(item.latdif) + "," + str(item.lngdif) + "," GPS_string += str(item.hdop) + "," + str(item.satnum) + "," + str(item.gpsquality) + "," - GPS_string += item.gpstime + "," + item.rectime + "," + camera_timestamp +"\n" + GPS_string += item.gpstime + "," + item.rectime + "," + camera_timestamp.strftime("%y-%m-%dT%H:%M:%S.%f") +"\n" return GPS_string diff --git a/inference.py b/inference.py index 29dacaf..f1f2475 100644 --- a/inference.py +++ b/inference.py @@ -15,7 +15,7 @@ from glob import glob from global_vars import * from camera import * -from Agenso_api import * +from Agenso_thread import * from GPS_receiver import * ## libraries for the deep learning @@ -448,21 +448,18 @@ if __name__ == '__main__': else: raise Exception('selected DL mode not recognized!') + #grab results from DL thread proc_res_thread = post_processing(CONFIG) -# if CONFIG['AGENSO'] == False: - #AGENSO_thread + #AGENSO file send thread + if CONFIG['AGENSO'] == True: + AGENSO_thread = send_AGENSO_data(CONFIG) # Start the threads image_thread.start() dl_thread.start() proc_res_thread.start() - - # processing_thread.start() - # post_pros_thread.start() - - # if not opt.nn_only: - # sec_send_thread.start() + AGENSO_thread.start() #wait for key stroke, then quit program @@ -471,8 +468,4 @@ if __name__ == '__main__': image_thread.terminate() dl_thread.terminate() proc_res_thread.terminate() - -# processing_thread.terminate() - # post_pros_thread.terminate() - - # sec_send_thread.terminate() \ No newline at end of file + AGENSO_thread.terminate() \ No newline at end of file diff --git a/usb_Filestorage.py b/usb_Filestorage.py index 9110553..969c139 100644 --- a/usb_Filestorage.py +++ b/usb_Filestorage.py @@ -22,19 +22,40 @@ class USB_class: def __init__(self): self.UUID = None #UUID of the disk self.reconnect() + self.free_USBGBs = self.check_free_diskspace() + self.writenum = 0 #counter to avoid checking diskspace for every call + #need sudo rights for these folders... - def check_direxists(dir): + def check_direxists(self, dir): if dir != None: if not os.path.exists(dir): #os.makedirs(dir) #need sudo rights create_dircmd = ["sudo", "mkdir", dir] process = subprocess.Popen(create_dircmd, stderr = subprocess.PIPE) - if process.stderr.read().decode() != '': - print("create directory error: %s" %process.stderr.read().decode()) + error = process.stderr.read().decode() + if error != '': + print("create directory error: %s" %error) return False return True + #get free gb's on disk + def check_free_diskspace(self): + if self.UUID == None: + return 0 + else:#df | grep ^/dev/sdc1 + get_memory = "df | grep "+DISKPREFIX + self.UUID + process = subprocess.Popen(get_memory, stdout= subprocess.PIPE, stderr = subprocess.PIPE, shell = True) + error = process.stderr.read().decode() + if error != '': + print("get free diskspace error: %s" %error) + return 0 + output = process.stdout.read().decode() + #used_memory = output.split()[2] + avail_memory = output.split()[3] + return (float(avail_memory)/1000000) #gb's + + #try to find & connect to USB def reconnect(self): prev_UUID = self.UUID @@ -62,8 +83,9 @@ class USB_class: def find_disk(self): find_dev_UUID = ["ls","-l", "/dev/disk/by-uuid/"] process = subprocess.Popen(find_dev_UUID, stdout= subprocess.PIPE, stderr = subprocess.PIPE) - if process.stderr.read().decode() != '': - print("checkdisk error: %s" %process.stderr.read().decode()) + error = process.stderr.read().decode() + if error != '': + print("checkdisk error: %s" %error) output = process.stdout.read().decode() for line in output.splitlines(): @@ -81,8 +103,9 @@ class USB_class: # returns true if the disk is already mounted def check_mounted(self): process = subprocess.Popen(["df"], stdout= subprocess.PIPE, stderr = subprocess.PIPE) - if process.stderr.read().decode() != '': - print("mounting error: %s" %process.stderr.read().decode()) + error = process.stderr.read().decode() + if error != '': + print("mounting error: %s" %error) output = process.stdout.read().decode() check_UUID = DISKPREFIX + self.UUID @@ -104,8 +127,9 @@ class USB_class: def umount(self, mnt_name): Umount = ["sudo","umount", mnt_name] process = subprocess.Popen(Umount, stderr = subprocess.PIPE) - if process.stderr.read().decode() != '': - print("umount error: " + process.stderr.read().decode()) + error = process.stderr.read().decode() + if error != '': + print("umount error: " + error) return False return True @@ -115,8 +139,9 @@ class USB_class: #vfat for fat 16 & fat32 drives, default user id =1000 Mount = ["sudo","mount", "-t", "vfat", (DISKPREFIX + self.UUID), MOUNT_DIR,"-o", "rw,uid=1000,gid=1000,umask=133,dmask=022"] process = subprocess.Popen(Mount, stderr = subprocess.PIPE) - if process.stderr.read().decode() != '': - print("mount error: " + process.stderr.read().decode()) + error = process.stderr.read().decode() + if error != '': + print("mount error: " + error) return False return True @@ -128,7 +153,15 @@ class USB_class: if not self.reconnect(): return file_dir = os.path.join(MOUNT_DIR, IMG_folder, subfolder) + self.writenum = self.writenum + 1 try: + #avoid overflowing the disk + if self.writenum %100 == 0: + self.free_USBGBs = self.check_free_diskspace() + self.writenum = 0 + if self.free_USBGBs < 0.1: + return + #create subfolder if it not there yet & save image if not self.check_direxists(file_dir): raise Exception("disk not connected") @@ -140,12 +173,20 @@ class USB_class: print("USB write img error: %s" %str(e)) return + #write a line to the GPS logfile def write_line(self, data_string, filename): if self.UUID == None: if not self.reconnect(): return + self.writenum = self.writenum + 1 try: + #avoid overflowing the disk + if self.writenum %100 == 0: + self.free_USBGBs = self.check_free_diskspace() + self.writenum = 0 + if self.free_USBGBs < 0.1: + return with open(os.path.join(MOUNT_DIR, GPS_folder, (filename + ".txt")), "a") as file: file.write(data_string) except Exception as e: -- GitLab