Commit 57f7423e authored by Brummans, Nick

update on pipeline scripts and added script for single task runs

parent 9918bf27
@@ -5,7 +5,7 @@ pipe = PipelineController(
     name='pipeline test',
     project='pipeline',
     version='0.0.1',
-    add_pipeline_tags=True,
+    add_pipeline_tags=True
 )
 pipe.add_parameter(
@@ -17,8 +17,8 @@ pipe.add_parameter(
     'data/mnist_png/testing'
 )
-pipe.set_default_execution_queue("docker_image")
-# pipe.set_default_execution_queue('test')
+# set default queue
+pipe.set_default_execution_queue("queue_name")
 # Adding the first stage to the pipeline, a clone of the base tasks will be created and used
 pipe.add_step(name='stage_data',
@@ -41,6 +41,6 @@ pipe.add_step(name='stage_train',
 # Starting the pipeline
 pipe.start_locally(run_pipeline_steps_locally=True)
 # comment the line above and uncomment the line below to run the scripts on a queue that agents are listening to
-#pipe.start(queue='docker_image')
+#pipe.start(queue='queue_name')
 print('done')
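Note: running via pipe.start(queue='queue_name') assumes a ClearML agent is serving that queue. As a sketch (queue name matching the script above, --docker optional depending on setup), such an agent could be launched with:

    clearml-agent daemon --queue queue_name --docker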
File moved
@@ -86,7 +86,7 @@ args = {
     'dataset_name_training': "",
     'dataset_name_test': "",
     'dataset_project': "",
-    'epochs': 10,
+    'epochs': 3,
     'train_batch_size': 256,
     'validation_batch_size': 256,
     'train_num_workers': 0,
""" the task.remote_execution option is used when it's needed to run part of the code locally and then move it for
full execution remotely. When running locally, the task.remote_execution() will complete the currently running task and
enqueue it to a chosen queue. When running in an agent, it will ignore the task.remote_execution() and proceed to execute
the code. This feature is especially helpful if you want to run the first epoch locally on your machine to debug and to
make sure code doesn't crash, and then move to a stronger machine for the entire training.
"""
from clearml import Task, Dataset
from clearml import Logger
from clearml import OutputModel
import time
from tqdm.auto import tqdm
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
import torch.optim as optim
from model import CNNModel
# training
def train(model, trainloader, optimizer, criterion):
    model.train()
    print('Training')
    train_running_loss = 0.0
    train_running_correct = 0
    counter = 0
    for i, data in tqdm(enumerate(trainloader), total=len(trainloader)):
        counter += 1
        image, labels = data
        image = image.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        # forward pass
        outputs = model(image)
        # calculate the loss
        loss = criterion(outputs, labels)
        train_running_loss += loss.item()
        # calculate the accuracy
        _, preds = torch.max(outputs.data, 1)
        train_running_correct += (preds == labels).sum().item()
        # backpropagation
        loss.backward()
        # update the optimizer parameters
        optimizer.step()
    # loss and accuracy for the complete epoch
    epoch_loss = train_running_loss / counter
    epoch_acc = 100. * (train_running_correct / len(trainloader.dataset))
    return epoch_loss, epoch_acc
# validation
def validate(model, testloader, criterion):
    model.eval()
    print('Validation')
    valid_running_loss = 0.0
    valid_running_correct = 0
    counter = 0
    with torch.no_grad():
        for i, data in tqdm(enumerate(testloader), total=len(testloader)):
            counter += 1
            image, labels = data
            image = image.to(device)
            labels = labels.to(device)
            # forward pass
            outputs = model(image)
            # calculate the loss
            loss = criterion(outputs, labels)
            valid_running_loss += loss.item()
            # calculate the accuracy
            _, preds = torch.max(outputs.data, 1)
            valid_running_correct += (preds == labels).sum().item()
    # loss and accuracy for the complete epoch
    epoch_loss = valid_running_loss / counter
    epoch_acc = 100. * (valid_running_correct / len(testloader.dataset))
    return epoch_loss, epoch_acc
if __name__ == '__main__':
    task = Task.init(project_name="task example",
                     task_name="single script run",
                     output_uri=True)
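    # output_uri=True uploads models/checkpoints to the default ClearML file server
    # (or whichever default output destination is configured for this workspace)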
    args = {
        'training_path': 'data/mnist_png/training',
        'test_path': 'data/mnist_png/testing',
        'dataset_project': "pipeline",
        'dataset_name_training': "training_dataset",
        'dataset_name_test': "testing_dataset",
        'remote_queue': 'k8s_scheduler',
        'epochs': 3,
        'train_batch_size': 256,
        'validation_batch_size': 256,
        'train_num_workers': 0,
        'validation_num_workers': 0,
        'resize': 28,
        'lr': 1e-3
    }
    task.connect(args)
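    # connect() registers the dict as the task's hyperparameters; when the task is
    # cloned or run remotely, values edited in the ClearML UI are fed back into args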
    dataset_train = Dataset.create(
        dataset_name=args['dataset_name_training'],
        dataset_project=args['dataset_project']
    )
    dataset_test = Dataset.create(
        dataset_name=args['dataset_name_test'],
        dataset_project=args['dataset_project']
    )
    dataset_train.add_files(path=args['training_path'])
    dataset_test.add_files(path=args['test_path'])
    dataset_train.upload()
    dataset_test.upload()
    dataset_train.finalize()
    dataset_test.finalize()
    print('Done uploading datasets')
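    # finalize() locks each dataset version as immutable, so other tasks can later
    # retrieve exactly this data via Dataset.get()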
    # get logger
    logger = Logger.current_logger()
    # the training transforms
    train_transform = transforms.Compose([
        transforms.Resize(args['resize']),
        #transforms.RandomHorizontalFlip(p=0.5),
        #transforms.RandomVerticalFlip(p=0.5),
        #transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5)),
        #transforms.RandomRotation(degrees=(30, 70)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.5, 0.5, 0.5],
            std=[0.5, 0.5, 0.5]
        )
    ])
    # the validation transforms
    valid_transform = transforms.Compose([
        transforms.Resize(args['resize']),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.5, 0.5, 0.5],
            std=[0.5, 0.5, 0.5]
        )
    ])
    # training dataset
    train_dataset = datasets.ImageFolder(
        root=args['training_path'],
        transform=train_transform
    )
    # validation dataset
    valid_dataset = datasets.ImageFolder(
        root=args['test_path'],
        transform=valid_transform
    )
    # training data loader
    train_loader = DataLoader(
        train_dataset, batch_size=args['train_batch_size'], shuffle=True,
        num_workers=args['train_num_workers'], pin_memory=True
    )
    # validation data loader
    valid_loader = DataLoader(
        valid_dataset, batch_size=args['validation_batch_size'], shuffle=False,
        num_workers=args['validation_num_workers'], pin_memory=True
    )
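    # pin_memory=True stages batches in page-locked memory to speed up host-to-GPU
    # copies; num_workers=0 keeps data loading in the main process (simplest to debug)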
    device = ('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Computation device: {device}\n")
    model = CNNModel().to(device)
    print(model)
    # total parameters and trainable parameters
    total_params = sum(p.numel() for p in model.parameters())
    print(f"{total_params:,} total parameters.")
    total_trainable_params = sum(
        p.numel() for p in model.parameters() if p.requires_grad)
    print(f"{total_trainable_params:,} training parameters.")
    # optimizer
    optimizer = optim.Adam(model.parameters(), lr=args['lr'])
    # loss function
    criterion = nn.CrossEntropyLoss()
    # lists to keep track of losses and accuracies
    train_loss, valid_loss = [], []
    train_acc, valid_acc = [], []
    # start the training
    for epoch in range(args['epochs']):
        if epoch > 0 and args['remote_queue']:
            # Run the first epoch locally to make sure nothing crashes; execute_remotely()
            # then terminates local execution and the agent listening on the specified
            # queue takes over the task.
            task.execute_remotely(queue_name=args['remote_queue'])
        print(f"[INFO]: Epoch {epoch+1} of {args['epochs']}")
        train_epoch_loss, train_epoch_acc = train(model, train_loader,
                                                  optimizer, criterion)
        valid_epoch_loss, valid_epoch_acc = validate(model, valid_loader,
                                                     criterion)
        train_loss.append(train_epoch_loss)
        valid_loss.append(valid_epoch_loss)
        train_acc.append(train_epoch_acc)
        valid_acc.append(valid_epoch_acc)
        print(f"Training loss: {train_epoch_loss:.3f}, training acc: {train_epoch_acc:.3f}")
        logger.report_scalar(
            "loss", "train", iteration=epoch, value=train_epoch_loss
        )
        logger.report_scalar(
            "accuracy", "train", iteration=epoch, value=train_epoch_acc
        )
        print(f"Validation loss: {valid_epoch_loss:.3f}, validation acc: {valid_epoch_acc:.3f}")
        logger.report_scalar(
            "loss", "validation", iteration=epoch, value=valid_epoch_loss
        )
        logger.report_scalar(
            "accuracy", "validation", iteration=epoch, value=valid_epoch_acc
        )
        print('-'*50)
        time.sleep(5)
    # save the model in TorchScript format so it can be loaded into Triton without the model class
    torch.jit.script(model).save('serving_model.pt')
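    # update_weights() registers the TorchScript file as this task's output model,
    # uploading it to the output_uri destination set in Task.init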
    OutputModel().update_weights('serving_model.pt')
    print('TRAINING COMPLETE')