Skip to content
Snippets Groups Projects
Commit 0762482b authored by Daalen, Tim van's avatar Daalen, Tim van
Browse files

wip: USB disk space check, gps write values

parent 7030688d
No related branches found
No related tags found
No related merge requests found
......@@ -38,31 +38,42 @@ class AGENSO:
#send the json to AGENSO
def upload(self, json_data):
if self.token == None:
self.login()
url = upload_url
return self.upload_helper(url, json_data)
(status_code, reply) = self.upload_helper(url, json_data)
if status_code != 200:
#self.token = None
return (False, reply)
return (True, reply)
def bulkupload(self, json_data):
if self.token == None:
self.login()
url = bulk_upload_url
return self.upload_helper(url, json_data)
(status_code, reply) = self.upload_helper(url, json_data)
if status_code != 200:
#self.token = None
return (False, reply)
return (True, reply)
#handles both normal and bluk uploads
#Returns a False if the upload failed
#for normal upload: True if succesful
#for bulk [[true,200],[true,200]] (for each subrequest)
#for genral message: status 200 if succesful
#for bulk also: [[true,200],[true,200]] (for each subrequest)
def upload_helper(self, url, json_data):
status = False
params = {'api_token' : self.token}
headers = {'Content-Type': 'application/json'}
try:
r = requests.post(url, params= params, headers = headers, data=json_data)
if (r.status_code != 200): #Http Error, resturn status: False
print("URL: %s, statuscode: %d, reply: %s" %(url, r.status_code, r.text))
status = "error"
else:
status = json.loads(r.text)
return status
return (r.status_code, status)
except Exception as e:
if type(e).__name__ == "ConnectionError":
return status
#if type(e).__name__ == "ConnectionError":
return (0, 0)
#get data from AGENSO for a specific location/timestamp
......
from threading import Thread
from time import sleep
from glob import glob
import os
from Agenso_api import *
from data_types import *
#API token, used to send the results
#API_token_lock = Lock()
#API_token_gl = AGENSO_login()
batchsize = 10 #number of results send in a batch to Agenso
#API token is used in 2 threads
def get_API_token(API_token):
global API_token_lock, API_token_gl
with API_token_lock:
if API_token_gl == None:
API_token = AGENSO_login()
API_token_gl = API_token
else:
API_token = API_token_gl
return API_token
BATCHSIZE = 20 #number of results send in a batch to Agenso
SEND_INTERVAL = 300 #sleep 300secs after transmitting
# only sends the data in processed folder
class send_data(Thread):
def __init__(self, opt):
class send_AGENSO_data(Thread):
def __init__(self, CONFIG):
self._running = True
global API_token_lock, API_token_gl
self.tosend_folder = opt.tosend_folder
self.badreq_folder = opt.badreq_folder
with API_token_lock:
self.API_token = API_token_gl
self.agenso = AGENSO()
self.msg_folder = CONFIG['AGENSO-messages-folder']
self.error_folder = CONFIG['AGENSO-error-folder']
# Initialize the thread
Thread.__init__(self)
print("send_data thread started")
def run(self):
while self._running:
files_to_upload = glob(os.path.join(self.tosend_folder, '*.txt'))
files_to_upload = glob(os.path.join(self.msg_folder, '*.txt'))
if len(files_to_upload) == 0:
sleep(5) #if no files sleep 5 sec to save cpu
sleep(5) #if no files sleep 5 sec to save cpu*-
bulk_data = bulk_data_class()
sending_files = []
for file_prospath in files_to_upload :
data_obj = data_fromfile(file_prospath)
file_names = []
#send files to AGENSO in batches
for file_path in files_to_upload :
data_obj = data_fromfile(file_path)
bulk_data.items.append(data_obj)
sending_files.append(file_prospath)
file_names.append(file_path)
if len(bulk_data.items) == batchsize:
self.send_bulk(bulk_data, sending_files)
if len(bulk_data.items) == BATCHSIZE:
self.send_bulk(bulk_data, file_names)
bulk_data.items = []
sending_files = []
if len(bulk_data.items) > 0:
self.send_bulk(bulk_data, sending_files)
file_names = []
#send last values
if len(bulk_data.items) > 0:
self.send_bulk(bulk_data, file_names)
sleep(SEND_INTERVAL) #secs
def send_bulk(self, bulk_data, sending_files):
global API_token_lock, API_token_gl
if self.API_token == None:
self.API_token = get_API_token(self.API_token)
upl_status = AGENSO_bulkupload(self.API_token, data_tojson(bulk_data))
upl_status = self.agenso.bulkupload(data_tojson(bulk_data))
#move file to the send folder.
if upl_status == False:
with API_token_lock:
if API_token_gl == self.API_token:
API_token_gl = None
self.API_token = None
else:
if upl_status == True:
for i in range(len(upl_status)):#check all the request responces
if (upl_status[i][0] == True) and (upl_status[i][1] == 200):
os.remove(sending_files[i])
......@@ -69,9 +63,9 @@ class send_data(Thread):
print("bad request send to Agenso:")
print(upl_status[i][0])
print(bulk_data.items[i])
if self.badreq_folder != None:#move to badreq folder
if self.error_folder != None:#move to badreq folder
filename = sending_files[i].rsplit('/',1)[-1]
os.replace(sending_files[i], self.badreq_folder + "/" + filename)
os.replace(sending_files[i], self.error_folder + "/" + filename)
else:
os.remove(sending_files[i])
......
......@@ -44,7 +44,7 @@ class take_img(Thread):
to_process_deque.append({'file_name': filename, 'img': img,'data': data_obj})
if self.save_GPS == True:
gps_filename = timestamp.strftime("%y_%m_%d") #one file/day
self.usb.write_line(GPS_ringbuf.curitem_as_string(filename), gps_filename)
self.usb.write_line(GPS_ringbuf.curitem_as_string(timestamp), gps_filename)
if self.save_IMGS == True:
self.usb.save_img(img, timestamp.strftime("%y_%m_%dT%H"), filename)
......
......@@ -15,7 +15,7 @@ logging: False
GPS: True
Camera: True
AGENSO: True
save-GPS: False
save-GPS: True
save-IMGS: False
#folders
......
......@@ -174,7 +174,7 @@ class GPS_Ringbuffer:
item = self.data[self.point]
GPS_string = str(item.lat) + "," + str(item.lng) + "," + str(item.latdif) + "," + str(item.lngdif) + ","
GPS_string += str(item.hdop) + "," + str(item.satnum) + "," + str(item.gpsquality) + ","
GPS_string += item.gpstime + "," + item.rectime + "," + camera_timestamp +"\n"
GPS_string += item.gpstime + "," + item.rectime + "," + camera_timestamp.strftime("%y-%m-%dT%H:%M:%S.%f") +"\n"
return GPS_string
......
......@@ -15,7 +15,7 @@ from glob import glob
from global_vars import *
from camera import *
from Agenso_api import *
from Agenso_thread import *
from GPS_receiver import *
## libraries for the deep learning
......@@ -448,21 +448,18 @@ if __name__ == '__main__':
else:
raise Exception('selected DL mode not recognized!')
#grab results from DL thread
proc_res_thread = post_processing(CONFIG)
# if CONFIG['AGENSO'] == False:
#AGENSO_thread
#AGENSO file send thread
if CONFIG['AGENSO'] == True:
AGENSO_thread = send_AGENSO_data(CONFIG)
# Start the threads
image_thread.start()
dl_thread.start()
proc_res_thread.start()
# processing_thread.start()
# post_pros_thread.start()
# if not opt.nn_only:
# sec_send_thread.start()
AGENSO_thread.start()
#wait for key stroke, then quit program
......@@ -471,8 +468,4 @@ if __name__ == '__main__':
image_thread.terminate()
dl_thread.terminate()
proc_res_thread.terminate()
# processing_thread.terminate()
# post_pros_thread.terminate()
# sec_send_thread.terminate()
\ No newline at end of file
AGENSO_thread.terminate()
\ No newline at end of file
......@@ -22,19 +22,40 @@ class USB_class:
def __init__(self):
self.UUID = None #UUID of the disk
self.reconnect()
self.free_USBGBs = self.check_free_diskspace()
self.writenum = 0 #counter to avoid checking diskspace for every call
#need sudo rights for these folders...
def check_direxists(dir):
def check_direxists(self, dir):
if dir != None:
if not os.path.exists(dir):
#os.makedirs(dir) #need sudo rights
create_dircmd = ["sudo", "mkdir", dir]
process = subprocess.Popen(create_dircmd, stderr = subprocess.PIPE)
if process.stderr.read().decode() != '':
print("create directory error: %s" %process.stderr.read().decode())
error = process.stderr.read().decode()
if error != '':
print("create directory error: %s" %error)
return False
return True
#get free gb's on disk
def check_free_diskspace(self):
if self.UUID == None:
return 0
else:#df | grep ^/dev/sdc1
get_memory = "df | grep "+DISKPREFIX + self.UUID
process = subprocess.Popen(get_memory, stdout= subprocess.PIPE, stderr = subprocess.PIPE, shell = True)
error = process.stderr.read().decode()
if error != '':
print("get free diskspace error: %s" %error)
return 0
output = process.stdout.read().decode()
#used_memory = output.split()[2]
avail_memory = output.split()[3]
return (float(avail_memory)/1000000) #gb's
#try to find & connect to USB
def reconnect(self):
prev_UUID = self.UUID
......@@ -62,8 +83,9 @@ class USB_class:
def find_disk(self):
find_dev_UUID = ["ls","-l", "/dev/disk/by-uuid/"]
process = subprocess.Popen(find_dev_UUID, stdout= subprocess.PIPE, stderr = subprocess.PIPE)
if process.stderr.read().decode() != '':
print("checkdisk error: %s" %process.stderr.read().decode())
error = process.stderr.read().decode()
if error != '':
print("checkdisk error: %s" %error)
output = process.stdout.read().decode()
for line in output.splitlines():
......@@ -81,8 +103,9 @@ class USB_class:
# returns true if the disk is already mounted
def check_mounted(self):
process = subprocess.Popen(["df"], stdout= subprocess.PIPE, stderr = subprocess.PIPE)
if process.stderr.read().decode() != '':
print("mounting error: %s" %process.stderr.read().decode())
error = process.stderr.read().decode()
if error != '':
print("mounting error: %s" %error)
output = process.stdout.read().decode()
check_UUID = DISKPREFIX + self.UUID
......@@ -104,8 +127,9 @@ class USB_class:
def umount(self, mnt_name):
Umount = ["sudo","umount", mnt_name]
process = subprocess.Popen(Umount, stderr = subprocess.PIPE)
if process.stderr.read().decode() != '':
print("umount error: " + process.stderr.read().decode())
error = process.stderr.read().decode()
if error != '':
print("umount error: " + error)
return False
return True
......@@ -115,8 +139,9 @@ class USB_class:
#vfat for fat 16 & fat32 drives, default user id =1000
Mount = ["sudo","mount", "-t", "vfat", (DISKPREFIX + self.UUID), MOUNT_DIR,"-o", "rw,uid=1000,gid=1000,umask=133,dmask=022"]
process = subprocess.Popen(Mount, stderr = subprocess.PIPE)
if process.stderr.read().decode() != '':
print("mount error: " + process.stderr.read().decode())
error = process.stderr.read().decode()
if error != '':
print("mount error: " + error)
return False
return True
......@@ -128,7 +153,15 @@ class USB_class:
if not self.reconnect():
return
file_dir = os.path.join(MOUNT_DIR, IMG_folder, subfolder)
self.writenum = self.writenum + 1
try:
#avoid overflowing the disk
if self.writenum %100 == 0:
self.free_USBGBs = self.check_free_diskspace()
self.writenum = 0
if self.free_USBGBs < 0.1:
return
#create subfolder if it not there yet & save image
if not self.check_direxists(file_dir):
raise Exception("disk not connected")
......@@ -140,12 +173,20 @@ class USB_class:
print("USB write img error: %s" %str(e))
return
#write a line to the GPS logfile
def write_line(self, data_string, filename):
if self.UUID == None:
if not self.reconnect():
return
self.writenum = self.writenum + 1
try:
#avoid overflowing the disk
if self.writenum %100 == 0:
self.free_USBGBs = self.check_free_diskspace()
self.writenum = 0
if self.free_USBGBs < 0.1:
return
with open(os.path.join(MOUNT_DIR, GPS_folder, (filename + ".txt")), "a") as file:
file.write(data_string)
except Exception as e:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment