Commit 3be2d54e authored by Siu, Pui Chung's avatar Siu, Pui Chung
Browse files

Upload New File

parent 53912060
#!/usr/bin/env python3
"""
Author: Jacky Siu Pui Chung Student number = 1047527
Description: this is a script to algorithm in bioinformatics assignment
4
Usage: The functions below consist of the HMM model based on aligned
fasta sequences, and it generates a sample sequence based on the
parameters trained on.
"""
# Import statements
from sys import argv
import random
# Function definitions
# Background amino acid probabilities
pa = { 'A':0.074, 'C':0.025, 'D':0.054, 'E':0.054, 'F':0.047, 'G':0.074,\
'H':0.026, 'I':0.068, 'L':0.099, 'K':0.058, 'M':0.025, 'N':0.045,\
'P':0.039, 'Q':0.034, 'R':0.052, 'S':0.057, 'T':0.051, 'V':0.073,\
'W':0.013, 'Y':0.034 }
class HMM():
"""HMM object to store an HMM model
This object is designed to keep track of all HMM states, emissions, and
transitions. It may be used in your implementation, but may also be
ignored, and replaced by a data structure of choice
"""
# Emission probabilities for the match and insert states
e_m = []; e_i = pa;
# Transition probabilities from/to matches, inserts and deletions
t_mm = []; t_mi = []; t_md = [];
t_im = []; t_ii = []; t_id = [];
t_dm = []; t_di = []; t_dd = [];
def __init__(self,nmatches):
"""Initialize HMM object with number of match states
nmatches: int, number of match states
"""
self.nmatches = nmatches
#emission prob for match is a list of dictionaries with
#len(nmatches), where each e_m's pa's values = 0.0
self.e_m = [dict(pa) for i in range(0,nmatches)]
for i in range(0,nmatches):
for j in pa.keys():
self.e_m[i][j] = 0.0
#emission probabilities for insert states = pa
self.e_i = pa;
self.t_mm = [0.0 for i in range(0,nmatches+1)]
self.t_mi = [0.0 for i in range(0,nmatches+1)]
self.t_md = [0.0 for i in range(0,nmatches+1)]
self.t_im = [0.0 for i in range(0,nmatches+1)]
self.t_ii = [0.0 for i in range(0,nmatches+1)]
self.t_id = [0.0 for i in range(0,nmatches+1)]
self.t_dm = [0.0 for i in range(0,nmatches+1)]
self.t_di = [0.0 for i in range(0,nmatches+1)]
self.t_dd = [0.0 for i in range(0,nmatches+1)]
def sample(events):
"""Return a key from dict based on the probabilities
events: dict of {key: probability}, sum of probabilities should be 1.0.
"""
key_options = list(events.keys())
cum = [0 for i in key_options]
cum[0] = events[key_options[0]]
for i in range(1,len(events)):
cum[i] = cum[i-1] + events[key_options[i]]
# Should not be necessary, but for safety
cum[len(cum)-1] = 1.0
ref_point = random()
pick = None
i = 0
while pick is None and (i < len(cum)):
if ref_point < cum[i]:
pick = key_options[i]
i = i + 1
return pick
def fastaparser(filetext):
"""
Description: parse fastafile into fastalist
Input:list of file text
Output:list of list that contain ID and fasta sequence
"""
ID, seq, fastas = None, [], []
for line in filetext:
line = line.strip()
if line.startswith(">"):
if ID:
fastas.append([ID, ''.join(seq)])
ID, seq = line, []
else:
seq.append(line)
if ID:
fastas.append([ID, ''.join(seq)])
return fastas
def estmatchstate(fastas):
"""
Description: estimatee number of match states, where Positions in
the alignment that have gaps in less than 50% of the rows correspond to
match states. Those with more than 50% gaps correspond to
insertion states
Input:list of fasta sequences
Output:nmatches, int of number of match states
"""
states = []
nmatches = 0
transposed_fastas = list(map(list, zip(*fastas)))
for position in transposed_fastas:
gaps = 0
for pos_fasta in position:
if pos_fasta == "-":
gaps += 1
#determine each AA position is a match or insert state
if gaps / len(position) < 0.5:
nmatches += 1
states.append("M")
else:
states.append("I")
return nmatches, states
def estnumtransition_emission(fastas,states, matchpos):
"""
Description: estimate the transition and emission probabilities of
certain amino acid position within the aligned fasta
sequences. The function also account for beginning state
which acts as between beginning state and M0
Input:fastas, list of fasta sequences
states: list of strings of symbol for either insert or
match state
matchpos: "B" for beginning state, or a int indicating the
match state number
Output:transition: transition probability at matchpos
emission: emission probabilitity at matchpos
"""
transition = {"t_mm":0, "t_mi":0, "t_md":0, \
"t_im":0, "t_ii":0, "t_id":0, \
"t_dm":0, "t_di":0, "t_dd":0}
emission = {'A':0, 'C':0, 'D':0, 'E':0, 'F':0, 'G':0,\
'H':0, 'I':0., 'L':0, 'K':0, 'M':0, 'N':0,\
'P':0, 'Q':0, 'R':0, 'S':0, 'T':0, 'V':0,\
'W':0, 'Y':0}
matchposlist = [[i,n] for i, n in enumerate(states) if n == "M"]
#when matchpos at begin state
if matchpos == "B":
currmatchpos = 0 #B, artifical, does not contain anything
nextmatchpos = matchposlist[0][0] #M0
#if there are insert states before M0
if "I" in states[currmatchpos:nextmatchpos]:
insertstatebool = True #insert states before first match state
for fasta in fastas:
inserts = [insert for insert in \
fasta[currmatchpos:nextmatchpos] if insert != "-"]
if inserts != []:
transition["t_mi"] += 1
transition["t_im"] += 1
if len(inserts) > 1:
transition["t_ii"] += len(inserts) - 1
else:
# if no insert state before M0, consider between B to M0
if fasta[currmatchpos] != "-":
emission[fasta[currmatchpos]] += 1
if fasta[currmatchpos] == "-":
if fasta[nextmatchpos] in emission.keys():
transition["t_mm"] += 1
if fasta[nextmatchpos] == "-":
transition["t_md"] += 1
return transition, emission
else:
#no insert state before match state
return None, None
#when at last match state
elif matchpos == matchposlist.index(matchposlist[-1]):
currmatchpos = matchposlist[matchpos][0]
#if there are insert states after last match state
if "I" in states[currmatchpos:]:
insertstatebool = True
for fasta in fastas:
if insertstatebool:
endofinsert = len(fasta)
inserts = [insert for insert in \
fasta[currmatchpos:endofinsert] if insert != "-"]
if inserts != []:
if fasta[currmatchpos] == "-":
transition["t_di"] += 1
if fasta[currmatchpos] in emission.keys():
transition["t_mi"] += 1
if len(inserts) > 1:
transition["t_ii"] += len(inserts) - 1
else:
if fasta[currmatchpos] != "-":
emission[fasta[currmatchpos]] += 1
if fasta[currmatchpos] in emission.keys():
transition["t_mm"] += 1
if fasta[currmatchpos] == "-":
transition["t_md"] += 1
return transition, emission
#when no insert state after last match state
else:
for fasta in fastas:
if fasta[currmatchpos] != "-":
emission[fasta[currmatchpos]] += 1
if fasta[currmatchpos] in emission.keys():
transition["t_mm"] += 1
if fasta[currmatchpos] == "-":
transition["t_md"] += 1
return transition, emission
#when matchpos is within (potential insert before M0), M0 to M_end - 1
else:
currmatchpos = matchposlist[matchpos][0]
nextmatchpos = matchposlist[matchpos + 1][0]
insertstatebool = False
#determine whether insert states are present between curr and next
#match state
if "I" in states[currmatchpos:nextmatchpos]:
insertstatebool = True
for fasta in fastas:
if fasta[currmatchpos] != "-":
emission[fasta[currmatchpos]] += 1
if insertstatebool:
inserts = [insert for insert in \
fasta[currmatchpos + 1:nextmatchpos] if insert != "-"]
if inserts != []:
if fasta[currmatchpos] == "-":
transition["t_di"] += 1
if fasta[currmatchpos] in emission.keys():
transition["t_mi"] += 1
if fasta[nextmatchpos] == "-":
transition["t_id"] += 1
if fasta[nextmatchpos] in emission.keys():
transition["t_im"] += 1
if len(inserts) > 1:
transition["t_ii"] += len(inserts) - 1
else:
if fasta[currmatchpos] in emission.keys():
if fasta[nextmatchpos] in emission.keys():
transition["t_mm"] += 1
elif fasta[nextmatchpos] == "-":
transition["t_md"] += 1
if fasta[currmatchpos] == "-":
if fasta[nextmatchpos] in emission.keys():
transition["t_dm"] += 1
elif fasta[nextmatchpos] == "-":
transition["t_dd"] += 1
else:
if fasta[currmatchpos] in emission.keys():
if fasta[nextmatchpos] in emission.keys():
transition["t_mm"] += 1
elif fasta[nextmatchpos] == "-":
transition["t_md"] += 1
if fasta[currmatchpos] == "-":
if fasta[nextmatchpos] in emission.keys():
transition["t_dm"] += 1
elif fasta[nextmatchpos] == "-":
transition["t_dd"] += 1
return transition, emission
def normalization(transition, emission):
"""
Description: normalization of transition and emission probabilities
so that sum of probabilities = 1
Input:transition: transition probability at position
emission: emission probabilitity at position
Output:transition: normalized transition probability at position
emission: normalized emission probabilitity at position
"""
M_list = ["t_mm", "t_mi", "t_md"]
I_list = ["t_im", "t_ii", "t_id"]
D_list = ["t_dm", "t_di", "t_dd"]
M_total, I_total, D_total = 0, 0, 0
#calculate total occurance in M, I and D list
for i in range(len(M_list)):
M_total += transition[M_list[i]]
I_total += transition[I_list[i]]
D_total += transition[D_list[i]]
for i in range(len(M_list)):
#bypass division by zero error when no occurance within list
transition[M_list[i]] /= M_total or not M_total
transition[I_list[i]] /= I_total or not I_total
transition[D_list[i]] /= D_total or not D_total
emission_total = sum([count for count in emission.values()])
#another way to filter zero emission_total
if emission_total != 0:
emission = {k: v / emission_total for (k, v) in emission.items()}
return transition, emission
def HMMwrapper(fastas):
"""
Description: wrapper that connect the sampling of fasta sequences
base on the HMM model
Input:fastas, list of aligned fasta sequences
Output:timepts, list of list, [transition, emission] at each AA position
fasta_sample, a string of protein sequence sample generated from the
HMM model
"""
nmatches, states = estmatchstate(fastas)
timepts = []
#initialize with B to M0
transition, emission = estnumtransition_emission(fastas,states, "B")
#when there are insert states between B and M0
if transition and emission != None:
norm_transition, emission = normalization(transition, emission)
timepts.append([transition, emission])
#from M0 to M_end
for i in range(nmatches):
transition, emission = estnumtransition_emission(fastas,states, i)
norm_transition, emission = normalization(transition, emission)
timepts.append([transition, emission])
fasta_sample = samplesequence(timepts, states)
return timepts,fasta_sample
def samplesequence(timepts, states):
"""
Description: sample sequence based on the transition and emission
probabilities of aligned fasta sequences.
Input:timepts: list of list, [transition, emission] at each AA position
states: list of strings of symbol for either insert or
match state
Output: return a string of protein sequence sample generated from the
HMM model
"""
M_list = ["t_mm", "t_mi", "t_md"]
I_list = ["t_im", "t_ii", "t_id"]
D_list = ["t_dm", "t_di", "t_dd"]
sequence = []
timept = 0
transition = timepts[timept][0]
transition_sublist = {k: transition.get(k, None) for k in M_list}
curr_state = (random.choices(list(transition_sublist.keys()), list(transition_sublist.values())))
#when first AA position is a match state
if states[0] == "M":
AA = random.choices(list(timepts[timept][1].keys()), list(timepts[timept][1].values()))
sequence.append(AA)
#while loop to transition between states
while timept < len(timepts) - 1:
if curr_state[0][3] == "m":
timept += 1
transition = timepts[timept][0]
transition_sublist = {k: transition.get(k, None) \
for k in M_list}
curr_state = (random.choices(list(transition_sublist.keys())\
, list(transition_sublist.values())))
AA = random.choices(list(timepts[timept][1].keys()), \
list(timepts[timept][1].values()))
sequence.append(AA)
if curr_state[0][3] == "i":
transition_sublist = {k: transition.get(k, None) \
for k in I_list}
curr_state = (random.choices(list(transition_sublist.keys())\
, list(transition_sublist.values())))
AA = random.choices(list(pa.keys()), list(pa.values()))
sequence.append(AA)
if curr_state[0][3] == "d":
timept += 1
transition = timepts[timept][0]
transition_sublist = {k: transition.get(k, None) \
for k in D_list}
curr_state = (random.choices(list(transition_sublist.keys())\
, list(transition_sublist.values())))
AA = random.choices(list(timepts[timept][1].keys()), \
list(timepts[timept][1].values()))
sequence.append(AA)
return str([''.join(x) for x in sequence])
def PSSM_format_generator(timepts):
headline = " ".join(list(pa.keys()))
print(headline)
for timept in timepts:
V = list(timept[1].values())
print(*V)
if __name__ == "__main__":
# implement main code here
fastalist = fastaparser(open('develop.fasta'))
fastaseq = []
for fastas in fastalist:
fastaseq.append(fastas[1])
develop_timepts, develop_sample = (HMMwrapper(fastaseq))
fastalist_test = fastaparser(open('test.fasta'))
fastaseq_test = []
for fastas in fastalist_test:
fastaseq_test.append(fastas[1])
fastaseq_test_nmatches, fastaseq_test_states = estmatchstate(fastaseq_test)
test_timepts, test_sample = (HMMwrapper(fastaseq_test))
fastalist = fastaparser(open("test_large.fasta"))
fastaseq_test_large = []
for fastas in fastalist:
fastaseq_test_large.append(fastas[1])
test_large_nmatches, test_large_states = estmatchstate(fastaseq_test_large)
test__large_timepts, test_large_sample = (HMMwrapper(fastaseq_test_large))
# Put function calls, print statements etc. to answer the questions here
# When we run your script we should see the answers on screen (or file)
print("Q1")
print(fastaseq_test_nmatches)
print("Q2")
print(test_timepts)
print("Q3")
print(PSSM_format_generator(test_timepts))
print("Q4")
for i in range(10):
test_timepts, test_sample = (HMMwrapper(fastaseq_test))
print(test_sample)
print("Q5")
print(test_large_nmatches)
print(PSSM_format_generator(test__large_timepts))
#TODO: remaining issue with insert states after last match state
for i in range(10):
test__large_timepts, test_large_sample = (HMMwrapper(fastaseq_test_large))
print(test_large_sample)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment