Unverified Commit 5df87c45 authored by mehmet's avatar mehmet Committed by GitHub
Browse files

Merge pull request #2 from TurtleTools/dev

dev
parents 36a7c755 41060647
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#!/usr/bin/env python3
from caretta import multiple_alignment
from pathlib import Path
import typer
app = typer.Typer()
def input_folder_callback(folder: Path) -> Path:
    """Validate that ``folder`` exists and hand it back unchanged.

    Registered as the Typer callback for the ``input_pdb`` argument.

    Raises
    ------
    typer.BadParameter
        If ``folder`` does not exist on disk.
    """
    if folder.exists():
        return folder
    raise typer.BadParameter(f"Folder {folder} does not exist")
import fire
from caretta import msa_numba
def output_folder_callback(folder: Path) -> Path:
    """Validate that ``folder`` does not exist yet and hand it back unchanged.

    Registered as the Typer callback for the ``--output`` option so that an
    existing results folder is never overwritten.

    Raises
    ------
    typer.BadParameter
        If ``folder`` already exists.
    """
    if not folder.exists():
        return folder
    message = f"Folder {folder} already exists, cowardly refusing to overwrite. Please delete it and try again"
    raise typer.BadParameter(message)
def align(input_pdb,
dssp_dir="caretta_tmp", num_threads=20, extract_all_features=False,
gap_open_penalty=1., gap_extend_penalty=0.01, consensus_weight=1.,
write_fasta=True, output_fasta_filename=Path("./result.fasta"),
write_pdb=True, output_pdb_folder=Path("./result_pdb/"),
write_features=True, output_feature_filename=Path("./result_features.pkl"),
write_class=True, output_class_filename=Path("./result_class.pkl"),
overwrite_dssp=False):
def positive_penalty(value: float) -> float:
    """Validate a penalty/weight option: reject values below zero.

    Zero is accepted. Used as the Typer callback for the gap penalties and
    the consensus weight.

    Raises
    ------
    typer.BadParameter
        If ``value`` is less than ``0.0``.
    """
    # Written as ``not value < 0.0`` on purpose: NaN compares False against
    # 0.0 and is therefore returned unchanged rather than rejected.
    if not value < 0.0:
        return value
    raise typer.BadParameter(f"Value {value} must be positive")
@app.command()
def align(
    input_pdb: Path = typer.Argument(
        ..., help="A folder with input protein files", callback=input_folder_callback
    ),
    gap_open_penalty: float = typer.Option(
        1.0, "-p", help="gap open penalty", callback=positive_penalty
    ),
    gap_extend_penalty: float = typer.Option(
        0.01, "-e", help="gap extend penalty", callback=positive_penalty
    ),
    consensus_weight: float = typer.Option(
        1.0,
        "--consensus-weight",
        "-c",
        help="weight well-aligned segments to reduce gaps in these areas",
        callback=positive_penalty,
    ),
    full: bool = typer.Option(
        False,
        "--full",
        "-f",
        help="Use all vs. all pairwise alignment for distance matrix calculation (much slower)",
    ),
    output: Path = typer.Option(
        Path("caretta_results"),
        "--output",
        "-o",
        help="folder to store output files",
        callback=output_folder_callback,
    ),
    fasta: bool = typer.Option(True, help="write alignment in FASTA file format"),
    pdb: bool = typer.Option(
        True, help="write PDB files superposed according to alignment"
    ),
    threads: int = typer.Option(
        4, "--threads", "-t", help="number of threads to use for feature extraction"
    ),
    features: bool = typer.Option(
        False,
        "--features",
        help="extract and write aligned features as a dictionary of NumPy arrays into a pickle file",
    ),
    write_class: bool = typer.Option(
        False,
        "--class",
        help="write StructureMultiple class with intermediate structures and tree to pickle file",
    ),
):
    """
    Align protein structures using Caretta.

    Writes the resulting sequence alignment and superposed PDB files to "caretta_results".
    Optionally also outputs a set of aligned feature matrices, or the python class with intermediate structures made during progressive alignment.
    """
    # The docstring above doubles as the command's help text, so it stays
    # short and user-facing. Each parameter is documented by the ``help=``
    # string of its Argument/Option declaration:
    #   input_pdb          folder with the input protein files (must exist)
    #   gap_open_penalty   / gap_extend_penalty / consensus_weight
    #                      alignment parameters, rejected when below zero
    #   full               all-vs-all pairwise alignment for the distance matrix
    #   output             results folder (must not exist yet)
    #   fasta, pdb, features, write_class
    #                      switches selecting which outputs are written
    #   threads            thread count for feature extraction

    # NOTE(review): both validators are also registered as Typer callbacks in
    # the signature; calling them again here presumably covers direct Python
    # calls to align() that bypass CLI parsing -- confirm.
    input_pdb = input_folder_callback(input_pdb)
    output = output_folder_callback(output)

    # Presumably warms up the numba-compiled routines before the real run
    # (inferred from the name only) -- confirm against multiple_alignment.
    multiple_alignment.trigger_numba_compilation()

    # Arguments are passed positionally; the order must match the signature
    # of StructureMultiple.align_from_pdb_files (not visible from here).
    multiple_alignment.StructureMultiple.align_from_pdb_files(
        input_pdb,
        gap_open_penalty,
        gap_extend_penalty,
        consensus_weight,
        full,
        output,
        threads,
        fasta,
        pdb,
        features,
        write_class,
    )
if __name__ == "__main__":
    # Single entry point: run the Typer application that `align` is
    # registered on via @app.command().
    app()
This diff is collapsed.
import base64
import datetime
import pickle
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import prody as pd
import requests as rq
from caretta import msa_numba
import typing
from caretta import multiple_alignment
def heatmap(data):
    """Return a figure dictionary holding ``data`` as a single heatmap trace.

    The colour scale is hidden and a 25-unit margin is applied on every side.
    ``data`` is stored as-is under the trace's ``z`` key.
    """
    trace = {"z": data, "type": "heatmap", "showscale": False}
    margin = {"l": 25, "r": 25, "t": 25, "b": 25}
    return {"data": [trace], "layout": {"margin": margin}}
def empty_dict():
    """Return a placeholder heatmap figure whose trace is a 2x2 array of zeros.

    Same trace/layout settings as a regular heatmap figure: hidden colour
    scale and a 25-unit margin on every side.
    """
    placeholder = np.zeros((2, 2))
    return {
        "data": [{"z": placeholder, "type": "heatmap", "showscale": False}],
        "layout": {"margin": {"l": 25, "r": 25, "t": 25, "b": 25}},
    }
def empty_object(suite):
    """Return the ``compress_object`` encoding of an empty NumPy array.

    ``suite`` is forwarded unchanged to ``compress_object``.
    """
    nothing = np.zeros(0)
    return compress_object(nothing, suite)
def get_estimated_time(msa_class: multiple_alignment.StructureMultiple):
    """Return a rough run-time estimate formatted as ``H:MM:SS``.

    The estimate is ``longest_length**2 * rate * number_of_structures**2``
    seconds, truncated to whole seconds, where ``longest_length`` is the
    largest ``length`` among ``msa_class.structures``.
    """
    # Scaling constant in seconds; presumably fitted empirically -- its
    # origin is not visible here.
    seconds_per_unit = 9.14726052e-06
    num_structures = len(msa_class.structures)
    longest = max(structure.length for structure in msa_class.structures)
    estimate = longest ** 2 * seconds_per_unit * num_structures ** 2
    return str(datetime.timedelta(seconds=int(estimate)))
def line(data):
    """Build a figure dict with the per-column mean and a standard-error band.

    Parameters
    ----------
    data
        2-D NumPy array. Statistics are taken down each column (axis 0),
        ignoring NaN entries.

    Returns
    -------
    dict
        ``{"data": [band_trace, mean_trace], "layout": ...}`` where
        ``band_trace`` is a closed polygon (upper edge left-to-right, lower
        edge right-to-left) spanning ``mean +/- standard error`` and
        ``mean_trace`` is the column mean.
    """
    num_columns = data.shape[1]
    mean = np.nanmean(data, axis=0)

    # Standard error of the mean is std / sqrt(number of observations).
    # The observations of a column are its non-NaN rows, so the divisor is
    # counted per column rather than taken from the number of columns.
    num_observed = np.sum(~np.isnan(data), axis=0)
    std_error = np.nanstd(data, axis=0) / np.sqrt(num_observed)

    positions = list(range(num_columns))
    band_trace = dict(
        # Walk the upper edge forwards and the lower edge backwards so that
        # fill="toself" closes the polygon.
        y=list(mean + std_error) + list(mean - std_error)[::-1],
        x=positions + positions[::-1],
        fillcolor="lightblue",
        fill="toself",
        type="scatter",
        mode="lines",
        name="Standard error",
        line=dict(color="lightblue"),
    )
    mean_trace = dict(
        y=mean,
        x=np.arange(num_columns),
        type="scatter",
        mode="lines",
        name="Mean",
        line=dict(color="blue"),
    )
    return dict(
        data=[band_trace, mean_trace],
        layout=dict(legend=dict(x=0.5, y=1.2), margin=dict(l=25, r=25, t=25, b=25)),
    )
def scatter3D(coordinates_dict):
    """Build a 3-D line-plot figure dict with one trace per dictionary entry.

    Each value is indexed as ``v[:, 0]``, ``v[:, 1]``, ``v[:, 2]`` for the
    x, y and z series; the key (converted with ``str``) names the trace.
    All three scene axes are hidden.
    """
    traces = [
        {
            "x": coords[:, 0],
            "y": coords[:, 1],
            "z": coords[:, 2],
            "mode": "lines",
            "type": "scatter3d",
            "text": None,
            "name": str(label),
            "line": {"width": 3, "opacity": 0.8},
        }
        for label, coords in coordinates_dict.items()
    ]
    # A fresh settings dict per axis, so the axes never share one object.
    scene = {
        axis: {"visible": False, "showgrid": False, "showline": False}
        for axis in ("xaxis", "yaxis", "zaxis")
    }
    layout = {
        "margin": {"l": 20, "r": 20, "t": 20, "b": 20},
        "clickmode": "event+select",
        "scene": scene,
    }
    return {"data": traces, "layout": layout}
def write_feature_as_tsv(
    feature_data: np.ndarray, keys: typing.List[str], file_name: typing.Union[Path, str]
):
    """Write ``feature_data`` to ``file_name`` as tab-separated text.

    One line is written per row of ``feature_data``: the matching entry of
    ``keys`` first, followed by the row's values converted with ``str``.
    The file is opened in ``"w"`` mode, so existing content is replaced.
    """
    with open(file_name, "w") as handle:
        for row_index in range(feature_data.shape[0]):
            fields = [keys[row_index]]
            fields.extend(str(value) for value in feature_data[row_index])
            handle.write("\t".join(fields) + "\n")
def compress_object(raw_object, suite):
    """Serialise ``raw_object`` into a UTF-8 text token.

    Steps: pickle (protocol 4) -> ``suite.encrypt`` -> base64 -> ``str``.
    ``suite`` only needs an ``encrypt(bytes) -> bytes`` method.
    """
    pickled = pickle.dumps(raw_object, protocol=4)
    encrypted = suite.encrypt(pickled)
    return base64.b64encode(encrypted).decode("utf-8")
def decompress_object(compressed_object, suite):
    """Invert ``compress_object``: base64-decode, ``suite.decrypt``, unpickle.

    ``suite`` only needs a ``decrypt(bytes) -> bytes`` method.
    """
    encrypted = base64.b64decode(compressed_object)
    decrypted = suite.decrypt(encrypted)
    # NOTE(review): pickle.loads can execute arbitrary code. This is only safe
    # if suite.decrypt rejects tokens that were not produced with the same
    # key -- confirm that is the case for the suite in use.
    return pickle.loads(decrypted)
def protein_to_aln_index(protein_index, aln_seq):
    """Map a residue index of the ungapped protein to its alignment column.

    Parameters
    ----------
    protein_index
        0-based index into the protein sequence (gaps not counted).
    aln_seq
        The protein's aligned sequence, with ``"-"`` marking gaps.

    Returns
    -------
    int or None
        Column of ``aln_seq`` that holds residue ``protein_index``, or
        ``None`` when the sequence has no such residue.
    """
    residues_seen = 0
    for column, symbol in enumerate(aln_seq):
        # Gap columns never hold a residue, so they must be skipped *before*
        # testing for a match; otherwise a residue preceded by gaps would be
        # reported at the gap's column.
        if symbol == "-":
            continue
        if residues_seen == protein_index:
            return column
        residues_seen += 1
    return None
def aln_index_to_protein(alignment_index, alignment):
    """Translate one alignment column into per-protein residue indices.

    ``alignment`` maps a name to its aligned sequence. For every name the
    result holds ``None`` when that sequence has a gap (``"-"``) at
    ``alignment_index``; otherwise the column index minus the number of gaps
    that precede it.
    """
    return {
        name: (
            None
            if aligned[alignment_index] == "-"
            else alignment_index - aligned[:alignment_index].count("-")
        )
        for name, aligned in alignment.items()
    }
def to_fasta_str(alignment):
    """Render a ``{name: sequence}`` mapping as FASTA text.

    Each entry becomes a ``>name`` header line followed by its sequence
    line; lines are joined with ``"\\n"`` and there is no trailing newline.
    """
    return "\n".join(
        text
        for name, sequence in alignment.items()
        for text in (f">{name}", sequence)
    )
@dataclass
......@@ -51,8 +189,10 @@ class PdbEntry:
)
@classmethod
def from_user_input(cls, pdb_path, chain_id="none"):
    """Create an entry for a user-supplied structure file.

    The first constructor argument is the file name of ``pdb_path`` without
    its extension, the second is ``chain_id`` and the last is ``pdb_path``
    itself. The values in between (``-1``, ``"none"``, ``0.0``) are fixed
    placeholders -- presumably for fields that are unknown for a local
    file; the field order is defined by the class and not visible here.
    """
    return cls(
        Path(pdb_path).stem, chain_id, -1, -1, "none", "none", "none", 0.0, pdb_path
    )
def get_pdb(self, from_atm_file=None):
if from_atm_file is not None:
......@@ -98,14 +238,6 @@ class PdbEntry:
return f"{self.PDB_ID}_{self.CHAIN_ID}_{self.PdbResNumStart}"
def get_pdbs_from_folder(path):
    """Return one ``PdbEntry`` per ``*.pdb`` file found directly in ``path``.

    Entries are created with ``PdbEntry.from_user_input`` from the file path
    converted to ``str``; order follows ``Path.glob``.
    """
    return [
        PdbEntry.from_user_input(str(pdb_file))
        for pdb_file in Path(path).glob("*.pdb")
    ]
class PfamToPDB:
def __init__(
self,
......@@ -125,6 +257,7 @@ class PfamToPDB:
data_lines = data_lines[1:]
self.pfam_to_pdb_ids = dict()
self._initiate_pfam_to_pdbids(data_lines, limit=limit)
self.pdb_entries = None
self.msa = None
self.caretta_alignment = None
......@@ -152,66 +285,10 @@ class PfamToPDB:
n += 1
self.pfam_to_pdb_ids = new
def multiple_structure_alignment_from_pfam(
    self, pdb_entries, gap_open_penalty=0.1, gap_extend_penalty=0.001
):
    """Align the structures behind ``pdb_entries`` and return the results.

    Stores the structure set on ``self.msa`` and the alignment on
    ``self.caretta_alignment``.

    Returns
    -------
    tuple
        ``(alignment, parsed, features)`` where ``parsed`` maps each
        structure name to ``pd.parsePDB`` of its PDB file and ``features``
        maps each structure name to its ``features`` attribute.
    """
    # Element [1] of get_pdb() is what from_pdb_files receives -- presumably
    # the path of the PDB file; confirm against PdbEntry.get_pdb.
    self.msa = PfamStructures.from_pdb_files(
        [entry.get_pdb()[1] for entry in pdb_entries]
    )
    self.caretta_alignment = self.msa.align(
        gap_open_penalty=gap_open_penalty, gap_extend_penalty=gap_extend_penalty
    )
    alignment = self.caretta_alignment
    parsed = {
        structure.name: pd.parsePDB(structure.pdb_file)
        for structure in self.msa.structures
    }
    features = {
        structure.name: structure.features for structure in self.msa.structures
    }
    return alignment, parsed, features
def get_entries_for_pfam(
    self, pfam_id, limit_by_score=1.0, limit_by_protein_number=50, gross_limit=1000
):
    """Return the entries of ``pfam_id`` that pass the e-value cut-off.

    Parameters
    ----------
    pfam_id
        Key into ``self.pfam_to_pdb_ids``.
    limit_by_score
        Only entries with ``eValue`` strictly below this value are kept.
    limit_by_protein_number
        Maximum number of entries returned (the first ones that pass).
    gross_limit
        Not used by this method; accepted only so that callers which still
        pass it keep working.

    Raises
    ------
    KeyError
        If ``pfam_id`` is not in ``self.pfam_to_pdb_ids``.
    """
    candidates = self.pfam_to_pdb_ids[pfam_id]
    passing = [entry for entry in candidates if entry.eValue < limit_by_score]
    return passing[:limit_by_protein_number]
def alignment_from_folder(self):
    """Placeholder: performs no work and returns ``None``."""
    return None
def to_fasta_str(self, alignment):
    """Render a ``{name: sequence}`` mapping as FASTA text.

    Each entry becomes a ``>name`` header line followed by its sequence
    line; lines are joined with ``"\\n"`` and there is no trailing newline.
    Does not read any instance state.
    """
    lines = []
    for name, sequence in alignment.items():
        lines.extend((f">{name}", sequence))
    return "\n".join(lines)
class PfamStructures(msa_numba.StructureMultiple):
    """Structure set built from a list of PDB entries.

    NOTE(review): ``__init__`` does not call the base-class ``__init__``; it
    calls the inherited ``from_pdb_files`` instead and discards whatever that
    returns -- confirm this initialises the instance as intended.
    """

    def __init__(
        self,
        pdb_entries,
        dssp_dir="caretta_tmp",
        num_threads=20,
        extract_all_features=True,
        consensus_weight=1.0,
        write_fasta=True,
        output_fasta_filename=Path("./result.fasta"),
        write_pdb=True,
        output_pdb_folder=Path("./result_pdb/"),
        write_features=True,
        output_feature_filename=Path("./result_features.pkl"),
        write_class=True,
        output_class_filename=Path("./result_class.pkl"),
        overwrite_dssp=False,
    ):
        """Store ``pdb_entries`` and forward their files to ``from_pdb_files``.

        ``write_fasta``, ``write_pdb``, ``write_features`` and ``write_class``
        are accepted but not forwarded; only the output locations are passed.
        """
        self.pdb_entries = pdb_entries
        load_structures = super().from_pdb_files
        # Element [1] of get_pdb() is what gets loaded -- presumably the PDB
        # file path; confirm against PdbEntry.get_pdb.
        pdb_files = [entry.get_pdb()[1] for entry in self.pdb_entries]
        load_structures(
            pdb_files,
            dssp_dir,
            num_threads,
            extract_all_features,
            consensus_weight,
            output_fasta_filename,
            output_pdb_folder,
            output_feature_filename,
            output_class_filename,
            overwrite_dssp,
        )
import dash_core_components as dcc
import dash_html_components as html
import numpy as np
from caretta.app import app_helper
from pathlib import Path
# CSS properties for a panel "box": drop shadow, rounded corners and a light
# background colour.
box_style = {
    "box-shadow": "1px 3px 20px -4px rgba(0,0,0,0.75)",
    "border-radius": "5px",
    "background-color": "#f9f7f7",
}
def get_layout(
    introduction_text,
    input_text,
    placeholder_text,
    selection_text,
    suite,
    pfam_class=None,
):
    """Assemble the full page as a single ``html.Div``.

    Panels appear in this order: introduction, input, hidden variables,
    sequence alignment, structure alignment, feature alignment. A line break
    follows every panel except the hidden-variables one.
    """
    page = [get_introduction_panel(introduction_text), html.Br()]
    page += [
        get_input_panel_layout(
            input_text, placeholder_text, selection_text, pfam_class
        ),
        html.Br(),
    ]
    # The hidden-variable holders render nothing, so no spacer follows them.
    page.append(get_hidden_variables_layout(suite))
    page += [get_sequence_alignment_layout(), html.Br()]
    page += [get_structure_alignment_layout(), html.Br()]
    page += [get_feature_alignment_panel(), html.Br()]
    return html.Div(children=page)
def get_introduction_panel(introduction_text: str):
    """Return the header panel: title, tagline and the introduction text.

    ``introduction_text`` is rendered through ``dcc.Markdown`` and
    left-aligned; title and tagline are centred. The three elements sit in a
    ``row`` div wrapped in a ``container`` div.
    """
    title = html.H1("Caretta", style={"text-align": "center"})
    tagline = html.H3(
        "a multiple protein structure alignment and feature extraction suite",
        style={"text-align": "center"},
    )
    introduction = html.P(
        dcc.Markdown(introduction_text), style={"text-align": "left"}
    )
    row = html.Div([title, tagline, introduction], className="row")
    return html.Div(row, className="container")
def get_hidden_variables_layout(suite):
    """Return a div of invisible ``html.P`` elements used as value holders.

    Every element has ``display: none``. The two ``*-selected-residue``
    holders start with the compressed value ``0``, ``unique-id`` starts with a
    compressed random integer in ``[0, 1000000000)``, and the remaining
    holders start empty.
    """

    def hidden(element_id, children=""):
        # Fresh style dict per element so no two components share one object.
        return html.P(children=children, id=element_id, style={"display": "none"})

    return html.Div(
        [
            hidden("proteins-list"),
            hidden("feature-alignment-data"),
            hidden(
                "structure-alignment-selected-residue",
                app_helper.compress_object(0, suite),
            ),
            hidden(
                "feature-alignment-selected-residue",
                app_helper.compress_object(0, suite),
            ),
            hidden("sequence-alignment-data"),
            hidden("caretta-class"),
            hidden(
                "unique-id",
                app_helper.compress_object(np.random.randint(0, 1000000000), suite),
            ),
        ]
    )
def get_input_panel_layout(
input_text: str,
placeholder_text: str,
selection_text: str,
pfam_class: app_helper.PfamToPDB = None,
):
if pfam_class is not None:
user_input = dcc.Dropdown(
placeholder="Choose Pfam ID",
options=[{"label": x, "value": x} for x in pfam_class.pfam_to_pdb_ids],
id="user-input",
)
else:
user_input = (
dcc.Textarea(
placeholder=placeholder_text, value="", id="user-input", required=True,
),
)
return html.Div(
[
html.Div(
[
html.Br(),
html.H3(
"Choose Structures",
className="row",
style={"text-align": "center"},
),
html.P(input_text, className="row",),
html.Div(
[
html.Div(user_input, className="four columns",),
html.P(
dcc.Markdown(selection_text), className="four columns"
),
html.Button(
"Load Structures",
className="four columns",
id="load-structures-button",
),
],
className="row",
),
html.Div(
[
html.Div(
dcc.Dropdown(
placeholder="Gap open penalty (1.0)",
options=[
{"label": np.round(x, decimals=2), "value": x,}
for x in np.arange(0, 5, 0.1)
],
id="gap-open-dropdown",
),
className="four columns",
),
html.Div(
dcc.Dropdown(
multi=True,
id="proteins-selection-dropdown",
placeholder="Select PDB IDs to align",
),
className="four columns",
),
html.Div(
dcc.Dropdown(
placeholder="Gap extend penalty (0.01)",
options=[
{"label": np.round(x, decimals=3), "value": x,}
for x in np.arange(0, 1, 0.002)
],
id="gap-extend-dropdown",
),
className="four columns",
),
],
className="row",
),
html.Br(),
html.Div(
html.Button(
"Align Structures",
className="twelve columns",
id="align-button",
),
className="row",
),
dcc.Loading(
id="loading-indicator",
children=[
html.Div(
id="loading-indicator-output",
style={"text-align": "center"},
)
],
type="default",
),
html.P(