From a4fc49970edf8580a97b43e6f33d0e618d392eaf Mon Sep 17 00:00:00 2001 From: Adriaens <ines.adriaens@wur.nl> Date: Mon, 29 Aug 2022 16:34:58 +0200 Subject: [PATCH] read data adjust timestamps --- datapreparation/ines/barn_areas.py | 238 +++++++++++ datapreparation/ines/datapreparation.py | 202 +++++++-- datapreparation/ines/filter_data.py | 311 ++++++++++++++ datapreparation/ines/read_data_sewio.py | 537 ++++++++++++++++++++++++ datapreparation/ines/requirements.txt | 8 + datapreparation/ines/test.txt | 1 - 6 files changed, 1272 insertions(+), 25 deletions(-) create mode 100644 datapreparation/ines/barn_areas.py create mode 100644 datapreparation/ines/filter_data.py create mode 100644 datapreparation/ines/read_data_sewio.py create mode 100644 datapreparation/ines/requirements.txt delete mode 100644 datapreparation/ines/test.txt diff --git a/datapreparation/ines/barn_areas.py b/datapreparation/ines/barn_areas.py new file mode 100644 index 0000000..ac6d5fd --- /dev/null +++ b/datapreparation/ines/barn_areas.py @@ -0,0 +1,238 @@ +# -*- coding: utf-8 -*- +""" +Created on Thu May 19 10:36:21 2022 + +@author: adria036 +""" + + +#%% define functions for areas in the barn + +import pandas as pd +import numpy as np + +# at the feeding rack +def feeding_rack(X,y): + """ + A cow is considered to be at the feeding rack when her y-position is + larger than -4.6m + """ + + # find measurements where cow is at the feeding rack + feedrackx = X.loc[(y >= -4.6)] + feedracky = y.loc[(y >= -4.6)] + + # combine in dataframe + feed = pd.DataFrame({'X' : feedrackx, + 'y' : feedracky}) + + # calculate percentages + pfeed = len(feed)/len(y)*100 + + return feed, pfeed + +# at the cubicles next to the exit +def cubicles_A(X,y): + """ + A cow is considered to be at the cublicles next to the exits and concentrate + feeder when her head (the tag) is at least for 1.1m in the cublicle area. + Concretely, this means: + - y value <= -16 + - x value between 2.2 and 11 (barn 72) + or between 23.7 and 32 (barn 70) + + """ + # find measurements where cow is in cubicles A + cub_ax = X.loc[(y <= -16) & \ + (((X > 2.2) & (X <= 11)) | \ + ((X >= 23.7) & (X <= 32)))] + cub_ay = y.loc[(y <= -16) & \ + (((X >= 2.2) & (X <= 11)) | \ + ((X >= 23.7) & (X <= 32)))] + + # combine in dataframe + cub_a = pd.DataFrame({'X' : cub_ax, + 'y' : cub_ay}) + + # calculate percentages + pcub_a = len(cub_a)/len(y)*100 + + return cub_a,pcub_a + +def cubicles_B(X,y): + """ + A cow is considered to be at the cublicles in the middle of the barn and + closest to the exits and concentrate feeder when her head (the tag) is at + least for 1.1m in the cublicle area. + Concretely, this means: + - y value >= -11.4 and <= -10 + - x value between 0 and 5.7 (barn 72) + or between 21 and 27 (barn 70) + NOTE: on plan, not same size /length of cubicle area + """ + + # find measurements where cow is in cubicles B + cub_bx = X.loc[((y >= -11.4) & (y <= -10 )) & \ + (((X >= 0) & (X <= 5.7)) | \ + ((X >= 21) & (X <= 27)))] + cub_by = y.loc[((y >= -11.4) & (y <= -10 )) & \ + (((X >= 0) & (X <= 5.7)) | \ + ((X >= 21) & (X <= 27)))] + # combine in dataframe + cub_b = pd.DataFrame({'X' : cub_bx, + 'y' : cub_by}) + + # calculate percentages + pcub_b = len(cub_b)/len(y)*100 + + return cub_b,pcub_b + +def cubicles_C(X,y): + """ + A cow is considered to be at the cublicles in the middle of the barn and + farthest from the exits (closest to feed rack) when her head (the tag) is at + least for 1.1m in the cublicle area. + Concretely, this means: + - y value >= -10 and <= -8.6 + - x value between 0 and 5.7 (barn 72) + or between 21 and 27 (barn 70) + NOTE: on plan, not same size /length of cubicle area + """ + + # find measurements where cow is in cubicles B + cub_cx = X.loc[((y > -10) & (y <= -8.6 )) & \ + (((X >= 0) & (X <= 5.7)) | \ + ((X >= 21) & (X <= 27)))] + cub_cy = y.loc[((y > -10) & (y <= -8.6 )) & \ + (((X >= 0) & (X <= 5.7)) | \ + ((X >= 21) & (X <= 27)))] + # combine in dataframe + cub_c = pd.DataFrame({'X' : cub_cx, + 'y' : cub_cy}) + + # calculate percentages + pcub_c = len(cub_c)/len(y)*100 + + return cub_c,pcub_c + +def cubicles_all(X,y): + """ + Combined of lying behaviour in area A, B and C + """ + # in area A + cub_a, pcub_a = cubicles_A(X,y) + # in area B + cub_b, pcub_b = cubicles_B(X,y) + # in area C + cub_c, pcub_c = cubicles_C(X,y) + + # combine percentages + pcub = pcub_a+pcub_b+pcub_c + + # combine in A, B, C + cub = pd.concat([cub_a,cub_b, cub_c]) + + return cub,pcub + +def drink_trough(X,y): + """ + A cow is considered to be at the cublicles in the middle of the barn and + farthest from the exits (closest to feed rack) when her head (the tag) is at + least for 1.1m in the cublicle area. + Concretely, this means: + - y value >= -10 and <= -8.6 + - x value between 0 and 5.7 (barn 72) + or between 21 and 27 (barn 70) + NOTE: on plan, not same size /length of cubicle area + """ + # find measurements where cow is near drinking trough + drinkx = X.loc[((y >= -11) & (y <= -8.9)) & \ + (((X >= 8.8) & (X <= 10)) | \ + ((X >= 30.1) & (X <= 32)))] + drinky = y.loc[((y >= -11) & (y <= -8.9)) & \ + (((X >= 8.8) & (X <= 10)) | \ + ((X >= 30.1) & (X <= 32)))] + + # combine in dataframe + drink = pd.DataFrame({'X' : drinkx, + 'y' : drinky}) + + # calculate percentages + pdrink = len(drink)/len(y)*100 + + return drink, pdrink + +def conc_feeder(X,y): + """ + A cow is considered to be at the concentrate feeder when her head (the tag) + is at least for 1m in the feeder area. + Concretely, this means: + - y value <= -16 + - x value between 1 and 2.2 (barn 72) + or between 22.3 and 23.7 (barn 70) + + """ + # find measurements where cow is in concentrate feeder + concx = X.loc[(y <= -16) & \ + (((X >= 1) & (X < 2.2)) | \ + ((X >= 22.3) & (X <= 23.7)))] + concy = y.loc[(y <= -16) & \ + (((X >= 1) & (X < 2.2)) | \ + ((X >= 22.3) & (X <= 23.7)))] + + # combine in dataframe + conc = pd.DataFrame({'X' : concx, + 'y' : concy}) + + # calculate percentages + pconc = len(conc)/len(y)*100 + + return conc,pconc + + +def assign_behaviour(X,y): + """ + function to combine all behaviors/locations in 1 + 1 = cublicle_A + 2 = cublicle_B + 3 = cublicle_C + 4 = feeding_rack + 5 = drinking_trough + 6 = concentrate_feeder + 0 = no specific area assigned + """ + + # calculate time budgets and add the correct number + cub_a, pcub_a = cubicles_A(X,y) # in area A + cub_b, pcub_b = cubicles_B(X,y) # in area B + cub_c, pcub_c = cubicles_C(X,y) # in area C + drink, pdrink = drink_trough(X,y) # at drinking trough + feed, pfeed = feeding_rack(X, y) # at feeding rack + conc, pconc = conc_feeder(X,y) # at concentrate feeder + + # add numbers + cub_a["no"] = 1 + cub_b["no"] = 2 + cub_c["no"] = 3 + feed["no"] = 4 + drink["no"] = 5 + conc["no"] = 6 + + # combine as one dataframe + data = pd.concat([cub_a,cub_b, cub_c,feed,drink,conc]) + pcombi = pcub_a+pcub_b+pcub_c+pfeed+pdrink+pconc + + # TODO: overview little table of zones + totals + + # indices are retained, use this to merge & find "vain" behaviour + new = pd.DataFrame({'X' : X, + 'y' : y, + 'no' : np.zeros((len(X),), dtype=int)}) + new = new.loc[~new.index.isin(data.index),:] + + # combine in 1 data array + data = pd.concat([data,new]) + data = data.sort_index() + + + return data, pcombi diff --git a/datapreparation/ines/datapreparation.py b/datapreparation/ines/datapreparation.py index 399d6e5..608c173 100644 --- a/datapreparation/ines/datapreparation.py +++ b/datapreparation/ines/datapreparation.py @@ -1,50 +1,204 @@ # -*- coding: utf-8 -*- """ -Created on Thu Jun 16 13:58:40 2022 +Created on Mon Aug 29, 2022 @author: adria036 ------------------------------------------------------------------------------- -Data preparation script - uwb time as in manuscript lying behaviour +Data preparation script - Collected via NLAS script +Dates : between April to August, 2022 +Location: at Dairy Campus, Leeuwarden +Sensors: UWB positioning (spatial data: X, Y) + Accelerations (X, Y, Z) +1) Activity measured by accelerometers in 3 dimensions +2) ------------------------------------------------------------------------------- +Remarks: + - corrected for time-desynchronization with UTC + - activity based on UWB reference with thresholds: + ° distance moved + ° speed + ==> into categories of activity (very to little ~ behaviours) + - activity measured by accelerometers in 3 dimensions + + +------------------------------------------------------------------------------- +file paths - to dataopslag on the WUR W: drive """ #%% import packages import os +from datetime import date, timedelta import pandas as pd +import numpy as np +import copy +#import matplotlib.pyplot as plt +# from filter_data import filter_barn_edges, filter_interp +import read_data_sewio +# import barn_areas +# import sqlite3 + + +#%% set filepaths and dates + +# set path to data X,y +path_data = os.path.join( + "W:","ASG","WLR_Dataopslag","DairyCampus","3406_Nlas","raw","dc_sewio" + ) + +# set path to sql data folder +path_sql = os.path.join( + "W:","ASG","WLR_Dataopslag","DairyCampus","3406_Nlas","raw","dc_total" + ) + +# set path to results folder +path_res = os.path.join("C:","/Users","adria036","OneDrive - Wageningen University & Research", + "iAdriaens_doc","Projects","iAdriaens","bait","scripts", + "bait","data","nlas_ines/" + ) + +# set file name for cowid information +fn = "\Copy_of_Sewio_tags.xlsx" + +# set start and end dates +startdate = date(2022,7,31) +enddate = date(2022,7,31) + +# set parameters for the imputation and interpolation +win_med = 31 +gap_thres = 180 + +# # clear workspace +# del fn, path_data + -#%% set paths +#%% data selection +#TODO check correction for time in read_data_sewio -path_data = os.path.join("C:", - "/Users", - "adria036", - "OneDrive - Wageningen University & Research", - "iAdriaens_doc", - "Projects", - "eEllen", - "B4F_indTracking_cattle", - "B4F_data/") +t_interval = enddate - startdate +for dd in range(t_interval.days+1): + dd = timedelta(dd) + print(startdate+dd) + + # --------------------------read & select data----------------------------- + data = read_all_data(startdate+dd,startdate+dd,path_os) + + # read id/tag data + tags = read_ids(path, fn) -# set list of data -fn = [f for f in os.listdir(path_data) if os.path.isfile(os.path.join(path_data,f)) \ - and ".txt" in f \ - and "cow_" in f] + # combine it with data + data = combine_data_id(data,tags) + data = data[["cowid","barn","date","at","feed_reference","title","X","y","alias"]] + + # unique 'seconds' in the dataset + #data["day"] = data["at"].dt.day - min(data["at"].dt.day) + data["hour"] = data["at"].dt.hour + data["minute"] = data["at"].dt.minute + data["second"] = data["at"].dt.second + data["relsec"] = data["hour"]*3600 \ + + data["minute"]*60 \ + + data["second"] + nsec = data["relsec"] + cowids = data["cowid"] + df = pd.concat([cowids,nsec],axis = 1).drop_duplicates() # gives the indices of unique first + del df["relsec"], df["cowid"] -# load data in pd -data = pd.DataFrame([]) -for f in fn: - df = pd.read_csv(path_data+f) - data = pd.concat([data,df]) + # innerjoin for selection step - unique seconds in the dataset + data = pd.concat([data,df],axis = 1, join = 'inner') + data = data.sort_values(by = ["cowid","at"]).reset_index(drop=1) -# clear workspace -del fn, path_data, f, df + # unfiltered data - one per second or less, all cows included + # calculate gaps based on numeric time relative to start of dataset + data["gap"] = data["relsec"].diff() + data.loc[data["gap"] < 0,"gap"] = np.nan # set to nan when different cowid + + #--------------------------edit barn edges--------------------------------- + # set data outside edges to edges x < 0 and < 10.6 or x < 21 and x > 32 + data.loc[(data["X"] < 15),"X"] = filter_barn_edges( \ + copy.copy(data.loc[(data["X"] < 15),"X"]),0,10.6) #barn 72 + data.loc[(data["X"] >= 15),"X"] = filter_barn_edges( \ + copy.copy(data.loc[(data["X"] >= 15),"X"]),21,32) # barn 70 + # set data outside edges to edges y < -18 or y > -2.5 + data["y"] = filter_barn_edges(copy.copy(data["y"]),-18,-2.5) + + #data["X"].hist() + #data["y"].hist() + + # delete rows without cowid + data = data.loc[data["cowid"]>0,:] + + #-----------------------filter & interpolate per cow----------------------- + # filter the (X,y) time series with a median filter and interpolate + + # cows in the barn + cows = data["cowid"].drop_duplicates().sort_values().reset_index(drop=1) + cows = pd.DataFrame(cows) + + # loop over all data per cowid and select (x,y,t) data to filter + result = pd.DataFrame({"cowid": [], + "date": [], + "t" : [], + "xfilt" : [], + "yfilt" : [], + "area" : []}) + for i in range(0,len(cows)): + # select data + cow_t = data.loc[(data["cowid"] == cows["cowid"][i]),"relsec"] + cow_x = data.loc[(data["cowid"] == cows["cowid"][i]),"X"] + cow_y = data.loc[(data["cowid"] == cows["cowid"][i]),"y"] + + # filter data + if len(cow_t) > 10: + # add edges if t doesn't end at second 86399 of the day + if cow_t.max() < 86399: + cow_t = pd.concat([cow_t,pd.Series(86399)], axis=0) + cow_x = pd.concat([cow_x,pd.Series(np.nan)], axis=0) + cow_y = pd.concat([cow_y,pd.Series(np.nan)], axis=0) + + # add edges if t doesn't start with 0th second of the day + if cow_t.iloc[0]/86400 - float(cow_t.iloc[0]//86400) > 0.00001: + cow_t = pd.concat([pd.Series(cow_t.iloc[0]//86400*86400),cow_t], axis=0) + cow_x = pd.concat([pd.Series(np.nan),cow_x], axis=0) + cow_y = pd.concat([pd.Series(np.nan),cow_y], axis=0) + + # TODO: add accelerations + + # filter + df,x,y = filter_interp(cow_t,cow_x,cow_y,win_med,gap_thres) + df = df.reset_index(drop=1) + + # asign barn areas + data_area, parea = barn_areas.assign_behaviour(x,y) + data_area = data_area[~data_area.index.duplicated(keep='first')] + + # merge with df + df["area"] = data_area["no"] + + # select columns and add cowid + df["cowid"] = cows["cowid"][i] + df["date"] = str(startdate+dd) + df = df[["cowid","date","t","xfilt","yfilt","area"]] + else: + df = pd.DataFrame({"cowid":cows["cowid"][i], + "date":str(startdate+dd), + "t" : pd.Series(np.arange(0,86400,1)), + "xfilt" : pd.Series(np.arange(0,86400,1)*np.nan), + "yfilt" : pd.Series(np.arange(0,86400,1)*np.nan), + "area" : pd.Series(np.arange(0,86400,1)*np.nan)}) + + + # concat dataframes all cows together + result = pd.concat([result,df]) -#%% calculate and select data + # save results + datestr = str(startdate+dd) + datestr = datestr.replace("-","") + fns = datestr + '_bec3.txt' + result.to_csv(path_res + fns, index = False) diff --git a/datapreparation/ines/filter_data.py b/datapreparation/ines/filter_data.py new file mode 100644 index 0000000..d2e8386 --- /dev/null +++ b/datapreparation/ines/filter_data.py @@ -0,0 +1,311 @@ +# -*- coding: utf-8 -*- +""" +Created on Mon May 23 13:22:02 2022 + +@author: adria036 +""" + + + + +#import os +import pandas as pd +import numpy as np +#import copy +#import matplotlib.pyplot as plt +from scipy.signal import medfilt +import scipy + +def non_uniform_savgol(x, y, window, polynom): + """ + Applies a Savitzky-Golay filter to y with non-uniform spacing + as defined in x + + This is based on https://dsp.stackexchange.com/questions/1676/savitzky-golay-smoothing-filter-for-not-equally-spaced-data + The borders are interpolated like scipy.signal.savgol_filter would do + + Parameters + ---------- + x : array_like + List of floats representing the x values of the data + y : array_like + List of floats representing the y values. Must have same length + as x + window : int (odd) + Window length of datapoints. Must be odd and smaller than x + polynom : int + The order of polynom used. Must be smaller than the window size + + Returns + ------- + np.array of float + The smoothed y values + """ + if len(x) != len(y): + raise ValueError('"x" and "y" must be of the same size') + + if len(x) < window: + raise ValueError('The data size must be larger than the window size') + + if type(window) is not int: + raise TypeError('"window" must be an integer') + + if window % 2 == 0: + raise ValueError('The "window" must be an odd integer') + + if type(polynom) is not int: + raise TypeError('"polynom" must be an integer') + + if polynom >= window: + raise ValueError('"polynom" must be less than "window"') + + half_window = window // 2 + polynom += 1 + + # Initialize variables + A = np.empty((window, polynom)) # Matrix + tA = np.empty((polynom, window)) # Transposed matrix + t = np.empty(window) # Local x variables + y_smoothed = np.full(len(y), np.nan) + + # Start smoothing + N = 0 + for i in range(half_window, len(x) - half_window, 1): + # print(i) + N += 1 + + # Center a window of x values on x[i] + for j in range(0, window, 1): + #print(j) + t[j] = x[i + j - half_window] - x[i] + + #NEW: t2 with stepsize 1 + t2 = list(range(x[i - half_window] - x[i], x[i + (window-1) - half_window] - x[i]+1)) + t2 = np.asarray(t2,dtype = float) + + #NEW: Initialize variables based on variable window + A2 = np.empty((len(t2), polynom)) # Matrix + # tA2 = np.empty((polynom, len(t2))) # Transposed matrix + + # Create the initial matrix A and its transposed form tA + for j in range(0, window, 1): + r = 1.0 + for k in range(0, polynom, 1): + A[j, k] = r + tA[k, j] = r + r *= t[j] + + for j in range(0,len(t2),1): + # print(j,t2[j]) + r = 1.0 + for k in range(0, polynom, 1): + A2[j, k] = r + # tA2[k, j] = r + r *= t2[j] + + # Multiply the two matrices = (A^T*A) + tAA = np.matmul(tA, A) + + # Invert the product of the matrices =(A^T*A)-1 + tAA = np.linalg.inv(tAA) + + # Calculate the pseudoinverse of the design matrix = (A^T*A)-1*A^T + coeffs = np.matmul(tAA, tA) + + #NEW: Calculate c by matrix multiplying it with the original known y values + #NEW: in the window of interest + c = np.matmul(coeffs,y[i-half_window:i+window-1-half_window+1]) + + #NEW: c[0] = is the smoothed value for i + #NEW: we can calculate the interpolated values using y = A2*c + y_interp = np.matmul(A2,c) + + #NEW: rearrange to return - select the values needed for the smoothing + #NEW: based on the frames that we have sampled + if N == 1: + new_data = pd.DataFrame(t2,columns = ["tstep"]) + new_data["frameno"] = new_data["tstep"] + x[i] + new_data["y_smooth"+str(N)] = y_interp + new_data = new_data.drop("tstep",axis = 1) + else: + sm_data = pd.DataFrame(t2,columns = ["tstep"]) + sm_data["frameno"] = sm_data["tstep"] + x[i] + sm_data["y_smooth"+str(N)] = y_interp + sm_data = sm_data.drop("tstep", axis = 1) + new_data = new_data.merge(sm_data,how='outer',on='frameno') + + # Calculate c0 which is also the y value for y[i] based on available only + y_smoothed[i] = 0 + for j in range(0, window, 1): + y_smoothed[i] += coeffs[0, j] * y[i + j - half_window] + + # If at the end or beginning, store all coefficients for the polynom + if i == half_window: + first_coeffs = np.zeros(polynom) + for j in range(0, window, 1): + for k in range(polynom): + first_coeffs[k] += coeffs[k, j] * y[j] + elif i == len(x) - half_window - 1: + last_coeffs = np.zeros(polynom) + for j in range(0, window, 1): + for k in range(polynom): + last_coeffs[k] += coeffs[k, j] * y[len(y) - window + j] + + # Interpolate the result at the left border + for i in range(0, half_window, 1): + y_smoothed[i] = 0 + x_i = 1 + for j in range(0, polynom, 1): + y_smoothed[i] += first_coeffs[j] * x_i + x_i *= x[i] - x[half_window] + + # Interpolate the result at the right border + for i in range(len(x) - half_window, len(x), 1): + y_smoothed[i] = 0 + x_i = 1 + for j in range(0, polynom, 1): + y_smoothed[i] += last_coeffs[j] * x_i + x_i *= x[i] - x[half_window - 1] + + #NEW: compute interpolated mean to sample when not available + new_data["mean"] = new_data.iloc[:,1:].mean(axis=1) + y_all = new_data[["frameno","mean"]] + + return y_smoothed, y_all + +def filter_barn_edges(ts,edge1,edge2): + """ + Filters the data according to barn edges given by edge1 and edge2 + Method: puts data outside edges to nan + if ts < edge1 : ts = nan + if ts > edge2 : ts = nan + NOTE: in a later stage, we can decide to put ts = closest edge + + Parameters + ---------- + ts : TYPE = Pandas series (time series) + DESCRIPTION. + edge1 : TYPE + DESCRIPTION. + edge2 : TYPE + DESCRIPTION. + + Returns + ------- + ts_filt : TYPE = Pandas series + Original series filtered for measurements outside the edges + + """ + #-------------------------------------------------------------------------- + # for development only (delete) + #ts = copy.copy(data["X"]) + #edge1 = 0 + #edge2 = 32 + #-------------------------------------------------------------------------- + + # correction1: if more than 30 cm out = nan + ts.loc[ts <= edge1+0.3] = np.nan + ts.loc[ts >= edge2+0.3] = np.nan + + # correction2: if less than 30 outside = edge + ts.loc[(ts > edge1-0.3) & (ts < edge1)] = edge1 + ts.loc[(ts < edge2+0.3) & (ts > edge2)] = edge2 + + + return ts + + +def filter_interp(t,x,y,win_med,gap_thres): + """ + Filters the x,y data by linear interpolation across median filtered data + Returns 'completed' time series between edges + If gap larger than threshold: no imputation + If gap larger than win_med/2: no median filter + + + Parameters + ---------- + t : TYPE = pandas Series + time series of time values expressed in relative seconds + x : TYPE = pandas Series + time series of x values that needs imputation + y : TYPE = pandas Series + time series of y values that needs imputation + win_med : TYPE = Int/float + Window for the median filter (kernel size) + gap_thres : TYPE = Int/float + gap threshold for when data imputation is not reliable anymore + + Returns + ------- + df3 : TYPE = Pandas DataFrame + DataFrame with imputed, filtered and completed data. From these, + barn area and time budget allocations can be calculated. + """ + + # construct dataframe + df = pd.DataFrame({'X' : x, + 'y' : y, + 't' : t}) + # calculate gaps from t + df["gap"] = t.diff() + df = df.reset_index(drop=1) + + # find gapsize < window/2 seconds -- only apply median filter to these + gapind = df["gap"].loc[df["gap"]>round(win_med/2)].index #startindex + gapind = pd.DataFrame(np.append(gapind,0)) + gapind = gapind.sort_values(by=0).reset_index(drop=1) + gapind2 = pd.DataFrame(np.append(df["gap"].loc[df["gap"]>round(win_med/2)].index,len(df)-1)) #endindex + gapind2 = gapind2.sort_values(by=0).reset_index(drop=1) + + # prepare calculations + df["xfilt"] = np.nan + df["yfilt"] = np.nan + + # calculate median depending on gapsize + for i in range(0,len(gapind)): + print(gapind.iloc[i,0],gapind2.iloc[i,0]) + if gapind2.iloc[i,0]-gapind.iloc[i,0] > win_med: + # median filter + a = medfilt(df["X"].loc[gapind.iloc[i,0]:gapind2.iloc[i,0]],kernel_size=win_med) + df.loc[gapind.iloc[i,0]:gapind2.iloc[i,0],"xfilt"] = a + a = medfilt(df["y"].loc[gapind.iloc[i,0]:gapind2.iloc[i,0]],kernel_size=win_med) + df.loc[gapind.iloc[i,0]:gapind2.iloc[i,0],"yfilt"] = a + else: + df.loc[gapind.iloc[i,0]:gapind2.iloc[i,0],"xfilt"] = \ + df["X"].loc[gapind.iloc[i,0]:gapind2.iloc[i,0]] + df.loc[gapind.iloc[i,0]:gapind2.iloc[i,0],"yfilt"] = \ + df["y"].loc[gapind.iloc[i,0]:gapind2.iloc[i,0]] + + # linear (x,y) interpolation based on xfilt and yfilt + tmin = df["t"].min() + tmax = df["t"].max() + tnew = np.linspace(tmin,tmax,tmax-tmin).round() + + # do the interpolation + f = scipy.interpolate.interp1d(df["t"],[df["xfilt"],df["yfilt"]],kind="linear") + xnew,ynew = f(tnew) + + # to dataframe + df2 = pd.DataFrame({'t' : tnew, + 'xnew' : xnew, + 'ynew' : ynew}) + + # merge with df based on key = t and tnew + df3 = pd.merge(df2,df, + how = 'outer', + on = 't').sort_values(by="t").reset_index(drop=1) + + # find gapsize > gap_thres seconds -- to delete data + gapind = df3["gap"].loc[df3["gap"]>gap_thres].index + gapind2 = df3["gap"].loc[df3["gap"]>gap_thres] + gaps = pd.DataFrame({'indx' : gapind, + 'gapsize' : gapind2}).reset_index(drop=1) + + # delete xnew and ynew if gap > gap_thresh + for i in range(0,len(gaps)): + print(i) + df3.loc[int(gaps["indx"][i]-gaps["gapsize"][i]+1): \ + int(gaps.indx[i])-1,["xnew","ynew"]] = np.nan + + return df3, df3["xnew"], df3["ynew"] \ No newline at end of file diff --git a/datapreparation/ines/read_data_sewio.py b/datapreparation/ines/read_data_sewio.py new file mode 100644 index 0000000..34b8a5a --- /dev/null +++ b/datapreparation/ines/read_data_sewio.py @@ -0,0 +1,537 @@ +# -*- coding: utf-8 -*- +""" +Created on Thu Feb 10 08:52:20 2022 +@author: adria036 +Project: NLAS-innovation +Contributors: Ines Adriaens, Bert Klandermans, Tomas Izquierdo +Contact: ines.adriaens@wur.nl + +Set of functions to read sewio data from data base +------------------------------------------------------------------------------- +This script contains the functions to read sewio data from parquet files. +You can specify which time frame to consider. + +Contains: + - start_end_dates + - read_parquet_accel + - convert_accel_data + - read_parquet_pos + - read_feeds + - read_all_data + +Packages you need to import: + import pandas as pd + import os + from datetime import date, timedelta, datetime + +------------------------------------------------------------------------------- +Usage example: + # import packages + from datetime import date,timedelta + from read_data_sewio import * + + # set parameters + check_date = date.today() - timedelta(days=1) + comparison_window = 1 # set to -1 to only load one day + path = r"W:/ASG/WLR_Dataopslag/DairyCampus/3406_Nlas/raw/dc_sewio/" + + # start and end date + startdate,enddate = start_end_dates(check_date,comparison_window) + + # read data all + data = read_all_data(startdate,enddate,path) + +---------------------------------------------------------------------------- +Data source: W:/ drive -- or DB copy + +""" + +#%% import packages +import pandas as pd +import os +from datetime import timedelta, datetime +import copy +import fastparquet +import openpyxl + +#%% define functions + +# function to define the window which data to read +def start_end_dates(check_date,comparison_window): + """ + Description + ----------- + Gives the end and start date with the window you need when comparing for + QC + + Parameters + ---------- + check_date : TYPE = datetime + for QC: date you want data to benchmark from. + comparison_window : TYPE = int + benchmark window. + + Returns + ------- + startdate : TYPE = datetime + startdate of data to consider. + enddate : TYPE = dateime + enddate of data to consider. + + """ + # start date + startdate = check_date - timedelta(days = comparison_window) + enddate = check_date + return startdate, enddate + +# function to read accelerometer data +def read_parquet_accel(startdate,enddate,path): + """ + Description + ----------- + Reads accelerometer data from the parquet files stores in W: drive + + Parameters + ---------- + startdate : TYPE = datetime + date of first file you want to read + enddate : TYPE = datetime + date of last file you want to read + path : TYPE = str + path where files are stored + + Returns + ------- + data : TYPE = pandas DataFrame + DataFrame with the read parquet files of acceleration data + col = ['history_id','date','at','acc_x','acc_y','acc_z'] + + """ + # check file path + if os.path.isdir(path): + # set filenames based on daterange (.parquet) + fn = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path,f)) \ + and ".parquet" in f \ + and datetime.strptime(f[0:10], '%Y-%m-%d').date() >= startdate \ + and datetime.strptime(f[0:10], '%Y-%m-%d').date() <= enddate] + # read data + data = pd.DataFrame([]) + for f in range(0,len(fn)): + print(fn[f]) + accdata = pd.read_parquet(os.path.join(path,fn[f]), engine='auto') + # cast to datetime to reduce computing memory + accdata['at'] = pd.to_datetime(accdata['at'] ) + accdata["at"] = accdata["at"].dt.tz_localize('UCT') + accdata["at"] = accdata["at"].dt.tz_convert('CET') + accdata['date'] = pd.to_datetime(accdata['date'] ) + accdata["date"] = accdata["date"].dt.tz_localize('UCT') + accdata["date"] = accdata["date"].dt.tz_convert('CET') + + # added + if not 'history_id' in accdata.columns: + accdata['history_id'] = accdata.index + + # cast to numeric + accdata['history_id'] = pd.to_numeric(accdata['history_id'],errors='coerce') + + #print(fn[f], accdata.memory_usage(deep=True)) + + # extract values (strings) to temporary dataframe + value = accdata.value + + # drop useless columns + accdata = accdata.drop(axis = 1, + columns = ['value']) + value = value.str.split(pat=";",expand=True) # split string + to int + value = value.rename(columns={0:"acc_x",1:"acc_y",2:"acc_z"}) # rename + + # set 'N/A' to 'NaN' for type convertion to float if str + try: + value.loc[value["acc_x"].str.contains('N/A'),"acc_x"] = 'NaN' + except: + pass + try: + value.loc[value["acc_y"].str.contains('N/A'),"acc_y"] = 'NaN' + except: + pass + try: + value.loc[value["acc_z"].str.contains('N/A'),"acc_z"] = 'NaN' + except: + pass + # drop rows with '%' values in x_acceleration + value = value.drop(value[(value["acc_x"].str.contains("%")==True)].index) + + # cast to numeric + value['acc_x'] = pd.to_numeric(value['acc_x'],errors='coerce') + value['acc_y'] = pd.to_numeric(value['acc_y'],errors='coerce') + value['acc_z'] = pd.to_numeric(value['acc_z'],errors='coerce') + accdata['acc_x'] = value['acc_x'] + accdata['acc_y'] = value['acc_y'] + accdata['acc_z'] = value['acc_z'] + del value + + data = pd.concat([data,accdata],axis=0, ignore_index=1) + + #print(fn[f], data.memory_usage(deep=True)) + return data + +# function to convert accelerometer data +def convert_accel_data(a_raw,dynamic_range = 2,to_ms = True): + """ + Description + ----------- + Converts the specified value to G-units, as defined by Noldus manual + + a[ms**2] = (a_raw * dynamic_range * g) / scale_factor + + with + g = gravitational_acceleration = 9.81 (to convert to ms**2) + a_raw= raw acceleration as extracted from sewio + dynamic range = [2,4,8,16] in current sewio sensors + scale_factor = 2**15 + + Parameters + ---------- + value : TYPE = Series of floats + Series of raw acceleration values. + dynamic_range : TYPE = int + ?? setting of sewio + to_ms : TYPE = Boolean + if True: convert to acceleration in meter per second*second + if False: convert to G-units + + Returns + ------- + acceleration : TYPE = Series of floats + Converted acceleration values. + + """ + # set scale factor (sewio setting) and g + scale_factor = 2**15 + g = 9.81 + + # set optional arguments, if not given = use these + # dynamic_range + # to_ms = True + + # calculate converted acceleration in g units + a_new = a_raw * dynamic_range / scale_factor + # convert to meter per squared second + if to_ms == True: + a_new = a_new * g + + return a_new + +# read position data +def read_parquet_pos(startdate,enddate,path): + """ + Description + ----------- + Read position data from sewio db files stored in W: as parquet + + Parameters + ---------- + startdate : TYPE = datetime + date of first file you want to read + enddate : TYPE = datetime + date of last file you want to read + path : TYPE = str + path where files are stored + + Returns + ------- + data : pandas DataFrame + pandas data frame containing x,y position. + + """ + # check file path + if os.path.isdir(path): + # set filenames based on daterange (.parquet) + fn = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path,f)) \ + and ".parquet" in f \ + and datetime.strptime(f[0:10], '%Y-%m-%d').date() >= startdate \ + and datetime.strptime(f[0:10], '%Y-%m-%d').date() <= enddate] + # read data + data = pd.DataFrame([]) + for f in range(0,len(fn)): + print(fn[f]) + sewdata = pd.read_parquet(os.path.join(path,fn[f])) + sewdata['at'] = pd.to_datetime(sewdata['at']) + sewdata["at"] = sewdata["at"].dt.tz_localize('UCT') + sewdata["at"] = sewdata["at"].dt.tz_convert('CET') + sewdata['date'] = pd.to_datetime(sewdata['date']) + sewdata["date"] = sewdata["date"].dt.tz_localize('UCT') + sewdata["date"] = sewdata["date"].dt.tz_convert('CET') + # added + if not 'position_id' in sewdata.columns: + sewdata['position_id'] = sewdata.index + + # drop useless columns + sewdata.drop(axis = 1, + columns = ['clr','numberOfAnchors','plan_reference'], inplace=True) + + data = pd.concat([data,sewdata],axis=0, ignore_index=1) + + return data + +# function to read "feeds" = tag id data +def read_feeds(startdate,enddate, path_os): + """ + Description + ----------- + Reads feed and stream data from sewio db files stored in W: as parquet + These are needed to check IDs + + Parameters + ---------- + startdate : TYPE = datetime + startdate of which feed data to read. + enddate : TYPE = datetime + enddate of which feed data to read. + + Returns + ------- + feeds : TYPE = pandas DataFrame + feeds = sensor ids to merge with data. + streams : TYPE = pandas DataFrame + streams = tags to merge + streamfeeds : TYPE = pandas DataFrame + streamfeeds = combined streams and feeds + """ + # prepare filepath construction + delta = enddate - startdate + # read file paths + for d in range(delta.days + 1): + print(d) + day_dataset = startdate + timedelta(days = d) + yy = day_dataset.year + mm = day_dataset.month + dd = day_dataset.day + if dd < 10 and mm < 10: + path = os.path.join(path_os,'db',str(yy), f'0{str(mm)}',f'0{str(dd)}') + elif dd > 9 and mm < 10: + path = os.path.join(path_os,'db',str(yy), f'0{str(mm)}',f'{str(dd)}') + elif dd < 9 and mm > 9: + path = os.path.join(path_os,'db',str(yy), f'{str(mm)}',f'0{str(dd)}') + else: + path = os.path.join(path_os,'db',str(yy), str(mm),str(dd)) + # read feeds and check unique + # fn_feeds = 'feeds.parquet' + feed_path = os.path.join(path, 'feeds.parquet') + if os.path.isfile(feed_path): + feeds_all = pd.read_parquet(feed_path) + feeds_all = feeds_all.loc[:,['id','alias','title']] + feeds_all = feeds_all.rename(columns={"id":"feed_reference"}) + if 'feeds' not in locals(): + print('feeds = ' + str(yy)+str(mm)+str(dd)) + feeds = feeds_all + else: + print('feeds = ' + str(yy)+str(mm)+str(dd)) + feeds = pd.concat([feeds,feeds_all], + axis=0, ignore_index=1, + join = 'outer', + keys = ['feed_reference','alias','title']) + feeds = feeds.drop_duplicates() + feeds = feeds.sort_values(by=['feed_reference']).reset_index(drop=1) + # fn_streams = 'datastreams.parquet' + stream_path = os.path.join(path, 'datastreams.parquet') + if os.path.isfile(stream_path): + streams_all = pd.read_parquet(stream_path) + streams_all = streams_all.loc[:,['datastream_id','feed_reference','d_id']] + if 'streams' not in locals(): + streams = streams_all + else: + print('streams = ' + str(yy)+str(mm)+str(dd)) + streams = pd.concat([streams,streams_all], + axis=0, ignore_index=1, + join = 'outer', + keys = ['datastream_id','feed_reference','d_id']) + streams = streams.drop_duplicates() + streams = streams.sort_values(by=['feed_reference']).reset_index(drop=1) + streams = streams.loc[streams["d_id"].str.contains('acc') == True] + # both together in the same table with an innerjoin + if 'feeds' not in locals() or 'streams' not in locals(): + streamfeeds = [] + else: + streamfeeds = streams.merge(feeds) + + return feeds,streams,streamfeeds + + +# combine all functions in a single one +def read_all_data(startdate,enddate,path): + """ + Description + ----------- + Uses the above functions to read and (outer) merge all data of position + and accelerometers into a single DataFrame + + Parameters + ---------- + startdate : TYPE = datetime + Startdate for data read + enddate : TYPE = datetime + Enddate for data read + path : TYPE = str + path where directories "db/", "position/" and "history/" are - these + contain the actual data files to read. + + Returns + ------- + data : TYPE = pandas DataFrame + Df with all the data, also the data that could not be merged between + position and accelerometer. + + """ + # read feeds in [startdate,enddate] + feeds,streams,streamfeeds = read_feeds(startdate, enddate, path) + + # read+convert accelerometer data from [startdate,enddate] + path_acc = os.path.join(path, "history") + accel = read_parquet_accel(startdate, enddate, path_acc) + accel["acc_x"] = convert_accel_data(accel["acc_x"]) + accel["acc_y"] = convert_accel_data(accel["acc_y"]) + accel["acc_z"] = convert_accel_data(accel["acc_z"]) + + # delete measurements with no acceleration; or when acc_x is available but no y,z + accel = accel.loc[accel["acc_y"].isna() == False,:] + + # read position data + path_pos = os.path.join(path, "position") + pos = read_parquet_pos(startdate, enddate, path_pos) + + # combine position with feeds + pos = pos.merge(feeds) + pos = pos.drop(axis = 1, + columns = ["position_id"]) + + # combine accel with streams + accel = accel.merge(streamfeeds, + left_on = 'datastream_reference', + right_on = 'datastream_id') + accel = accel.drop(axis = 1, + columns = ['history_id','datastream_reference', + 'datastream_id','d_id','alias']) + # combine accel with pos using date, at, feed_reference, alias, title + data = accel.merge(pos, + how ='outer', + on = ["date","at","feed_reference","title"]) + + return data + + +def read_ids(path, fn): + """ + Description + ----------- + Reads the ids from the animals, manually entered at DC when + batteries are charged, and couples them to the data. + + Parameters + ---------- + path : TYPE = str + path to xlsx file, read as pandas + + Returns + ------- + tags : TYPE = pandas DataFrame + Df with all the id data + + """ + # set path + # path = os.path.join( + # "C:","/Users","adria036","OneDrive - Wageningen University & Research", + # "iAdriaens_doc","Projects","cKamphuis","nlas","data","ancillary_data" + # ) + # fn = "\Sewio_tags.xlsx" + + # read tagIDs and tags2cow from excel file + tag2cow = pd.read_excel(path+fn, sheet_name = "Tag2Cow") + tagIDs = pd.read_excel(path+fn, sheet_name = "Tag_IDs") + # combine tag with tag id and inspect + tags = tag2cow.merge(tagIDs) + tags.head(20) + # return tags + return tags + +def combine_data_id(data,tags): + """ + Adds ids and barn number to data, based on "tags". Data can be either + position, accel or data (both) + + Parameters + ---------- + data : pandas DataFrame + Position, accel or data. + tags : pandas DataFrame + Read from tags excel sheet with function read_ids. + + Returns + ------- + data : pandas DataFrame + with cowid and barn number added for that session. + + """ + # in data: alias = "Sewio_tagnr" and "Sewio_tagID" corresponds to "title" + # remark: tested for alias = feed_reference but that seems incorrect + # principle: run over tags line per line; + # select the records with tag x between dates + # add cow id and barn number to selected data + # first select the records in "tags" of which enddate > min data["date"] + # correct for someone stupidly changing col name of the file + try: + tags = tags.rename(columns={"datetime_begin":"datetime_start"}) + except: + pass + + # correct for formating dates + #tags["datetime_start"] = tags.datetime_start.apply(lambda x: x.date()) + #tags["datetime_end"] = tags.datetime_end.apply(lambda x: x.date()) + tags["datetime_start"] = pd.to_datetime(tags["datetime_start"]) + tags["datetime_end"] = pd.to_datetime(tags["datetime_end"]) + data["date"] = data.date.apply(lambda x: x.date()) + + # selection step + tag_selected = tags.loc[(tags["datetime_start"] < max(pd.to_datetime(data["date"]))+timedelta(days=1)) & \ + ((tags["datetime_end"] >= min(pd.to_datetime(data["date"]))) | \ + (tags["datetime_end"].isna() == True)),:] + tag_selected = tag_selected.sort_values(by = ["cow","datetime_start"]) + # set endtime to today if NaT + tag_selected.loc[(tags["datetime_end"].isna()==True), + "datetime_end"] = pd.Timestamp.today() + # add records + newdata = copy.deepcopy(data) + newdata["barn"] = 0 + newdata["cowid"] = 0 + # add tag nr based on time and alias + for i in range(0,len(tag_selected)): + + # set data of current line + tagstart = tag_selected["datetime_start"].iloc[i] + tagstart = pd.DataFrame({'at': [tagstart]}) + tagstart = pd.to_datetime(tagstart['at'].astype(str)) + tagstart = tagstart.dt.tz_localize('CET') + + #tagstart = tagstart.to_pydatetime() + #tagstart = tagstart - timedelta(hours = 2) + tagend = tag_selected["datetime_end"].iloc[i] + tagend = pd.DataFrame({'at': [tagend]}) + tagend = pd.to_datetime(tagend['at'].astype(str)) + tagend = tagend.dt.tz_localize('CET') + #tagend = tagend - timedelta(hours = 2) + tagcow = tag_selected["cow"].iloc[i] + taggroup = tag_selected["group"].iloc[i] + tagnr = str(tag_selected["Sewio_tagnr"].iloc[i]) + # find all measurements in data after tagstart + # before tagend + # with feed_reference=tagnr + newdata.loc[(newdata["at"] >= tagstart[0]) \ + & (newdata["at"] <= tagend[0]) \ + & (newdata["alias"] == tagnr),"barn"] = taggroup + newdata.loc[(newdata["at"] >= tagstart[0]) \ + & (newdata["at"] <= tagend[0]) \ + & (newdata["alias"] == tagnr),"cowid"] = tagcow + newdata = newdata.sort_values(by = ["cowid","at"]).reset_index(drop=1) + # return newdata + return newdata + diff --git a/datapreparation/ines/requirements.txt b/datapreparation/ines/requirements.txt new file mode 100644 index 0000000..c8f27f0 --- /dev/null +++ b/datapreparation/ines/requirements.txt @@ -0,0 +1,8 @@ +# requirements for datapreparation steps +pandas +os +openpyxl +python-snappy +pyarrow +matplotlib +fastparquet \ No newline at end of file diff --git a/datapreparation/ines/test.txt b/datapreparation/ines/test.txt deleted file mode 100644 index 0045104..0000000 --- a/datapreparation/ines/test.txt +++ /dev/null @@ -1 +0,0 @@ -showcase branch \ No newline at end of file -- GitLab