Commit 9599af8d authored by Brummans, Nick

update

parent ae10964a
@@ -14,7 +14,6 @@ def step_one(
 ):
     print('step_one')
     from clearml import Dataset
-    import google.appengine.api
     dataset_train = Dataset.create(
         dataset_name=dataset_name_training, dataset_project=dataset_project
@@ -46,7 +45,6 @@ def step_two(
     lr: float = 1e-3
 ):
     print('step_two')
-    import google.appengine.api
     import pickle
     import torchvision.transforms as transforms
     import torchvision.datasets as datasets
@@ -220,11 +218,11 @@ if __name__ == '__main__':
     # we need to specify the parent steps: step_one
     pipe.add_function_step(
         name='step_two',
-        parents=['step_one'],
+        parents=['step_one'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
         function=step_two,
-        function_kwargs=dict(dataset_name_training='${step_one.dataset_name_training}',
-                             dataset_name_test='${step_one.dataset_name_test}',
-                             dataset_project='${step_one.dataset_project}',
+        function_kwargs=dict(dataset_name_training='${step_one.artifacts.dataset_name_training}',
+                             dataset_name_test='${step_one.artifacts.dataset_name_test}',
+                             dataset_project='${step_one.artifacts.dataset_project}',
                              epochs=10,
                              train_batch_size=256,
                              validation_batch_size=256,
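For context, a minimal sketch of how step_one is presumably registered in the same controller-based script (that part is not visible in this diff). Only the step name and return-value names follow the diff above and the decorator script below; the controller name, project, version, and kwargs shown here are assumptions.

from clearml.automation.controller import PipelineController

pipe = PipelineController(name='pipeline test', project='pipeline', version='0.0.1')
pipe.add_function_step(
    name='step_one',
    function=step_one,  # the step_one function shown in the first hunk above
    function_kwargs=dict(training_path='data/mnist_png/training',
                         test_path='data/mnist_png/testing'),
    # these names are what the '${step_one.artifacts.*}' references above presumably point at
    function_return=['dataset_name_training', 'dataset_name_test', 'dataset_project'],
    cache_executed_step=True,
)

The decorator-based variant of the same pipeline follows below.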
from clearml.automation.controller import PipelineDecorator
from clearml import TaskTypes


@PipelineDecorator.component(return_values=['dataset_name_training', 'dataset_name_test', 'dataset_project'], cache=True, repo="https://git.wur.nl/mdt-research-it-solutions/clearml-demo.git", task_type=TaskTypes.data_processing, packages="./requirements.txt")
def step_one(training_path: str = 'data/mnist_png/training', test_path: str = 'data/mnist_png/testing', dataset_project: str = "pipeline", dataset_name_training: str = "training_dataset", dataset_name_test: str = "testing_dataset"):
    PipelineDecorator.set_default_execution_queue('test')
    print('step_one')
    # imports are done inside the component so the step can run as a standalone task
    from clearml import Dataset

    # register the raw image folders as ClearML datasets
    dataset_train = Dataset.create(
        dataset_name=dataset_name_training, dataset_project=dataset_project
    )
    dataset_test = Dataset.create(
        dataset_name=dataset_name_test, dataset_project=dataset_project
    )
    dataset_train.add_files(path=training_path)
    dataset_test.add_files(path=test_path)
    dataset_train.upload()
    dataset_test.upload()
    dataset_train.finalize()
    dataset_test.finalize()
    return dataset_name_training, dataset_name_test, dataset_project
@PipelineDecorator.component(return_values=['model'], cache=True, repo="https://git.wur.nl/mdt-research-it-solutions/clearml-demo.git", task_type=TaskTypes.training, packages="./requirements.txt")
def step_two(dataset_name_training, dataset_name_test, dataset_project, epochs: int = 10, train_batch_size: int = 256, validation_batch_size: int = 256, train_num_workers: int = 0, validation_num_workers: int = 0, resize: int = 28, lr: float = 1e-3):
    PipelineDecorator.set_default_execution_queue('test')
    print('step_two')
    # imports are done inside the component so the step can run as a standalone task
    import pickle
    import torchvision.transforms as transforms
    import torchvision.datasets as datasets
    from torch.utils.data import DataLoader
    import torch
    import torch.nn as nn
    import torch.optim as optim
    import time
    from tqdm.auto import tqdm
    from model import CNNModel
    from model_utils import train, validate
    from clearml import Logger
    from clearml import StorageManager
    from clearml import Dataset

    # fetch local copies of the datasets registered in step_one
    mnist_train = Dataset.get(
        dataset_name=dataset_name_training, dataset_project=dataset_project
    ).get_local_copy()
    mnist_test = Dataset.get(
        dataset_name=dataset_name_test, dataset_project=dataset_project
    ).get_local_copy()

    # get logger
    logger = Logger.current_logger()

    # the training transforms
    train_transform = transforms.Compose([
        transforms.Resize(resize),
        # transforms.RandomHorizontalFlip(p=0.5),
        # transforms.RandomVerticalFlip(p=0.5),
        # transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5)),
        # transforms.RandomRotation(degrees=(30, 70)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.5, 0.5, 0.5],
            std=[0.5, 0.5, 0.5]
        )
    ])
    # the validation transforms
    valid_transform = transforms.Compose([
        transforms.Resize(resize),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.5, 0.5, 0.5],
            std=[0.5, 0.5, 0.5]
        )
    ])
    # training dataset
    train_dataset = datasets.ImageFolder(
        root=mnist_train,
        transform=train_transform
    )
    # validation dataset
    valid_dataset = datasets.ImageFolder(
        root=mnist_test,
        transform=valid_transform
    )
    # training data loaders
    train_loader = DataLoader(
        train_dataset, batch_size=train_batch_size, shuffle=True,
        num_workers=train_num_workers, pin_memory=True
    )
    # validation data loaders
    valid_loader = DataLoader(
        valid_dataset, batch_size=validation_batch_size, shuffle=False,
        num_workers=validation_num_workers, pin_memory=True
    )

    device = ('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Computation device: {device}\n")
    model = CNNModel().to(device)
    print(model)

    # total parameters and trainable parameters
    total_params = sum(p.numel() for p in model.parameters())
    print(f"{total_params:,} total parameters.")
    total_trainable_params = sum(
        p.numel() for p in model.parameters() if p.requires_grad)
    print(f"{total_trainable_params:,} training parameters.")

    # optimizer
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # loss function
    criterion = nn.CrossEntropyLoss()

    # lists to keep track of losses and accuracies
    train_loss, valid_loss = [], []
    train_acc, valid_acc = [], []

    # start the training
    for epoch in range(epochs):
        print(f"[INFO]: Epoch {epoch+1} of {epochs}")
        train_epoch_loss, train_epoch_acc = train(model, train_loader,
                                                  optimizer, criterion, device)
        valid_epoch_loss, valid_epoch_acc = validate(model, valid_loader,
                                                     criterion, device)
        train_loss.append(train_epoch_loss)
        valid_loss.append(valid_epoch_loss)
        train_acc.append(train_epoch_acc)
        valid_acc.append(valid_epoch_acc)
        print(f"Training loss: {train_epoch_loss:.3f}, training acc: {train_epoch_acc:.3f}")
        logger.report_scalar(
            "loss", "train", iteration=epoch, value=train_epoch_loss
        )
        logger.report_scalar(
            "accuracy", "train", iteration=epoch, value=train_epoch_acc
        )
        print(f"Validation loss: {valid_epoch_loss:.3f}, validation acc: {valid_epoch_acc:.3f}")
        logger.report_scalar(
            "loss", "validation", iteration=epoch, value=valid_epoch_loss
        )
        logger.report_scalar(
            "accuracy", "validation", iteration=epoch, value=valid_epoch_acc
        )
    return model
# The actual pipeline execution context.
# Notice that all pipeline component function calls are actually executed remotely.
# Only when a return value is used does the pipeline logic wait for the component execution to complete.
@PipelineDecorator.pipeline(name='pipeline test', project='pipeline_deco', version='0.0.5', pipeline_execution_queue="test")
def executing_pipeline(training_path='data/mnist_png/training', test_path='data/mnist_png/testing'):
    from utils import save_model
    import torch
    from clearml import OutputModel

    # Use the pipeline arguments to start the pipeline and pass them to the first step
    print('launch step one')
    dataset_name_training, dataset_name_test, dataset_project = step_one(training_path, test_path, "pipeline", "training_dataset", "testing_dataset")

    # Use the returned values from the first step (`step_one`) and pass them to the next step (`step_two`).
    # Notice! Unless we actually access the returned objects, the pipeline logic does not load them.
    # When the returned values are passed into a new step, the pipeline waits for the creating
    # step/function (`step_one`) to complete its execution first.
    print('launch step two')
    model = step_two(dataset_name_training, dataset_name_test, dataset_project, 10, 256, 256, 0, 0, 28, 1e-3)

    # store the model in a way we can easily load into Triton without needing the model class
    torch.jit.script(model).save('serving_model.pt')
    OutputModel().update_weights('serving_model.pt')
if __name__ == '__main__':
    # set the pipeline steps default execution queue (per specific step we can override it with the decorator)
    PipelineDecorator.set_default_execution_queue('test')

    # Run the pipeline steps as subprocesses on the current machine, great for local executions
    # (for easy development / debugging, use `PipelineDecorator.debug_pipeline()` to execute steps as regular functions)

    # Start the pipeline execution logic.
    executing_pipeline(
        training_path='data/mnist_png/training', test_path='data/mnist_png/testing'
    )
    print('process completed')
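For local development, the comments above mention `PipelineDecorator.debug_pipeline()`. A minimal sketch of what the entry point could look like while debugging; this variant is not part of this commit and is only an assumption based on those comments:

if __name__ == '__main__':
    # run every step as a plain function call in this process (no agents or queues needed)
    PipelineDecorator.debug_pipeline()
    executing_pipeline(
        training_path='data/mnist_png/training',
        test_path='data/mnist_png/testing',
    )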
@@ -5,4 +5,3 @@ torch
 torchaudio
 tqdm
 Pillow
-appengine-python-standard