Commit 3bd319d0 authored by Carlos de Lannoy
multiprocessing validation, misc

parent dbe6a5d7
@@ -148,6 +148,16 @@ db_dir = ('--db-dir', {
 # --- parameters ---
+store_example_reads = ('--store-example-reads', {
+    'action': 'store_true',
+    'help': 'Additionally store reads as npzs'
+})
+
+silent = ('--silent', {
+    'action': 'store_true',
+    'help': 'Run without printing to console.'
+})
+
 cores = ('--cores', {
     'type': int,
     'default': 4,
@@ -231,7 +241,7 @@ def get_build_db_parser():
     parser = argparse.ArgumentParser(description='Create ZODB database of training reads from resquiggled fast5s, '
                                                  'for given target k-mer')
     for arg in (fast5_in, db_dir, normalization, target, width, hdf_path,
-                uncenter_kmer, read_index, db_type):
+                uncenter_kmer, read_index, db_type, silent, store_example_reads):
         parser.add_argument(arg[0], **arg[1])
     return parser
......
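The option tuples above feed the shared-dict pattern used across argparse_dicts: every flag is defined once as a (flag, kwargs) tuple, and each subcommand parser registers only the tuples it needs via parser.add_argument(arg[0], **arg[1]). A minimal runnable sketch of the same pattern (the parser name and the help text for --cores are illustrative, not from the repo):

import argparse

# every option lives in one module as a (flag, kwargs) tuple
silent = ('--silent', {'action': 'store_true', 'help': 'Run without printing to console.'})
cores = ('--cores', {'type': int, 'default': 4, 'help': 'Number of worker processes.'})

def get_example_parser():
    parser = argparse.ArgumentParser(description='demo of the shared option-dict pattern')
    for arg in (silent, cores):  # each subcommand picks only the options it needs
        parser.add_argument(arg[0], **arg[1])
    return parser

args = get_example_parser().parse_args(['--cores', '8', '--silent'])
assert args.cores == 8 and args.silent is True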
@@ -43,15 +43,12 @@ def main(args):
                                        kmer_size=kmer_size)
             db.add_training_read(training_read=tr,
                                  uncenter_kmer=args.uncenter_kmer)
-            # except ValueError as e:
-            #     with open(error_fn, 'a') as efn:
-            #         efn.write('{fn}\t{err}\n'.format(err=e, fn=basename(file)))
-            #     continue
-            np.savez(npz_path + splitext(basename(file))[0], base_labels=tr.events, raw=tr.raw)
+            if args.store_example_reads:
+                np.savez(npz_path + splitext(basename(file))[0], base_labels=tr.events, raw=tr.raw)
             if not (i + 1) % 10:  # every 10 reads remove history of transactions ('pack' the database) to reduce size
                 db.pack_db()
             percentage_processed = int((i + 1) / nb_files * 100)
-            if percentage_processed > count_pct_lim:
+            if not args.silent and percentage_processed >= count_pct_lim:
                 print(f'{percentage_processed}% of reads processed, {db.nb_pos} positives in DB')
                 count_pct_lim += 5
     except Exception as e:
......
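The changed block gates two side effects: pack the ZODB every 10 reads, and print progress at 5% steps unless --silent is set. Note the parenthesization in (i + 1) % 10: without it, `not i+1 % 10` parses as `not (i + 1)`, which is falsy for every read, so the database would never be packed. A stripped-down sketch of the same control flow, with the ZODB call stubbed out:

def process_reads(nb_files, silent=False):
    count_pct_lim = 5  # next percentage threshold to report
    for i in range(nb_files):
        # ... build training read, add it to the db ...
        if not (i + 1) % 10:
            pass  # db.pack_db() in the real script
        percentage_processed = int((i + 1) / nb_files * 100)
        if not silent and percentage_processed >= count_pct_lim:
            print(f'{percentage_processed}% of reads processed')
            count_pct_lim += 5

process_reads(40)  # prints once per 5% threshold crossed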
-import os, sys
+import os, sys, re
 import pandas as pd
 from os.path import basename
 from shutil import copy
@@ -6,16 +6,37 @@ from pathlib import Path
 from distutils.dir_util import copy_tree
 from sklearn.model_selection import StratifiedKFold
 from jinja2 import Template
+from datetime import datetime

 __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
 baseless_location = os.path.realpath(f'{__location__}/..')
 sys.path.append(baseless_location)

 import numpy as np
 from argparse_dicts import get_validate_parser
 from helper_functions import parse_input_path, parse_output_path
 from snakemake import snakemake
 import yaml
 from tempfile import TemporaryDirectory
+import multiprocessing as mp
+
+
+def parse_fast5_list(fast5_list, gt_df, out_queue):
+    # map each fast5 file to its species via the ground-truth table; drop unmatched reads
+    fast5_basename_list = [basename(f) for f in fast5_list]
+    fast5_df = pd.DataFrame({'read_id': fast5_basename_list,
+                             'fn': fast5_list,
+                             'species': [gt_df.species_short.get(ff, 'unknown') for ff in fast5_basename_list]}
+                            ).set_index('read_id')
+    fast5_df.drop(fast5_df.query('species == "unknown"').index, axis=0, inplace=True)
+    out_queue.put(fast5_df)
+
+
+def write_test_reads(fast5_sub_df, nb_folds, test_read_dir, species_list):
+    # copy each test read (fold_{nf} == False) into every species' directory for that fold
+    for nf in range(nb_folds):
+        cur_fold_df = fast5_sub_df.query(f'fold_{nf} == False')
+        for _, fn in cur_fold_df.fn.iteritems():
+            for sp in species_list:
+                copy(fn, f'{test_read_dir}{sp}/fold_{nf}/')
+

 parser = get_validate_parser()
 args = parser.parse_args()
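parse_fast5_list and write_test_reads, added above, are written to run as worker processes: the parent splits an index array into args.cores chunks with np.array_split, each worker pushes one result object onto a shared mp.Queue, and the parent collects. A self-contained toy of that scatter-gather shape (the worker here squares numbers instead of building DataFrames or copying files):

import numpy as np
import multiprocessing as mp

def square_chunk(chunk, out_queue):
    # stand-in for parse_fast5_list: process one slice, push one result object
    out_queue.put([int(x) * int(x) for x in chunk])

if __name__ == '__main__':
    data = np.arange(20)
    out_queue = mp.Queue()
    workers = [mp.Process(target=square_chunk, args=(data[idx], out_queue))
               for idx in np.array_split(np.arange(len(data)), 4)]
    for w in workers:
        w.start()
    results = [out_queue.get() for _ in workers]  # one blocking get per worker avoids drain races
    for w in workers:
        w.join()  # join only after collecting, so the queue feeder threads can flush
    print(sorted(x for chunk in results for x in chunk))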
@@ -42,34 +63,68 @@ gt_df = pd.read_csv(args.ground_truth_16s, header=0)
 gt_df.columns = [cn.replace(' ', '_') for cn in gt_df.columns]
 gt_df.set_index('file_name', inplace=True)

+print(f'{datetime.now()}: loading fast5 file list...')
 read_index_dir = parse_output_path(f'{args.out_dir}read_index_files')
-fast5_list = parse_input_path(args.fast5_in, pattern='*.fast5')
-fast5_basename_list = [basename(f) for f in fast5_list]
-fast5_df = pd.DataFrame({'read_id': fast5_basename_list,
-                         'fn': fast5_list,
-                         'species': [gt_df.species_short.get(ff, 'unknown') for ff in fast5_basename_list]}
-                        ).set_index('read_id')
-fast5_df.drop(fast5_df.query('species == "unknown"').index, axis=0, inplace=True)
+fast5_list = np.array(parse_input_path(args.fast5_in, pattern='*.fast5'))
+nb_fast5 = len(fast5_list)
+ff_idx_list = np.array_split(np.arange(len(fast5_list)), args.cores)
+
+print(f'{datetime.now()}: parsing fast5 file list...')
+out_queue = mp.Queue()
+ff_workers = [mp.Process(target=parse_fast5_list,
+                         args=(fast5_list[ff_idx], gt_df, out_queue)) for ff_idx in ff_idx_list]
+for worker in ff_workers:
+    worker.start()
+df_list = []
+while any(p.is_alive() for p in ff_workers):
+    while not out_queue.empty():
+        df_list.append(out_queue.get())
+while not out_queue.empty():  # drain results queued just before the last worker exited
+    df_list.append(out_queue.get())
+fast5_df = pd.concat(df_list)
+print(f'{datetime.now()}: done')
+
+# --- compile species list ---
+species_list = []
+with open(args.target_16S, 'r') as fh:
+    for line in fh.readlines():
+        # FASTA header: the species name is the text between '>' and the first comma
+        if gen := re.search('(?<=>)[^,]+', line): species_list.append(gen.group(0))
+species_list = list(set(species_list))

 # --- make folders with test reads (inference consumes them, so separately for each species) ---
-species_list = list(fast5_df.species.unique())
-test_read_dir = parse_output_path(f'{args.out_dir}test_reads/')
-with TemporaryDirectory() as td:
-    for fi, (train_num_idx, _) in enumerate(StratifiedKFold(n_splits=args.nb_folds, shuffle=True).split(fast5_df.index,
-                                                                                                        fast5_df.species)):
-        train_idx = fast5_df.index[train_num_idx]
-        test_idx = fast5_df.index.difference(train_idx)
-        cur_test_read_dir = parse_output_path(f'{td}/fold_{fi}/')
-        for _, fn in fast5_df.loc[test_idx, 'fn'].iteritems(): copy(fn, cur_test_read_dir)
-        fast5_df.loc[:, 'fold'] = False
-        fast5_df.loc[train_idx, 'fold'] = True
-        fast5_df.loc[:, ['fn', 'species', 'fold']].to_csv(f'{read_index_dir}index_fold{fi}.csv')
-    for sp in species_list:
-        copy_tree(td, f'{test_read_dir}{sp}')
+species_reads_list = list(fast5_df.species.unique())
+for sp in species_list:
+    if sp not in species_reads_list:
+        raise ValueError(f'No reads for target species {sp} listed in ground truth file!')
+
+test_read_dir = parse_output_path(f'{args.out_dir}test_reads/', clean=True)
+for fi, (train_num_idx, _) in enumerate(StratifiedKFold(n_splits=args.nb_folds,
+                                                        shuffle=True).split(fast5_df.index,
+                                                                            fast5_df.species)):
+    for sp in species_list:
+        _ = parse_output_path(f'{test_read_dir}{sp}/fold_{fi}')
+    train_idx = fast5_df.index[train_num_idx]
+    fast5_df.loc[:, f'fold_{fi}'] = False
+    fast5_df.loc[train_idx, f'fold_{fi}'] = True
+    fast5_df.to_csv(f'{read_index_dir}index_fold{fi}.csv',
+                    columns=['fn', 'species', f'fold_{fi}'],
+                    header=['fn', 'species', 'fold'])
+
+print(f'{datetime.now()}: writing test read dirs...')
+ff_idx_list = np.array_split(np.arange(len(fast5_df)), args.cores)
+workers = [mp.Process(target=write_test_reads,
+                      args=(fast5_df.iloc[cidx, :], args.nb_folds, test_read_dir, species_list))
+           for cidx in ff_idx_list]
+for worker in workers:
+    worker.start()
+for worker in workers:
+    worker.join()
+print(f'{datetime.now()}: done')

 # --- compile snakefile and run ---
 with open(f'{__location__}/validate_16S.sf', 'r') as fh: template_txt = fh.read()
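The fold bookkeeping in the hunk above replaces the single reusable 'fold' column with one boolean column per fold (fold_0, fold_1, ...), so each index CSV is self-describing and write_test_reads can select test reads per fold. A toy example with the same shape, on synthetic labels rather than repo reads:

import pandas as pd
from sklearn.model_selection import StratifiedKFold

df = pd.DataFrame({'species': ['a'] * 6 + ['b'] * 6})
for fi, (train_idx, _) in enumerate(StratifiedKFold(n_splits=3, shuffle=True).split(df.index, df.species)):
    df[f'fold_{fi}'] = False                          # default: test read for this fold
    df.loc[df.index[train_idx], f'fold_{fi}'] = True  # mark this fold's training reads
print(df)  # species ratios are preserved within every fold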
......
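The script's last step renders validate_16S.sf with jinja2 and launches it through snakemake's Python API. A sketch of that compile-and-run step; the render variables filter_width and uncenter_kmer appear in the rules below, everything else here is an assumption about the surrounding script:

from jinja2 import Template
from snakemake import snakemake

# template_txt as read above; render variables are assumptions, not shown in this hunk
sf_txt = Template(template_txt).render(filter_width=100, uncenter_kmer=True)
sf_fn = f'{args.out_dir}validate_16S.sf'
with open(sf_fn, 'w') as fh:
    fh.write(sf_txt)
snakemake(sf_fn, cores=args.cores)  # snakemake's Python API runs the workflow in-process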
@@ -131,6 +131,7 @@ rule generate_training_db:
         --target {wildcards.target} \
         --width {filter_width} \
         --hdf-path {hdf_path} \
+        --silent \
         {% if uncenter_kmer %}
         --uncenter-kmer \
         {% endif %}
@@ -156,6 +157,7 @@ rule generate_test_db:
         --target {wildcards.target} \
         --width {filter_width} \
         --hdf-path {hdf_path} \
+        --silent \
         {% if uncenter_kmer %}
         --uncenter-kmer \
         {% endif %}
......
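In these rules, --silent is injected unconditionally while --uncenter-kmer sits behind a jinja2 conditional resolved when the template is compiled; single-brace placeholders such as {wildcards.target} pass through jinja2 untouched, to be filled in later by snakemake at run time. A small rendering demo (the command name build_db_cmd is a placeholder, only the flag wiring mirrors the rules above):

from jinja2 import Template

shell_tpl = Template('''build_db_cmd \\
    --target {wildcards.target} \\
    --silent \\
    {% if uncenter_kmer %}--uncenter-kmer \\
    {% endif %}''')
print(shell_tpl.render(uncenter_kmer=True))   # --uncenter-kmer included
print(shell_tpl.render(uncenter_kmer=False))  # flag omitted; {wildcards.target} left for snakemake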