From ad66ebe047661b3f00e8ee6b45e0ef9666393351 Mon Sep 17 00:00:00 2001 From: "Brummans, Nick" <nick.brummans@wur.nl> Date: Thu, 17 Mar 2022 14:39:00 +0000 Subject: [PATCH] Upload New File --- pipelines.ipynb | 674 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 674 insertions(+) create mode 100644 pipelines.ipynb diff --git a/pipelines.ipynb b/pipelines.ipynb new file mode 100644 index 0000000..8bdca33 --- /dev/null +++ b/pipelines.ipynb @@ -0,0 +1,674 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "source": [ + "# Setup\n", + "\n", + "It is important to maintain a conda dependency file and/or MLstudio environment. \n", + "Every user of the workspace will use their own compute instance, with conda files and environments it is easy to install dependencies on these different compute instances.\n", + "For each conda environment we can setup a kernel so the notebook will use this environment.\n", + "\n", + "- Open terminal (terminal opens in your account folder)\n", + " - conda env update --file workshop-mlstudio/conda-notebook.yml\n", + " - conda activate workshop_env\n", + " - python -m ipykernel install --user --name=workshop_env --display-name=workshop_env\n", + "\n", + "Refresh page and change kernel to workshop_env." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + }, + { + "cell_type": "markdown", + "source": [ + "Connect to the workspace for easier Azure commands." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + }, + { + "cell_type": "code", + "source": [ + "from azureml.core import Workspace\n", + "\n", + "ws = Workspace.from_config()\n", + "print(f'WS name: {ws.name}\\nRegion: {ws.location}\\nSubscription id: {ws.subscription_id}\\nResource group: {ws.resource_group}')" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + }, + "gather": { + "logged": 1646322566789 + } + } + }, + { + "cell_type": "markdown", + "source": [ + "## Setup environment\r\n", + "\r\n", + "For setting up pipelines, we again need an environment.\r\n", + "Because we are going to create a training pipeline, we can copy the environment from train_model.ipynb." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + }, + { + "cell_type": "code", + "source": [ + "from azureml.core import Environment\n", + "\n", + "new_update_env = False\n", + "env_name='workshop-training-env'\n", + "# pathing in notebook folder\n", + "conda_path='conda-training.yml'\n", + "\n", + "if new_update_env:\n", + " # create new environment\n", + " env = Environment.from_conda_specification(name=env_name, file_path=conda_path)\n", + " env.register(workspace=ws)\n", + " # We can directly build the environment - this will create a new Docker \n", + " # image in Azure Container Registry (ACR), and directly 'bake in' our dependencies \n", + " # from the conda definition. 
When we later use the Environment, all AML will need to \n", + " # do is pull the image for the environment, thus saving the time of a potentially\n", + " # long-running conda environment creation.\n", + " build = env.build(workspace=ws)\n", + " build.wait_for_completion(show_output=True)\n", + "else:\n", + " # load existing environment\n", + " env = Environment.get(workspace=ws, name=env_name)" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + }, + "gather": { + "logged": 1646322717527 + } + } + },
+ { + "cell_type": "markdown", + "source": [ + "## Create experiment\r\n", + "\r\n", + "Create an experiment to track the runs in your notebook. A workspace can have multiple experiments. We will create an experiment to track our pipeline deployments and submissions." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "from azureml.core import Experiment\n", + "\n", + "experiment_name = 'train_pipeline_name'\n", + "exp = Experiment(workspace=ws, name=experiment_name)" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + }, + "gather": { + "logged": 1646322763342 + } + } + },
+ { + "cell_type": "markdown", + "source": [ + "### Attach existing compute resource\r\n", + "\r\n", + "By using Azure Machine Learning Compute, a managed service, data scientists can train machine learning models on clusters of Azure virtual machines. Examples include VMs with GPU or CPU support.\r\n", + "\r\n", + "We will use our existing cluster. It is better to keep the training compute (which probably has better specs) separate from the notebook compute. This ensures lower costs (heavy compute is only used where it is needed) and a central training compute target shared by every user of the workspace." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "from azureml.core.compute import AmlCompute\n", + "from azureml.core.compute import ComputeTarget\n", + "import os\n", + "\n", + "# choose a compute target; see the Compute tab -> Compute clusters for the available options\n", + "compute_name = \"cpu-cluster\"\n", + "\n", + "if compute_name in ws.compute_targets:\n", + " compute_target = ws.compute_targets[compute_name]\n", + " print(\"found compute target: \" + compute_name)\n", + "else:\n", + " print(\"Compute not found. Create a compute cluster in the Compute tab (with a subnet in the advanced settings if working in a production subscription).\")" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + }, + "gather": { + "logged": 1646322818343 + } + } + },
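+ { + "cell_type": "markdown", + "source": [ + "If the cluster does not exist yet, it can also be provisioned from the SDK instead of the Compute tab. The following cell is a minimal sketch (not part of the original workshop flow): the VM size and node counts are placeholder values, and creating compute requires sufficient permissions, plus a subnet configuration when working in a production subscription." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "from azureml.core.compute import AmlCompute, ComputeTarget\n", + "from azureml.core.compute_target import ComputeTargetException\n", + "\n", + "try:\n", + " # reuse the cluster if it already exists\n", + " compute_target = ComputeTarget(workspace=ws, name=compute_name)\n", + "except ComputeTargetException:\n", + " # placeholder sizing - adjust the VM size and node counts to your workload and quota\n", + " config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS3_V2',\n", + " min_nodes=0,\n", + " max_nodes=4)\n", + " compute_target = ComputeTarget.create(ws, compute_name, config)\n", + " compute_target.wait_for_completion(show_output=True)" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + } + } + },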
+ { + "cell_type": "markdown", + "source": [ + "## Import Data\r\n", + "\r\n", + "We can use the same data that was used to train the model." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "from azureml.core import Dataset\n", + "\n", + "# get the datasets by name\n", + "image_dataset = Dataset.get_by_name(ws, \"images_name\")\n", + "labels_dataset = Dataset.get_by_name(ws, \"labels_name\")" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + }, + "gather": { + "logged": 1646323023669 + } + } + },
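+ { + "cell_type": "markdown", + "source": [ + "If these datasets have not been registered in the workspace yet, they can be created from files on the default datastore and registered under the names used above. The following cell is only an illustrative sketch: the datastore paths are placeholders, and it assumes the labels are a delimited text file (use Dataset.File.from_files for the labels as well if they are not)." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "from azureml.core import Dataset\n", + "\n", + "register_datasets = False  # set to True only if the datasets do not exist yet\n", + "\n", + "if register_datasets:\n", + " datastore = ws.get_default_datastore()\n", + " # placeholder paths - point these at the folders/files you uploaded to the datastore\n", + " images = Dataset.File.from_files(path=(datastore, 'images/**'))\n", + " images.register(workspace=ws, name='images_name', create_new_version=True)\n", + " labels = Dataset.Tabular.from_delimited_files(path=(datastore, 'labels/labels.csv'))\n", + " labels.register(workspace=ws, name='labels_name', create_new_version=True)" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + } + } + },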
+ { + "cell_type": "markdown", + "source": [ + "### Pipelines\r\n", + "\r\n", + "In Azure Machine Learning, a pipeline is a workflow of machine learning tasks in which each task is implemented as a step.\r\n", + "\r\n", + "Steps can be arranged sequentially or in parallel, enabling you to build sophisticated flow logic to orchestrate machine learning operations. Each step can be run on a specific compute target, making it possible to combine different types of processing as required to achieve an overall goal.\r\n", + "\r\n", + "A pipeline can be executed as a process by running the pipeline as an experiment. Each step in the pipeline runs on its allocated compute target as part of the overall experiment run.\r\n", + "\r\n", + "You can publish a pipeline as a REST endpoint, enabling client applications to initiate a pipeline run. You can also define a schedule for a pipeline, and have it run automatically at periodic intervals." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "markdown", + "source": [ + "### Steps\r\n", + "\r\n", + "An Azure Machine Learning pipeline consists of one or more steps that perform tasks. There are many kinds of steps supported by Azure Machine Learning pipelines, each with its own specialized purpose and configuration options.\r\n", + "\r\n", + "Common kinds of steps in an Azure Machine Learning pipeline include:\r\n", + "\r\n", + "- **PythonScriptStep:** Runs a specified Python script.\r\n", + "- **DataTransferStep:** Uses Azure Data Factory to copy data between data stores.\r\n", + "- **DatabricksStep:** Runs a notebook, script, or compiled JAR on a Databricks cluster.\r\n", + "- **AdlaStep:** Runs a U-SQL job in Azure Data Lake Analytics.\r\n", + "- **ParallelRunStep:** Runs a Python script as a distributed task on multiple compute nodes.\r\n", + "\r\n", + "Note: For a full list of supported step types, see the [azureml.pipeline.steps package documentation](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps?view=azure-ml-py).\r\n", + "\r\n", + "To create a pipeline, you must first define each step and then create a pipeline that includes the steps. The specific configuration of each step depends on the step type. For example, the following code defines a PythonScriptStep that runs our training script, passing in the arguments and using the environment definition:" + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "from azureml.pipeline.steps import PythonScriptStep\n", + "from azureml.core import RunConfiguration\n", + "\n", + "# The RunConfiguration object encapsulates the information necessary to submit a training run\n", + "# in an experiment. In this case we attach our environment to it.\n", + "runconfig = RunConfiguration()\n", + "runconfig.environment = env\n", + "\n", + "# first define our training arguments\n", + "# note: it is also possible to download the image dataset onto the compute with as_download();\n", + "# because mounting loads files only at the time of processing, it is usually faster than downloading\n", + "args = ['--image-folder', image_dataset.as_mount(),\n", + " '--labels', labels_dataset.as_named_input('labels_name'),\n", + " '--size', 512,\n", + " '--split', 0.2,\n", + " '--batch-size', 4,\n", + " '--epochs', 1,\n", + " '--num-workers', 0,\n", + " '--num-classes', 4,\n", + " '--learning-rate', 5e-5]\n", + "\n", + "# Then define our training step\n", + "train_step = PythonScriptStep(name=\"train-step\",\n", + " source_directory=\"./\",\n", + " script_name=\"scripts/train.py\",\n", + " arguments=args,\n", + " runconfig=runconfig,\n", + " compute_target=compute_name)\n", + "\n", + "steps = [train_step]" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + }, + "gather": { + "logged": 1646327622882 + } + } + },
+ { + "cell_type": "markdown", + "source": [ + "Finally, we can create our pipeline object and validate it. This will check that the inputs and outputs are properly linked and that the pipeline graph is acyclic:" + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "from azureml.pipeline.core import Pipeline\n", + "\n", + "pipeline = Pipeline(workspace=ws, steps=steps)\n", + "pipeline.validate()" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + }, + "gather": { + "logged": 1646327640539 + } + } + },
+ { + "cell_type": "markdown", + "source": [ + "Lastly, we can submit the pipeline against our experiment:" + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "pipeline_run = exp.submit(pipeline)\n", + "pipeline_run.wait_for_completion()" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + }, + "gather": { + "logged": 1646328479387 + } + } + },
+ { + "cell_type": "markdown", + "source": [ + "Alternatively, we can also publish the pipeline as a REST endpoint:" + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "published_pipeline = pipeline.publish('workshop-pipeline-name')\n", + "published_pipeline" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + }, + "gather": { + "logged": 1646328479520 + } + } + },
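+ { + "cell_type": "markdown", + "source": [ + "A client application can now start a run by posting to the endpoint of the published pipeline. The following cell is a minimal sketch of such a call: it uses interactive login authentication and reuses our experiment name, whereas an automated client would typically authenticate with a service principal instead." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "import requests\n", + "from azureml.core.authentication import InteractiveLoginAuthentication\n", + "\n", + "# obtain an Azure AD token for the REST call\n", + "auth = InteractiveLoginAuthentication()\n", + "headers = auth.get_authentication_header()\n", + "\n", + "# trigger a run of the published pipeline under our experiment\n", + "response = requests.post(published_pipeline.endpoint,\n", + " headers=headers,\n", + " json={'ExperimentName': exp.name})\n", + "print('Submitted run:', response.json().get('Id'))" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + } + } + },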
+ { + "cell_type": "markdown", + "source": [ + "What if we want to continuously publish new pipelines, but have them available at the same URL as the prior version? For this, we can use PipelineEndpoint, which keeps multiple PublishedPipelines behind a single endpoint URL. It allows us to set a default_version, which determines to which PublishedPipeline the request is routed." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "from azureml.pipeline.core import PipelineEndpoint\n", + "\n", + "endpoint_name = \"workshop-pipeline-endpoint-name\"\n", + "\n", + "try:\n", + " pipeline_endpoint = PipelineEndpoint.get(workspace=ws, name=endpoint_name)\n", + " # Add a new default version - this only works with a PublishedPipeline\n", + " pipeline_endpoint.add_default(published_pipeline)\n", + "except Exception:\n", + " pipeline_endpoint = PipelineEndpoint.publish(workspace=ws,\n", + " name=endpoint_name,\n", + " pipeline=pipeline,\n", + " description=\"New Training Pipeline Endpoint\")\n" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + }, + "gather": { + "logged": 1646328480556 + } + } + },
+ { + "cell_type": "markdown", + "source": [ + "## Pass data between pipeline steps\r\n", + "\r\n", + "Often, a pipeline includes at least one step that depends on the output of a preceding step. For example, you might use a step that runs a Python script to preprocess some data, which must then be used in a subsequent step to train a model.\r\n", + "\r\n", + "The **OutputFileDatasetConfig** object is a special kind of dataset that:\r\n", + "\r\n", + "- References a location in a datastore for interim storage of data.\r\n", + "- Creates a data dependency between pipeline steps.\r\n", + "\r\n", + "You can view an OutputFileDatasetConfig object as an intermediary store for data that must be passed from a step to a subsequent step.\r\n", + "\r\n", + "To use an OutputFileDatasetConfig object to pass data between steps, you must:\r\n", + "\r\n", + "1. Define a named OutputFileDatasetConfig object that references a location in a datastore. If no explicit datastore is specified, the default datastore is used.\r\n", + "2. Pass the OutputFileDatasetConfig object as a script argument in steps that run scripts.\r\n", + "3. Include code in those scripts to write to the OutputFileDatasetConfig argument as an output or read it as an input.\r\n", + "\r\n", + "For example, the following code defines an OutputFileDatasetConfig object for the preprocessed data that must be passed between the steps.\r\n", + "\r\n", + "**_!! NOTE: the following code is an example of a multi-step pipeline. This is considered out of scope for this workshop and the code is non-functional. It is only shown for example purposes. !!_**" + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "from azureml.data import OutputFileDatasetConfig\n", + "from azureml.pipeline.steps import PythonScriptStep\n", + "\n", + "# Get a dataset for the initial data\n", + "raw_ds = Dataset.get_by_name(ws, 'raw_dataset')\n", + "\n", + "# Define an OutputFileDatasetConfig object to pass data between steps;\n", + "# with no explicit destination it writes to the default datastore\n", + "data_store = ws.get_default_datastore()\n", + "prepped_data = OutputFileDatasetConfig('prepped')\n", + "\n", + "# Step to run a Python script that prepares the data\n", + "step1 = PythonScriptStep(name = 'prepare data',\n", + " source_directory = 'scripts',\n", + " script_name = 'data_prep.py',\n", + " compute_target = 'aml-cluster',\n", + " # Script arguments include the OutputFileDatasetConfig as output folder\n", + " arguments = ['--raw-ds', raw_ds.as_named_input('raw_data'),\n", + " '--out_folder', prepped_data])\n", + "\n", + "# Step to train the model on the prepared data\n", + "step2 = PythonScriptStep(name = 'train model',\n", + " source_directory = 'scripts',\n", + " script_name = 'train_model.py',\n", + " compute_target = 'aml-cluster',\n", + " # Pass the prepared data as a script argument\n", + " arguments=['--training-data', prepped_data.as_input()])" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "markdown", + "source": [ + "In the scripts themselves, you can obtain a reference to the OutputFileDatasetConfig object from the script argument, and use it like a local folder." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "# code in data_prep.py\n", + "from azureml.core import Run\n", + "import argparse\n", + "import os\n", + "\n", + "# Get the experiment run context\n", + "run = Run.get_context()\n", + "\n", + "# Get arguments\n", + "parser = argparse.ArgumentParser()\n", + "parser.add_argument('--raw-ds', type=str, dest='raw_dataset_id')\n", + "parser.add_argument('--out_folder', type=str, dest='folder')\n", + "args = parser.parse_args()\n", + "output_folder = args.folder\n", + "\n", + "# Get input dataset as dataframe\n", + "raw_df = run.input_datasets['raw_data'].to_pandas_dataframe()\n", + "\n", + "# code to prep data (in this case, just select specific columns)\n", + "prepped_df = raw_df[['col1', 'col2', 'col3']]\n", + "\n", + "# Save prepped data to the OutputFileDatasetConfig location\n", + "os.makedirs(output_folder, exist_ok=True)\n", + "output_path = os.path.join(output_folder, 'prepped_data.csv')\n", + "prepped_df.to_csv(output_path)" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + } + } + },
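+ { + "cell_type": "markdown", + "source": [ + "The consuming script receives that same location as an input folder. The following cell is a hedged sketch of what the corresponding part of train_model.py could look like; the file name and columns simply mirror the illustrative data_prep.py example above." + ], + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + } + },
+ { + "cell_type": "code", + "source": [ + "# code in train_model.py (illustrative counterpart of data_prep.py above)\n", + "import argparse\n", + "import os\n", + "import pandas as pd\n", + "\n", + "# Get arguments\n", + "parser = argparse.ArgumentParser()\n", + "parser.add_argument('--training-data', type=str, dest='training_data')\n", + "args = parser.parse_args()\n", + "\n", + "# The OutputFileDatasetConfig input behaves like a local folder\n", + "training_file = os.path.join(args.training_data, 'prepped_data.csv')\n", + "train_df = pd.read_csv(training_file)\n", + "\n", + "# ... model training code would go here ...\n", + "print(f'Loaded {len(train_df)} prepped rows')" + ], + "outputs": [], + "execution_count": null, + "metadata": { + "jupyter": { + "source_hidden": false, + "outputs_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + } + } + }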
+ ], + "metadata": { + "kernelspec": { + "name": "python3-azureml", + "language": "python", + "display_name": "Python 3.6 - AzureML" + }, + "language_info": { + "name": "python", + "version": "3.6.9", + "mimetype": "text/x-python", + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "pygments_lexer": "ipython3", + "nbconvert_exporter": "python", + "file_extension": ".py" + }, + "kernel_info": { + "name": "python3-azureml" + }, + "microsoft": { + "host": { + "AzureML": { + "notebookHasBeenCompleted": true + } + } + }, + "nteract": { + "version": "nteract-front-end@1.0.0" + } + }, +
"nbformat": 4, + "nbformat_minor": 2 +} \ No newline at end of file -- GitLab