Commit ff20d6e3 authored by Brummans, Nick

Added the code for feature extraction and train/val/test for a specific scenario

parent 14b88867
import os
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
class ExtractFeatures:
    '''
    @Given: the folder <data_path> where the csv files corresponding to different pose estimations exist
    @Then: Extract features for the machine learning models to be trained on
    '''
    def __init__(self, data_path):  # data_path is the directory where the csv files exist
        self.data_path = data_path
        self.data_df_list = self.__load_data()

    # Extracts the features from the csv files and compiles them into a dataset <X> with labels <Y>
    def generate_features(self, draw_plots=False):
        X = list()
        Y = list()
        for df in self.data_df_list:
            raw_features_df = df[['left_wrist_y', 'left_wrist_x', 'right_wrist_y', 'right_wrist_x']]
            labels_df = df[['quality']]
            raw_features = raw_features_df.values  # returns a numpy array
            features = self.__get_sampled_features(raw_features)
            if draw_plots:
                raw_features_df.plot()
                plt.show()
            x = features
            labels = labels_df.values
            y = labels[0]  # All labels for an activity are the same [we do not distinguish between static and moving activity]
            X.append(x)
            Y.append(y)
        X = np.array(X)
        Y = np.array(Y)
        return X, Y
    # Creates a combined data frame from all the individual csv files, with wrist locations and labels
    def __load_data(self):
        files_dict = self.__get_files_dict()
        return self.__create_data_df_list(files_dict)

    # Reads the folder and creates a key for each category; each key is associated with all
    # "*.mp4.csv" files for that specific category
    def __get_files_dict(self):
        activity_classes = os.listdir(self.data_path)
        files_dict = {}
        for activity in activity_classes:
            activity_dir = os.path.join(self.data_path, activity)
            files_dict[activity] = glob.glob(str(activity_dir) + "/*.mp4.csv")  # NOTE: hard-coded filename pattern
        return files_dict
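
    # Expected on-disk layout (an illustrative sketch; the category names below
    # are placeholders, not taken from this repo):
    #   <data_path>/
    #       category_a/   clip_01.mp4.csv, clip_02.mp4.csv, ...
    #       category_b/   clip_01.mp4.csv, ...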
    # Creates a list of pandas dataframes from the csv files with the following columns:
    # 'detection_index','left_wrist_y','left_wrist_x','right_wrist_y','right_wrist_x','quality'
    # Here the index is the timestamp within an activity, the wrist positions are the features, and quality is the label
    def __create_data_df_list(self, files_dict):
        data_df_list = list()
        for key in files_dict.keys():
            for csv_f in files_dict[key]:
                df = pd.read_csv(csv_f)
                # ANEESH: Since the frame indices are wrong, using detection_index instead of frame_index
                data_df = df[['detection_index', 'left_wrist_y', 'left_wrist_x', 'right_wrist_y', 'right_wrist_x', 'quality']]
                data_df = data_df.set_index('detection_index')
                data_df_list.append(data_df)
        return data_df_list
    # Simple moving average with window size <w> over the 1-D array <x>
    def __moving_average(self, x, w):
        return np.convolve(np.array(x), np.ones(w), mode='valid') / w
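
    # Illustrative behaviour (example values, not from the original source): a
    # window of 2 over [1, 2, 3, 4] yields the pairwise means, and 'valid' mode
    # shrinks the output by w - 1 entries:
    #   self.__moving_average(np.array([1, 2, 3, 4]), 2) -> array([1.5, 2.5, 3.5])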
    # Uniformly samples the raw features at <num_samples> regularly spaced time stamps
    # and flattens them to create a single feature vector
    def __get_sampled_features(self, raw_features, num_samples=40):
        idx = np.round(np.linspace(0, len(raw_features) - 1, num_samples)).astype(int)
        sampled_features = raw_features[idx]
        sampled_features_flattened = sampled_features.flatten()
        sampled_features_flattened = self.__moving_average(sampled_features_flattened, 10)
        return sampled_features_flattened
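
# Example usage (an illustrative sketch; the path below is a placeholder, not a
# path from this repo):
#   fe = ExtractFeatures('/path/to/pose_csvs')
#   X, Y = fe.generate_features(draw_plots=False)
#   # With num_samples=40 and 4 wrist coordinates, each feature vector has
#   # 40 * 4 = 160 values, reduced to 151 by the width-10 moving average.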
import numpy as np
from sklearn import preprocessing
import joblib
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
class RandomForestModel:
    def __init__(self):
        self.model = None  # This value is not None once the training is done

    def train(self, X_train, y_train):
        cv = StratifiedShuffleSplit(n_splits=5, test_size=0.15, random_state=42)
        # Instantiate the grid search model
        param_grid = self.__create_parameter_grid()
        pipe = Pipeline([
            ('rf', RandomForestClassifier())
        ])
        rf_grid = GridSearchCV(pipe, param_grid=param_grid,
                               cv=cv, n_jobs=-1)  # verbose=2
        rf_grid.fit(X_train, y_train)
        print("The best parameters are %s with a score of %0.2f"
              % (rf_grid.best_params_, rf_grid.best_score_))
        self.model = rf_grid.best_estimator_

    def predict(self, x_in):
        return self.model.predict(x_in)
    def __create_parameter_grid(self):
        max_depth_range = np.arange(2, 10)
        # NOTE: parameter names must be prefixed with "rf__" so that they are routed
        # to the "rf" step of the pipeline
        param_grid = {
            'rf__max_depth': max_depth_range,
            'rf__min_samples_leaf': [2],
            'rf__n_estimators': np.arange(2, 20),
            'rf__random_state': [42]
        }
        return param_grid
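
    # Illustrative extension (not part of the original grid): a second pipeline
    # stage would get its own prefix, e.g.
    #   pipe = Pipeline([('scaler', MinMaxScaler()), ('rf', RandomForestClassifier())])
    #   param_grid = {'scaler__feature_range': [(0, 1)], 'rf__max_depth': max_depth_range, ...}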
class Model:
    def __init__(self, X, Labels):
        self.X = X
        self.le = self.__label_encoder(Labels)
        self.Y = self.__encode_labels(Labels)
        self.__create_train_test_datasets()
        self.X_scaler_model = self.__data_normalizer()
        self.model = RandomForestModel()

    # Training routine
    def train(self):
        X_train_scaled = self.X_scaler_model.transform(self.X_train)
        self.model.train(X_train_scaled, self.y_train)
    def predict(self, x):
        x_scaled = self.X_scaler_model.transform(x)
        return self.model.predict(x_scaled)

    def get_classification_report(self):
        self.__get_test_report()
        self.__get_train_report()
        self.__get_complete_data_report()
    def __get_test_report(self):
        y_pred = self.predict(self.X_test)
        self.__print_report('Test results', self.y_test, y_pred)

    def __get_train_report(self):
        y_pred = self.predict(self.X_train)
        self.__print_report('Train results', self.y_train, y_pred)

    def __get_complete_data_report(self):
        y_pred = self.predict(self.X)
        self.__print_report('Full data results', self.Y, y_pred)

    # Prints a classification report and a confusion matrix; the labels are taken
    # from the fitted label encoder so that classes missing from the predictions
    # are still reported
    def __print_report(self, title, y_true, y_pred):
        num_classes = len(self.le.classes_)
        cr = classification_report(y_true, y_pred, labels=range(num_classes),
                                   target_names=self.le.classes_)
        cm = confusion_matrix(y_true, y_pred, labels=range(num_classes))
        print('\n-------------------%s------------------------------\n' % title)
        print(cr)
        print(cm)
        print('\n-------------------------------------------------\n')
    # Convert numerical labels back to textual labels
    def decode_labels(self, numerical_labels):
        return self.le.inverse_transform(numerical_labels)

    # Create stratified train/test split
    def __create_train_test_datasets(self, test_size=0.20):
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            self.X, self.Y, test_size=test_size,
            shuffle=True, random_state=1111, stratify=self.Y)

    # Create normalizer model based on the training data
    def __data_normalizer(self):
        # Normalize the data to the [0, 1] range
        scaler = MinMaxScaler()
        return scaler.fit(self.X_train)

    # Fit a label encoder on the unique textual labels
    def __label_encoder(self, text_labels):
        le = preprocessing.LabelEncoder()
        le.fit(np.unique(text_labels))
        return le

    # Convert textual labels to numerical labels
    def __encode_labels(self, text_labels):
        return self.le.transform(text_labels.ravel())
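
# Example end-to-end usage (an illustrative sketch, assuming the ExtractFeatures
# class from the companion file is importable and the path is replaced with a
# real data directory):
#   fe = ExtractFeatures('/path/to/pose_csvs')
#   X, Labels = fe.generate_features()
#   model = Model(X, Labels)
#   model.train()
#   model.get_classification_report()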