From 6485a110140d2d023223cf861a0d745b32a7bd5b Mon Sep 17 00:00:00 2001
From: Roelofsen <hans.roelofsen@wur.nl>
Date: Tue, 21 Feb 2023 18:10:56 +0100
Subject: [PATCH] Updating Azure Batch Script to latest versions

---
 sample/vs/config_v2023.py             |  37 ++
 sample/vs/python_quickstart_client.py | 563 ++++++++++++++++++++++++++
 2 files changed, 600 insertions(+)
 create mode 100644 sample/vs/config_v2023.py
 create mode 100644 sample/vs/python_quickstart_client.py

diff --git a/sample/vs/config_v2023.py b/sample/vs/config_v2023.py
new file mode 100644
index 0000000..af08958
--- /dev/null
+++ b/sample/vs/config_v2023.py
@@ -0,0 +1,37 @@
+# -------------------------------------------------------------------------
+#
+# THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
+# EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES
+# OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
+# ----------------------------------------------------------------------------------
+# The example companies, organizations, products, domain names,
+# e-mail addresses, logos, people, places, and events depicted
+# herein are fictitious. No association with any real company,
+# organization, product, domain name, email address, logo, person,
+# places, or events is intended or should be inferred.
+# --------------------------------------------------------------------------
+
+# Global constant variables (Azure Storage account/Batch details)
+
+# Imported as "config" by "python_quickstart_client.py".
+# Please note that storing the Batch and Storage account keys in Azure Key Vault
+# is better practice for production usage.
+
+"""
+Configure Batch and Storage Account credentials
+"""
+
+BATCH_ACCOUNT_NAME = 'viewscapebatch'  # Your Batch account name
+BATCH_ACCOUNT_KEY = '<your-batch-account-key>'  # Your Batch account key; never commit real keys
+BATCH_ACCOUNT_URL = 'https://viewscapebatch.westeurope.batch.azure.com'  # Your Batch account URL
+STORAGE_ACCOUNT_NAME = 'viewscapediag'
+STORAGE_ACCOUNT_KEY = '<your-storage-account-key>'  # Your storage account key; never commit real keys
+STORAGE_ACCOUNT_DOMAIN = 'blob.core.windows.net'  # Your storage account blob service domain
+
+POOL_ID = 'ViewscapePool01'  # Your pool ID
+POOL_NODE_COUNT = 2  # Pool node count (superseded by DEDICATED_POOL_NODE_COUNT below)
+POOL_VM_SIZE = 'STANDARD_DS2_V2'  # VM type/size
+JOB_ID = 'Viewscape'  # Job ID
+STANDARD_OUT_FILE_NAME = 'stdout.txt'  # Standard output file
+DEDICATED_POOL_NODE_COUNT = 'auto'  # Should equal the number of *.ini run files; 'auto' sizes to match
+LOW_PRIORITY_POOL_NODE_COUNT = 0
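+
+# A minimal, illustrative alternative to hardcoding secrets (assumes the
+# BATCH_ACCOUNT_KEY / STORAGE_ACCOUNT_KEY environment variables are set;
+# not part of the original sample):
+#
+#   import os
+#   BATCH_ACCOUNT_KEY = os.environ['BATCH_ACCOUNT_KEY']
+#   STORAGE_ACCOUNT_KEY = os.environ['STORAGE_ACCOUNT_KEY']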
diff --git a/sample/vs/python_quickstart_client.py b/sample/vs/python_quickstart_client.py
new file mode 100644
index 0000000..4578a69
--- /dev/null
+++ b/sample/vs/python_quickstart_client.py
@@ -0,0 +1,563 @@
+# python quickstart client Code Sample
+#
+# Copyright (c) Microsoft Corporation
+#
+# All rights reserved.
+#
+# MIT License
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the "Software"),
+# to deal in the Software without restriction, including without limitation
+# the rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+# DEALINGS IN THE SOFTWARE.
+
+# Downloaded from https://learn.microsoft.com/en-us/azure/batch/quick-run-python#download-the-sample
+# by HRoelofsen, 21-02-2023
+# Replaces batch_python_windows.py
+#
+
+"""
+Create a pool of Azure Batch compute nodes and run one task per run file,
+reading inputs from and writing outputs to Azure Blob storage.
+"""
+import datetime
+import io
+import os
+import pathlib
+import sys
+import time
+
+from azure.storage.blob import (
+    BlobServiceClient,
+    BlobSasPermissions,
+    generate_blob_sas,
+    generate_container_sas,
+    ContainerSasPermissions,
+)
+from azure.batch import BatchServiceClient
+from azure.batch.batch_auth import SharedKeyCredentials
+import azure.batch.models as batchmodels
+from azure.core.exceptions import ResourceExistsError
+
+try:
+    import config_v2023 as config
+except ModuleNotFoundError:
+    from sample.vs import config_v2023 as config
+
+DEFAULT_ENCODING = "utf-8"
+
+
+# Update the Batch and Storage account credential strings in config_v2023.py
+# with values unique to your accounts. These are used when constructing
+# connection strings for the Batch and Storage client objects.
+
+
+def query_yes_no(question: str, default: str = "yes") -> str:
+    """
+    Prompts the user for yes/no input, displaying the specified question text.
+
+    :param str question: The text of the prompt for input.
+    :param str default: The default if the user hits <ENTER>. Acceptable values
+        are 'yes', 'no', and None.
+    :return: 'yes' or 'no'
+    """
+    valid = {"y": "yes", "n": "no"}
+    if default is None:
+        prompt = " [y/n] "
+    elif default == "yes":
+        prompt = " [Y/n] "
+    elif default == "no":
+        prompt = " [y/N] "
+    else:
+        raise ValueError(f"Invalid default answer: '{default}'")
+
+    choice = default
+
+    while True:
+        user_input = input(question + prompt).lower()
+        if not user_input:
+            break
+        try:
+            choice = valid[user_input[0]]
+            break
+        except (KeyError, IndexError):
+            print("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
+
+    return choice
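+
+# Illustrative (hypothetical) use of query_yes_no, e.g. to confirm cleanup:
+#
+#   if query_yes_no("Delete the pool?") == "yes":
+#       batch_client.pool.delete(config.POOL_ID)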
+
+
+def print_batch_exception(batch_exception: batchmodels.BatchErrorException):
+    """
+    Prints the contents of the specified Batch exception.
+
+    :param batch_exception: The Batch exception to print.
+    """
+    print("-------------------------------------------")
+    print("Exception encountered:")
+    if (
+        batch_exception.error
+        and batch_exception.error.message
+        and batch_exception.error.message.value
+    ):
+        print(batch_exception.error.message.value)
+        if batch_exception.error.values:
+            print()
+            for mesg in batch_exception.error.values:
+                print(f"{mesg.key}:\t{mesg.value}")
+    print("-------------------------------------------")
+
+
+def upload_file_to_container(
+    blob_storage_service_client: BlobServiceClient, container_name: str, file_path: str
+) -> batchmodels.ResourceFile:
+    """
+    Uploads a local file to an Azure Blob storage container.
+
+    :param blob_storage_service_client: A blob service client.
+    :param str container_name: The name of the Azure Blob storage container.
+    :param str file_path: The local path to the file.
+    :return: A ResourceFile initialized with a SAS URL appropriate for Batch
+        tasks.
+    """
+    blob_name = os.path.basename(file_path)
+    blob_client = blob_storage_service_client.get_blob_client(container_name, blob_name)
+
+    print(f"Uploading file {file_path} to container [{container_name}]...")
+
+    with open(file_path, "rb") as data:
+        blob_client.upload_blob(data, overwrite=True)
+
+    sas_token = generate_blob_sas(
+        account_name=config.STORAGE_ACCOUNT_NAME,
+        container_name=container_name,
+        blob_name=blob_name,
+        account_key=config.STORAGE_ACCOUNT_KEY,
+        permission=BlobSasPermissions(read=True),
+        expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=2),
+    )
+
+    sas_url = generate_sas_url(
+        config.STORAGE_ACCOUNT_NAME,
+        config.STORAGE_ACCOUNT_DOMAIN,
+        container_name,
+        blob_name,
+        sas_token,
+    )
+
+    return batchmodels.ResourceFile(http_url=sas_url, file_path=blob_name)
+
+
+def generate_sas_url(
+    account_name: str,
+    account_domain: str,
+    container_name: str,
+    blob_name: str,
+    sas_token: str,
+) -> str:
+    """
+    Generates and returns a SAS URL for accessing a blob in blob storage.
+    """
+
+    return f"https://{account_name}.{account_domain}/{container_name}/{blob_name}?{sas_token}"
+
+
+def generate_container_sas_url(
+    container_name: str,
+    sas_token: str,
+) -> str:
+    """
+    Returns the URL to a container with a SAS token appended.
+    """
+    return f"https://{config.STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{container_name}?{sas_token}"
+
+
+def create_pool(batch_service_client: BatchServiceClient, pool_id: str):
+    """
+    Creates a pool of compute nodes with the specified OS settings.
+
+    :param batch_service_client: A Batch service client.
+    :param str pool_id: An ID for the new pool.
+    """
+    print(f"Creating pool [{pool_id}]...")
+
+    # Create a new pool of Windows compute nodes using an Azure Virtual
+    # Machines Marketplace image. The task command line below runs a Windows
+    # batch file (task.bat), so the pool uses a Windows image rather than the
+    # Ubuntu image from the Linux quickstart this script is based on; for
+    # pools of Linux nodes, see:
+    # https://azure.microsoft.com/documentation/articles/batch-linux-nodes/
+    new_pool = batchmodels.PoolAddParameter(
+        id=pool_id,
+        virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
+            image_reference=batchmodels.ImageReference(
+                publisher="microsoftwindowsserver",
+                offer="windowsserver",
+                sku="2019-datacenter-core",
+                version="latest",
+            ),
+            node_agent_sku_id="batch.node.windows amd64",
+        ),
+        vm_size=config.POOL_VM_SIZE,
+        target_dedicated_nodes=config.DEDICATED_POOL_NODE_COUNT,
+        target_low_priority_nodes=config.LOW_PRIORITY_POOL_NODE_COUNT,
+    )
+    batch_service_client.pool.add(new_pool)
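+
+
+def print_supported_images(batch_service_client: BatchServiceClient):
+    """
+    Illustrative helper, not part of the original quickstart: prints the
+    Marketplace images and node agent SKUs this Batch account supports,
+    useful for verifying the publisher/offer/sku used in create_pool.
+    """
+    for image in batch_service_client.account.list_supported_images():
+        ref = image.image_reference
+        print(f"{ref.publisher}/{ref.offer}/{ref.sku} -> {image.node_agent_sku_id}")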
+
+
+def create_job(batch_service_client: BatchServiceClient, job_id: str, pool_id: str):
+    """
+    Creates a job with the specified ID, associated with the specified pool.
+
+    :param batch_service_client: A Batch service client.
+    :param str job_id: The ID for the job.
+    :param str pool_id: The ID for the pool.
+    """
+    print(f"Creating job [{job_id}]...")
+
+    job = batchmodels.JobAddParameter(
+        id=job_id, pool_info=batchmodels.PoolInformation(pool_id=pool_id)
+    )
+
+    batch_service_client.job.add(job)
+
+
+def add_tasks(
+    batch_service_client: BatchServiceClient,
+    job_id: str,
+    run_files: list,
+    output_container_sas_url: str,
+    output_id: str,
+):
+    """
+    Adds a task for each input file in the collection to the specified job.
+
+    :param batch_service_client: A Batch service client.
+    :param str job_id: The ID of the job to which to add the tasks.
+    :param list run_files: A collection of input files. One task will be
+        created for each input file.
+    :param str output_container_sas_url: A SAS URL granting write access to
+        the specified Azure Blob storage container.
+    :param str output_id: Identifier used in output file names.
+    """
+
+    print(f"Adding {len(run_files)} tasks to job [{job_id}]...")
+
+    tasks = []
+
+    for idx, input_file in enumerate(run_files):
+        input_file_path = input_file.file_path
+        output_file_path = f"VSOutFile{idx}.7z"
+        nr_output_id = f"{output_id}_{idx:03}"
+
+        command = f"cmd /c task.bat {input_file_path} {nr_output_id} {output_file_path}"
+        tasks.append(
+            batchmodels.TaskAddParameter(
+                id=f"Task{idx}",
+                command_line=command,
+                resource_files=[input_file],
+                output_files=[
+                    batchmodels.OutputFile(
+                        file_pattern=output_file_path,
+                        destination=batchmodels.OutputFileDestination(
+                            container=batchmodels.OutputFileBlobContainerDestination(
+                                container_url=output_container_sas_url
+                            )
+                        ),
+                        upload_options=batchmodels.OutputFileUploadOptions(
+                            upload_condition=batchmodels.OutputFileUploadCondition.task_success
+                        ),
+                    )
+                ],
+            )
+        )
+
+    batch_service_client.task.add_collection(job_id, tasks)
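+
+
+# For example (illustrative file name), task 0 in add_tasks for run file
+# 'ViewScape01.ini' with output_id 'VSOutFile' runs:
+#   cmd /c task.bat ViewScape01.ini VSOutFile_000 VSOutFile0.7z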
+
+
+def wait_for_tasks_to_complete(
+    batch_service_client: BatchServiceClient, job_id: str, timeout: datetime.timedelta
+):
+    """
+    Returns when all tasks in the specified job reach the Completed state.
+
+    :param batch_service_client: A Batch service client.
+    :param str job_id: The ID of the job whose tasks should be monitored.
+    :param timeout: The duration to wait for task completion. If all
+        tasks in the specified job do not reach the Completed state within
+        this time period, an exception will be raised.
+    """
+    timeout_expiration = datetime.datetime.now() + timeout
+
+    print(
+        f"Monitoring all tasks for 'Completed' state, timeout in {timeout}...", end=""
+    )
+
+    while datetime.datetime.now() < timeout_expiration:
+        print(".", end="")
+        sys.stdout.flush()
+        tasks = batch_service_client.task.list(job_id)
+
+        incomplete_tasks = [
+            task for task in tasks if task.state != batchmodels.TaskState.completed
+        ]
+        if not incomplete_tasks:
+            print()
+            return True
+
+        time.sleep(1)
+
+    print()
+    raise RuntimeError(
+        f"ERROR: Tasks did not reach 'Completed' state within timeout period of {timeout}"
+    )
+
+
+def print_task_output(
+    batch_service_client: BatchServiceClient, job_id: str, text_encoding: str = None
+):
+    """
+    Prints the stdout.txt file for each task in the job.
+
+    :param batch_service_client: The Batch client to use.
+    :param str job_id: The ID of the job with task output files to print.
+    :param str text_encoding: The encoding of the output files. Defaults to utf-8.
+    """
+
+    print("Printing task output...")
+
+    if text_encoding is None:
+        text_encoding = DEFAULT_ENCODING
+
+    # Re-wrap stdout/stderr once so task output prints in the requested encoding.
+    sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding=text_encoding)
+    sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding=text_encoding)
+
+    tasks = batch_service_client.task.list(job_id)
+
+    for task in tasks:
+        node_id = batch_service_client.task.get(job_id, task.id).node_info.node_id
+        print(f"Task: {task.id}")
+        print(f"Node: {node_id}")
+
+        stream = batch_service_client.file.get_from_task(
+            job_id, task.id, config.STANDARD_OUT_FILE_NAME
+        )
+
+        file_text = _read_stream_as_string(stream, text_encoding)
+
+        print("Standard output:")
+        print(file_text)
+
+
+def _read_stream_as_string(stream, encoding) -> str:
+    """
+    Read stream as string.
+
+    :param stream: Input stream generator.
+    :param str encoding: The encoding of the file. The default is utf-8.
+    :return: The file content.
+    """
+    output = io.BytesIO()
+    try:
+        for data in stream:
+            output.write(data)
+        if encoding is None:
+            encoding = DEFAULT_ENCODING
+        return output.getvalue().decode(encoding)
+    finally:
+        output.close()
+
+
+if __name__ == "__main__":
+
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--time", help="timeout period in minutes", type=int, default=30
+    )
+    parser.add_argument("--id", help="job ID", type=str, default="Viewscape")
+    parser.add_argument(
+        "--output_id", help="output file identifier", type=str, default="VSOutFile"
+    )
+    parser.add_argument(
+        "--rundir", help="directory with runfile *.ini", type=str, default="./RunFiles"
+    )
+    parser.add_argument(
+        "--remove",
+        help="remove batch pool, job and blob containers",
+        action="store_true",
+    )
+    args = parser.parse_args()
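+
+    # Example invocation (values illustrative):
+    #   python python_quickstart_client.py --time 60 --rundir ./RunFiles --remove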
+
+    # Allow the job ID to be overridden from the command line (--id).
+    config.JOB_ID = args.id
+
+    start_time = datetime.datetime.now().replace(microsecond=0)
+    print(f"Sample start: {start_time}")
+    print()
+
+    # Create the blob client, for use in obtaining references to
+    # blob storage containers and uploading files to containers.
+    blob_service_client = BlobServiceClient(
+        account_url=f"https://{config.STORAGE_ACCOUNT_NAME}.{config.STORAGE_ACCOUNT_DOMAIN}/",
+        credential=config.STORAGE_ACCOUNT_KEY,
+    )
+
+    # Use the blob client to create the containers in Azure Storage if they
+    # don't yet exist.
+    application_container_name = "application"
+    run_container_name = "run"
+    data_container_name = "data"
+    output_container_name = "output"
+    # input_container_name = 'input'  # pylint: disable=invalid-name
+    for container_name in [
+        application_container_name,
+        run_container_name,
+        data_container_name,
+        output_container_name,
+    ]:
+        try:
+            blob_service_client.create_container(container_name)
+        except ResourceExistsError:
+            pass
+
+    # HDR-2023-02-21
+    # Create a list of all application files in the ApplicationFiles directory.
+    base_dir = pathlib.Path(__file__).parents[0]
+    application_file_paths = []
+    for folder, subs, files in os.walk(os.path.join(base_dir, "ApplicationFiles")):
+        for filename in files:
+            application_file_paths.append(
+                os.path.abspath(os.path.join(folder, filename))
+            )
+
+    # Create a list of all input files in the RunFiles directory.
+    run_file_paths = []
+    for folder, subs, files in os.walk(args.rundir):
+        for filename in files:
+            if filename.endswith(".ini") and filename.startswith("ViewScape"):
+                run_file_paths.append(os.path.abspath(os.path.join(folder, filename)))
+
+    # Update the pool node count to match the number of run files.
+    if config.DEDICATED_POOL_NODE_COUNT == "auto":
+        config.DEDICATED_POOL_NODE_COUNT = len(run_file_paths)
+
+    # Create a list of all data files in the DataFiles directory.
+    data_file_paths = []
+    for folder, subs, files in os.walk(os.path.join(base_dir, "DataFiles")):
+        for filename in files:
+            data_file_paths.append(os.path.abspath(os.path.join(folder, filename)))
+
+    # Upload the application files. This is the collection of files that are
+    # needed to run the application.
+    application_files = [
+        upload_file_to_container(
+            blob_service_client, application_container_name, file_path
+        )
+        for file_path in application_file_paths
+    ]
+
+    # Upload the run files. This is the collection of files that are to be
+    # processed by the tasks.
+    run_files = [
+        upload_file_to_container(blob_service_client, run_container_name, file_path)
+        for file_path in run_file_paths
+    ]
+
+    # Upload the data files to the data container. This is the collection of
+    # data files used by the tasks.
+    data_files = [
+        upload_file_to_container(blob_service_client, data_container_name, file_path)
+        for file_path in data_file_paths
+    ]
+
+    # Obtain a shared access signature URL that provides write access to the
+    # output container to which the tasks will upload their output.
+    output_container_sas_url = generate_container_sas_url(
+        container_name=output_container_name,
+        sas_token=generate_container_sas(
+            account_name=config.STORAGE_ACCOUNT_NAME,
+            account_key=config.STORAGE_ACCOUNT_KEY,
+            container_name=output_container_name,
+            permission=ContainerSasPermissions(write=True),
+            expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=2),
+        ),
+    )
+
+    # Create a Batch service client. We'll now be interacting with the Batch
+    # service in addition to Storage.
+    credentials = SharedKeyCredentials(
+        config.BATCH_ACCOUNT_NAME, config.BATCH_ACCOUNT_KEY
+    )
+
+    batch_client = BatchServiceClient(credentials, batch_url=config.BATCH_ACCOUNT_URL)
+
+    try:
+        # Create the pool that will contain the compute nodes that will
+        # execute the tasks.
+        create_pool(batch_client, config.POOL_ID)
+
+        # Create the job that will run the tasks.
+        create_job(batch_client, config.JOB_ID, config.POOL_ID)
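+
+        # Note (illustrative, not in the original sample): re-running with the
+        # same IDs raises BatchErrorException ('PoolExists'/'JobExists'); a
+        # minimal guard would be:
+        #   try:
+        #       create_pool(batch_client, config.POOL_ID)
+        #   except batchmodels.BatchErrorException as err:
+        #       if err.error.code != 'PoolExists':
+        #           raise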
+
+        # Add the tasks to the job.
+        add_tasks(
+            batch_service_client=batch_client,
+            job_id=config.JOB_ID,
+            run_files=run_files,
+            output_container_sas_url=output_container_sas_url,
+            output_id=args.output_id,
+        )
+
+        # Pause execution until tasks reach the Completed state.
+        wait_for_tasks_to_complete(
+            batch_client, config.JOB_ID, datetime.timedelta(minutes=args.time)
+        )
+
+        print(
+            " Success! All tasks reached the 'Completed' state within the "
+            "specified timeout period."
+        )
+
+        # Print the stdout.txt file for each task to the console.
+        print_task_output(batch_client, config.JOB_ID)
+
+        # Print out some timing info.
+        end_time = datetime.datetime.now().replace(microsecond=0)
+        print()
+        print(f"Sample end: {end_time}")
+        elapsed_time = end_time - start_time
+        print(f"Elapsed time: {elapsed_time}")
+        print()
+        input("Press ENTER to exit...")
+
+    except batchmodels.BatchErrorException as err:
+        print_batch_exception(err)
+        raise
+
+    finally:
+        # Clean up Batch and Storage resources (only if the user passed
+        # --remove; note that the containers hold the task outputs).
+        if args.remove:
+            for container in [
+                application_container_name,
+                run_container_name,
+                data_container_name,
+                output_container_name,
+            ]:
+                print(f"Deleting container [{container}]...")
+                blob_service_client.delete_container(container)
+
+            batch_client.job.delete(config.JOB_ID)
+            batch_client.pool.delete(config.POOL_ID)
--
GitLab