From 5bc16ad3926619863dafa322b5d61e140f77f12a Mon Sep 17 00:00:00 2001 From: Jasper Koehorst <> Date: Fri, 28 Jan 2022 09:02:08 +0100 Subject: [PATCH] medaka parallelisation --- medaka/medaka.py | 121 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 medaka/medaka.py diff --git a/medaka/medaka.py b/medaka/medaka.py new file mode 100644 index 0000000..341e7f3 --- /dev/null +++ b/medaka/medaka.py @@ -0,0 +1,121 @@ +# Improvement for the medaka workflow +import subprocess +import argparse +import gzip + +# import multiprocessing +# from joblib import Parallel, delayed + +def main(): + args = parse_arguments() + mini_align(args) + medaka_consensus(args) + + +def parse_arguments(): + # Initialize parser + parser = argparse.ArgumentParser() + + # Adding optional argument + parser.add_argument("-i", "--input", help = "input fast(a/q) file", required=True) + parser.add_argument("-r", "--reference", help = "reference fasta file", required=True) + parser.add_argument("-t", "--threads", help = "Number of threads to use", nargs='?', default=2, type=int) + parser.add_argument("-m", help = "fill MD tag") + parser.add_argument("-p", help = "output file prefix (default: reads)",nargs='?', default="reads", type=str) + parser.add_argument("-c", "--contig-chunks", help = "Number of contigs per medaka_consensus step", nargs='?',default=100, type=int) + parser.add_argument("--model", help = "Model definition, default is equivalent to r94.", default="r94") + # Adding optional argument for .... + # ... + + # Adding optional argument for .... + # ... + + # Read arguments from command line + args = parser.parse_args() + return args + + + +## SINGLE RUN +# # align reads to assembly +# mini_align -i basecalls.fasta -r assembly.fasta -P -m \ +# -p calls_to_draft.bam -t <threads> + +def mini_align(args): + print(args) + arguments = ["mini_align", "-i", args.input, "-r", args.reference, "-m", "-p", args.p, "-t", args.threads] + arguments = ' '.join(map(str,arguments)) + print(arguments) + subprocess.run(arguments, shell=True, check=True) + +## RUN IN PARALLEL +# getconf ARG_MAX # To obtain maximum length + +# # run lots of jobs like this, change model as appropriate +# mkdir results +# medaka consensus calls_to_draft.bam results/contigs1-4.hdf \ +# --model r941_min_fast_g303 --batch 200 --threads 8 \ +# --region contig1 contig2 contig3 contig4 +# ... + +def medaka_consensus(args): + chunks = {} + # Read genome file to obtain contig information + if (args.reference.endswith(".gz")): + reader = gzip.open(args.reference, 'rb') + else: + reader = open(args.reference) + + chunk = 0 + + for line in reader: + line = line.decode('ascii').strip() + if line.startswith(">"): + header = line.strip(">") + if chunk not in chunks: + chunks[chunk] = set() + if len(chunks[chunk]) > args.contig_chunks: + chunk = chunk + 1 + chunks[chunk] = set() + chunks[chunk].add(header) + + if chunk > 10: + print("REMOVE ME!!!") + break + + reader.close() + + # Run this in parallel for each chunk by number of threads / 2 + commands = set() + for chunk in chunks: + arguments = ["medaka", "consensus", args.p, "results/chunk_" + str(chunk), "--model", args.model, "--threads", "2", "--region", ' '.join(chunks[chunk])] + arguments = ' '.join(map(str,arguments)) + commands.add(arguments) + # subprocess.run(arguments, shell=True, check=True) + + # Multi thread section + from functools import partial + from multiprocessing.dummy import Pool + from subprocess import call + + jobs = args.threads / 2 + print("Number of jobs to be executed in parallel:", jobs) + pool = Pool(jobs) # two concurrent commands at a time + for i, returncode in enumerate(pool.imap(partial(call, shell=True), commands)): + if returncode != 0: + print("%d command failed: %d" % (i, returncode)) + else: + print("%d command finished: %d" % (i, returncode)) + + +## SINGLE RUN +# # wait for jobs, then collate results +# medaka stitch results/*.hdf polished.assembly.fasta +def medaka_stich(args): + arguments = ["medaka", "stich", "results/*.hdf", "polished.assembly.fasta"] + print(arguments) + # subprocess.run(arguments, shell=True, check=True) + + +if __name__ == '__main__': + main() \ No newline at end of file -- GitLab