Skip to content
Snippets Groups Projects
Commit b6a1b784 authored by Koehorst, Jasper's avatar Koehorst, Jasper
Browse files

entrez downloader and xml parser added

parent 8047c199
No related branches found
No related tags found
No related merge requests found
import os
import sys
from Bio import Entrez
from joblib import Parallel, delayed
from joblib.externals.loky.process_executor import TerminatedWorkerError
def processInput(identifier):
identifier = str(identifier)
folder = make_folder(identifier)
path = folder + "/" + identifier + ".xml"
if os.path.isfile(path):
# print("Already exists")
return path
Entrez.email = "ja@nee.com"
handle = Entrez.efetch(db="sra", id=identifier, rettype="xml", retmode="text")
content = handle.read()
print(path)
with open(path, 'w') as f:
print(content, file=f)
return path
def make_folder(identifier):
folder = ""
print(identifier)
# print(identifier)
for e in identifier:
if len(folder) % 4 == 0:
folder = folder + "/"
......@@ -13,28 +38,41 @@ def make_folder(identifier):
return "./sra" + folder
Entrez.email = "A.N.Other@example.com" # Always tell NCBI who you are
handle = Entrez.esearch(db="sra",
term="(((\"paired\"[Layout] AND \"illumina\"[Platform]) AND \"strategy amplicon\"["
"Properties]) AND \"amplicon\"[Strategy] AND \"filetype fastq\"[Properties]) AND ("
"cluster_public[prop] AND \"filetype fastq\"[Properties]) ") #,
#retmax="2000")
record = Entrez.read(handle)
while True:
query = "(((((\"paired\"[Layout]) AND ((\"instrument illumina hiseq 1000\"[Properties] OR \"instrument illumina hiseq 1500\"[Properties] OR \"instrument illumina hiseq 2000\"[Properties] OR \"instrument illumina hiseq 2500\"[Properties] OR \"instrument illumina hiseq 3000\"[Properties] OR \"instrument illumina hiseq 4000\"[Properties] OR \"instrument illumina hiseq x ten\"[Properties] OR \"instrument illumina miseq\"[Properties]))) AND \"illumina\"[Platform]) AND \"amplicon\"[Strategy]) AND \"filetype fastq\"[Properties]) AND \"cluster public\"[Properties]"
Entrez.email = "A.N.Other@example.com" # Always tell NCBI who you are
handle = Entrez.esearch(db="sra",
term=query,
# term="(("
# "("
# "\"paired\"[Layout] AND \"illumina\"[Platform]"
# ") AND \"strategy amplicon\"[Properties]"
# ") AND \"amplicon\"[Strategy] AND \"filetype fastq\"[Properties]"
# ") AND "
# "("
# "cluster_public[prop] AND \"filetype fastq\"[Properties]"
# ") AND "
# "("
# "(\""
# "instrument illumina hiseq 1000\"[Properties] OR \"instrument illumina hiseq 1500\"[Properties] OR \"instrument illumina hiseq 2000\"[Properties] OR \"instrument illumina hiseq 2500\"[Properties] OR \"instrument illumina hiseq 3000\"[Properties] OR \"instrument illumina hiseq 4000\"[Properties] OR \"instrument illumina hiseq x ten\"[Properties] OR \"instrument illumina miseq\"[Properties]"
# "))",
retmax="3000000")
record = Entrez.read(handle)
print(record['IdList'])
identifiers = set()
for index, identifier in enumerate(record['IdList']):
folder = make_folder(identifier)
path = folder + "/" + identifier + ".xml"
for identifier in record['IdList']:
folder = make_folder(identifier)
path = folder + "/" + identifier + ".xml"
if os.path.exists(path): continue
if not os.path.exists(folder):
os.makedirs(folder)
handle = Entrez.efetch(db="sra", id=identifier, rettype="xml", retmode="text")
content = handle.read()
print(path)
with open(path, 'w') as f:
print(content, file=f)
if not os.path.exists(folder):
os.makedirs(folder)
identifiers.add(int(identifier))
# dictionary = xmltodict.parse(content)
# myprint(dictionary)
num_cores = 2
print("Parsing", len(identifiers))
try:
results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in identifiers)
# Stop when finished
sys.exit(0)
except TerminatedWorkerError as e:
pass
import os
import xmltodict
# GLOBAL PARAMETERS
MINIMUM_NUMBER_OF_SPOTS = 10000
def myprint(d):
"""
Prints key, value for a nested dictionary
:param d:
"""
for k, v in d.items():
if isinstance(v, dict):
myprint(v)
else:
line = "{0} : {1}".format(k, v)
if "primer" in line:
print(line)
# if "primer" in line:
print(line)
dirName = './sra'
listOfFiles = list()
for (dirpath, dirnames, filenames) in os.walk(dirName):
listOfFiles += [os.path.join(dirpath, file) for file in filenames]
# global single
# global paired
# global error
single = 0
paired = 0
error = 0
machines = set()
illumina_machines = set(["Illumina HiSeq 1000", "Illumina HiSeq 1500", "Illumina HiSeq 2000", "Illumina HiSeq 2500",
"Illumina HiSeq 3000", "Illumina HiSeq 4000", "Illumina HiSeq x ten", "Illumina MiSeq"])
def machine_filter(doc):
"""
Removes all docs that are not of illumina hiseq or miseq
:param doc:
:return:
"""
if 'ILLUMINA' not in doc['EXPERIMENT_PACKAGE_SET']['EXPERIMENT_PACKAGE']['EXPERIMENT']['PLATFORM'].keys():
return False
if doc['EXPERIMENT_PACKAGE_SET']['EXPERIMENT_PACKAGE']['EXPERIMENT']['PLATFORM']['ILLUMINA'][
'INSTRUMENT_MODEL'] not in illumina_machines:
machines.add(doc['EXPERIMENT_PACKAGE_SET']['EXPERIMENT_PACKAGE']['EXPERIMENT']['PLATFORM']['ILLUMINA'][
'INSTRUMENT_MODEL'])
return False
return True
def paired_check(doc):
"""
Check if it contains paired files
:param doc:
:return:
"""
global single
global paired
global error
try:
size = int(doc['EXPERIMENT_PACKAGE_SET']['EXPERIMENT_PACKAGE']['RUN_SET']['RUN']['Statistics']['@nreads'])
if size != 2:
single += 1
return False
paired += 1
return True
except KeyError as e:
error += 1
return False
except TypeError as e:
error += 1
return False
except ValueError as e:
error += 1
return False
def setup_investigation(content):
# # Study contains the Investigation information
# study = content['STUDY']
# investigation_accession = study['@accession']
# investigation_alias = study['@alias']
# investigation_title = study['DESCRIPTOR']['STUDY_TITLE']
# investigation_type = study['DESCRIPTOR']['STUDY_TYPE']['@existing_study_type']
# # OPTIONAL investigation_center_project_name = study['DESCRIPTOR']['STUDY_TYPE']['CENTER_PROJECT_NAME']
# PRJN = INVESTIGATION
# SUBMITTER INFORMATION
submission = content['SUBMISSION']
lab_name = submission['@lab_name']
center_name = submission['@center_name']
submission_accession = submission['@accession']
submission_alias = submission['@alias']
# ORGANIZATION INFORMATION
organization = content['Organization']
email = organization['Contact']['@email']
# organization['Contact']['@sec_email']
name = organization['Contact']['Name']['First'] + " " + organization['Contact']['Name']['Middle'] + " " + \
organization['Contact']['Name']['Last']
firstname = organization['Contact']['Name']['First']
lastname = organization['Contact']['Name']['Last']
postal_code = organization['Contact']['Address']['@postal_code']
department = organization['Contact']['Address']['Department']
institution = organization['Contact']['Address']['Institution']
street = organization['Contact']['Address']['Street']
city = organization['Contact']['Address']['City']
sub = organization['Contact']['Address']['Sub']
country = organization['Contact']['Address']['Country']
investigation = {
}
return investigation
def setup_study(content):
study_object = content['STUDY']
study_accession = study_object['@accession']
study_alias = study_object['@alias']
study_title = study_object['DESCRIPTOR']['STUDY_TITLE']
study_type = None
if '@existing_study_type' in study_object:
study_type = study_object['DESCRIPTOR']['@existing_study_type']
study_center_project_name = study_object['DESCRIPTOR']['CENTER_PROJECT_NAME']
study = {
"accession": study_accession,
"alias": study_alias,
"title": study_title,
"type": study_type,
"study_center_project_name": study_center_project_name
}
return study
def setup_sample(content):
sample_object = content['SAMPLE']
sample_alias = sample_object['@alias']
sample_accession = sample_object['@accession']
external_ids = {}
namespace = sample_object['IDENTIFIERS']['EXTERNAL_ID']['@namespace']
text = sample_object['IDENTIFIERS']['EXTERNAL_ID']['#text']
sample_title = sample_object['TITLE']
sample_taxon = sample_object['SAMPLE_NAME']['TAXON_ID']
sample_scientific_name = sample_object['SAMPLE_NAME']['SCIENTIFIC_NAME']
sample_description = sample_object['DESCRIPTION']
sample_attributes = {}
for sample_attribute in sample_object['SAMPLE_ATTRIBUTES']['SAMPLE_ATTRIBUTE']:
sample_attributes[sample_attribute['TAG']] = sample_attribute['VALUE']
sample = {
'alias': sample_alias,
'accession': sample_accession,
'external_ids': external_ids,
'title': sample_title,
'taxon': sample_taxon,
'scientific_name': sample_scientific_name,
'description': sample_description,
'attributes': sample_attributes,
'external_id': {namespace:text}
}
for element in sample_object['SAMPLE_ATTRIBUTES']['SAMPLE_ATTRIBUTE']:
sample[element['TAG']] = element['VALUE']
return sample
def setup_observation_unit(content):
experiment = content['EXPERIMENT']
ou_accession = experiment['@accession']
ou_alias = experiment['@alias']
ou_title = experiment['TITLE']
study_accession = experiment['STUDY_REF']['@accession']
ou_design_description = experiment['DESIGN']['DESIGN_DESCRIPTION']
observation_unit = {
"accession": ou_accession,
"alias": ou_alias,
"title": ou_title,
"study_id": study_accession,
"description": ou_design_description
}
return observation_unit
def setup_assay(content):
run = content['RUN_SET']['RUN']
accession = run['@accession']
alias = run['@alias']
total_spots = run['@total_spots']
total_bases = run['@total_bases']
size = run['@size']
load_done = run['@load_done']
published = run['@published']
ou_accession = run['EXPERIMENT_REF']['@accession']
# Files
sra_file = run['SRAFiles']['SRAFile']
forward_fastq = sra_file['@filename'] + "_1.fastq.gz"
reverse_fastq = sra_file['@filename'] + "_2.fastq.gz"
date = sra_file['@date']
super_type = sra_file['@supertype']
sratoolkit = sra_file['@sratoolkit']
# Obtain experiment info on design / library
library = content['EXPERIMENT']['DESIGN']['LIBRARY_DESCRIPTOR']
library_name = library['LIBRARY_NAME']
library_strategy = library['LIBRARY_STRATEGY']
library_source = library['LIBRARY_SOURCE']
library_selection = library['LIBRARY_SELECTION']
library_layout = list(dict(library['LIBRARY_LAYOUT']).keys())
assay = {
"accession": accession,
"alias": alias,
"spots": total_spots,
"bases": total_bases,
"size": size,
"load_done": load_done,
"published": published,
"ou_accession": ou_accession,
"forward_fastq": forward_fastq,
"reverse_fastq": reverse_fastq,
"date": date,
"super_type": super_type,
"sratoolkit": sratoolkit,
"library_name": library_name,
"library_strategy": library_strategy,
"library_source": library_source,
"library_selection": library_selection,
"library_layout": library_layout
}
return assay
# OPTIONAL FILTER FOR LENGTH?... run['Statistics']['READ'][0]['@average']
def lookup_creation(doc):
content = doc['EXPERIMENT_PACKAGE_SET']['EXPERIMENT_PACKAGE']
if int(content['RUN_SET']['RUN']['@total_spots']) < MINIMUM_NUMBER_OF_SPOTS:
return None
if content['RUN_SET']['RUN']['Statistics']['@nreads'] != "2":
return None
investigation = setup_investigation(content)
study = setup_study(content)
observation_unit = setup_observation_unit(content)
sample = setup_sample(content)
assay = setup_assay(content)
def main():
for (dirpath, dirnames, filenames) in os.walk(dirName):
listOfFiles = [os.path.join(dirpath, file) for file in filenames]
# listOfFiles = ['./sra/338/257/9/3382579.xml']
for index, elem in enumerate(listOfFiles):
# print("Parsing", index, "of", len(listOfFiles))
# print("Parsing ", elem)
with open(elem) as fd:
content = fd.read()
# if "soil" not in content:
# continue
doc = xmltodict.parse(content)
status = machine_filter(doc)
if not status:
continue
status = paired_check(doc)
if not status:
continue
# Create lookup file
lookup_creation(doc)
# print(paired, single, error, ' '.join(machines))
# break
for elem in listOfFiles:
with open(elem) as fd:
doc = xmltodict.parse(fd.read())
myprint(doc)
if __name__ == '__main__':
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment