Skip to content
Snippets Groups Projects
Commit 5bc16ad3 authored by Jasper Koehorst's avatar Jasper Koehorst
Browse files

medaka parallelisation

parent 916f5181
No related branches found
No related tags found
No related merge requests found
# Improvement for the medaka workflow
import subprocess
import argparse
import gzip
# import multiprocessing
# from joblib import Parallel, delayed
def main():
args = parse_arguments()
mini_align(args)
medaka_consensus(args)
def parse_arguments():
# Initialize parser
parser = argparse.ArgumentParser()
# Adding optional argument
parser.add_argument("-i", "--input", help = "input fast(a/q) file", required=True)
parser.add_argument("-r", "--reference", help = "reference fasta file", required=True)
parser.add_argument("-t", "--threads", help = "Number of threads to use", nargs='?', default=2, type=int)
parser.add_argument("-m", help = "fill MD tag")
parser.add_argument("-p", help = "output file prefix (default: reads)",nargs='?', default="reads", type=str)
parser.add_argument("-c", "--contig-chunks", help = "Number of contigs per medaka_consensus step", nargs='?',default=100, type=int)
parser.add_argument("--model", help = "Model definition, default is equivalent to r94.", default="r94")
# Adding optional argument for ....
# ...
# Adding optional argument for ....
# ...
# Read arguments from command line
args = parser.parse_args()
return args
## SINGLE RUN
# # align reads to assembly
# mini_align -i basecalls.fasta -r assembly.fasta -P -m \
# -p calls_to_draft.bam -t <threads>
def mini_align(args):
print(args)
arguments = ["mini_align", "-i", args.input, "-r", args.reference, "-m", "-p", args.p, "-t", args.threads]
arguments = ' '.join(map(str,arguments))
print(arguments)
subprocess.run(arguments, shell=True, check=True)
## RUN IN PARALLEL
# getconf ARG_MAX # To obtain maximum length
# # run lots of jobs like this, change model as appropriate
# mkdir results
# medaka consensus calls_to_draft.bam results/contigs1-4.hdf \
# --model r941_min_fast_g303 --batch 200 --threads 8 \
# --region contig1 contig2 contig3 contig4
# ...
def medaka_consensus(args):
chunks = {}
# Read genome file to obtain contig information
if (args.reference.endswith(".gz")):
reader = gzip.open(args.reference, 'rb')
else:
reader = open(args.reference)
chunk = 0
for line in reader:
line = line.decode('ascii').strip()
if line.startswith(">"):
header = line.strip(">")
if chunk not in chunks:
chunks[chunk] = set()
if len(chunks[chunk]) > args.contig_chunks:
chunk = chunk + 1
chunks[chunk] = set()
chunks[chunk].add(header)
if chunk > 10:
print("REMOVE ME!!!")
break
reader.close()
# Run this in parallel for each chunk by number of threads / 2
commands = set()
for chunk in chunks:
arguments = ["medaka", "consensus", args.p, "results/chunk_" + str(chunk), "--model", args.model, "--threads", "2", "--region", ' '.join(chunks[chunk])]
arguments = ' '.join(map(str,arguments))
commands.add(arguments)
# subprocess.run(arguments, shell=True, check=True)
# Multi thread section
from functools import partial
from multiprocessing.dummy import Pool
from subprocess import call
jobs = args.threads / 2
print("Number of jobs to be executed in parallel:", jobs)
pool = Pool(jobs) # two concurrent commands at a time
for i, returncode in enumerate(pool.imap(partial(call, shell=True), commands)):
if returncode != 0:
print("%d command failed: %d" % (i, returncode))
else:
print("%d command finished: %d" % (i, returncode))
## SINGLE RUN
# # wait for jobs, then collate results
# medaka stitch results/*.hdf polished.assembly.fasta
def medaka_stich(args):
arguments = ["medaka", "stich", "results/*.hdf", "polished.assembly.fasta"]
print(arguments)
# subprocess.run(arguments, shell=True, check=True)
if __name__ == '__main__':
main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment