diff --git a/bimodal.py b/bimodal.py
deleted file mode 100644
index b983f23f34990db2d16eaf4dae0204c73d8b404a..0000000000000000000000000000000000000000
--- a/bimodal.py
+++ /dev/null
@@ -1,656 +0,0 @@
-import pandas as pd
-import numpy as np
-from sklearn.preprocessing import OneHotEncoder
-from numpy import array
-from numpy import argmax
-from sklearn.preprocessing import LabelEncoder
-import torch
-import torch.nn as nn
-from rdkit import Chem
-torch.manual_seed(1)
-np.random.seed(5)
-
-
-class SMILESEncoder():
-
-    def __init__(self):
-        # Allowed tokens (adapted from default dictionary)
-        self._tokens = np.sort(['#', '=',
-                                '\\', '/', '%', '@', '+', '-', '.',
-                                '(', ')', '[', ']',
-                                '1', '2', '3', '4', '5', '6', '7', '8', '9', '0',
-                                'A', 'B', 'E', 'C', 'F', 'G', 'H', 'I', 'K', 'L', 'M', 'N', 'O', 'P', 'S', 'T', 'V',
-                                'Z',
-                                'a', 'b', 'c', 'd', 'e', 'g', 'i', 'l', 'n', 'o', 'p', 'r', 's', 't'
-                                ])
-
-        # Dictionary mapping index to token
-        self._encoder = OneHotEncoder(categories=[self._tokens], dtype=np.uint8, sparse=False)
-
-    def encode_from_file(self, name='data'):
-        '''One-hot-encoding from .csv file
-        :param name: name of data file
-        :return: encoded data (data size, molecule size, allowed token size)
-        '''
-        data = pd.read_csv('/content/SMILES_BIMODAL_FBRNN_fixed.tar.xz', compression='xz', header=None).values[1:-1]
-        print(data)
-
-        # Store dimensions
-        shape = data.shape
-        data = data.reshape(-1)
-        print(shape)
-        # Remove empty dimensions
-        data = np.squeeze(data)
-
-        # Return array with same first and second dimensions as input
-        return self.encode(data).reshape((shape[0], shape[1], -1, len(self._tokens)))
-
-    def encode(self, data):
-        '''One-hot-encoding
-        :param data: input data (sample size,)
-        :return one_hot: encoded data (sample size, molecule size, allowed token size)
-        '''
-        print("INITIAL DATA",data)
-        # Split SMILES into characters
-        data = self.smiles_to_char(data)
-
-        # Store dimensions and reshape to use encoder
-        shape = data.shape
-        data = data.reshape((-1, 1))
-
-        # Encode SMILES
-        data = self._encoder.fit_transform(data)
-        print("encoded smiles",data)
-        # Restore shape
-        data = data.reshape((shape[0], shape[1], -1))
-
-        return data
-
-    def decode(self, one_hot):
-        '''Decode one-hot encoding to SMILES
-        :param one_hot: one_hot data (sample size, molecule size, allowed token size)
-        :return data: SMILES (sample size,)
-        '''
-        one_hot = np.array(one_hot)
-        # Store dimensions and reshape to use encoder
-        shape = one_hot.shape[0]
-        one_hot = one_hot.reshape((-1, len(self._tokens)))
-
-        # Decode SMILES
-        data = self._encoder.inverse_transform(one_hot)
-
-        # Restore shape
-        data = data.reshape((shape, -1))
-        # Merge char to SMILES
-        smiles = self.char_to_smiles(data)
-        print(smiles)
-        return smiles
-
-    def smiles_to_char(self, data):
-        '''Split SMILES into array of char
-        :param data: input data (sample size,)
-        :return char_data: encoded data (sample size, molecule size)
-        '''
-        char_data = []
-        for i, s in enumerate(data):
-            char_data.append(np.array(list(s)))
-        # Get array from list
-        char_data = np.stack(char_data, axis=0)
-
-        return char_data
-
-    def char_to_smiles(self, char_data):
-        '''Merge array of char into SMILES
-        :param char_data: input data (sample size, molecule size)
-        :return data: encoded data (sample size, )
-        '''
-        data = []
-        for i in range(char_data.shape[0]):
-            data.append(''.join(char_data[i, :]))
-
-        return np.array(data)
-class BiDirLSTM(nn.Module):
-
-    def __init__(self, input_dim=110, hidden_dim=256, layers=2):
-        super(BiDirLSTM, self).__init__()
-
-        # Dimensions
-        self._input_dim = input_dim
-        self._hidden_dim = hidden_dim
-        self._output_dim = input_dim
-
-        # Number of LSTM layers
-        self._layers = layers
-
-        # LSTM for forward and backward direction
-        self._blstm = nn.LSTM(input_size=self._input_dim, hidden_size=self._hidden_dim, num_layers=layers,
-                              dropout=0.3, bidirectional=True)
-        # print("BLSTM:",self._blstm)
-        # All weights initialized with xavier uniform
-        nn.init.xavier_uniform_(self._blstm.weight_ih_l0)
-        nn.init.xavier_uniform_(self._blstm.weight_ih_l1)
-        nn.init.orthogonal_(self._blstm.weight_hh_l0)
-        nn.init.orthogonal_(self._blstm.weight_hh_l1)
-
-        # Bias initialized with zeros except the bias of the forget gate
-        self._blstm.bias_ih_l0.data.fill_(0.0)
-        self._blstm.bias_ih_l0.data[self._hidden_dim:2 * self._hidden_dim].fill_(1.0)
-
-        self._blstm.bias_ih_l1.data.fill_(0.0)
-        self._blstm.bias_ih_l1.data[self._hidden_dim:2 * self._hidden_dim].fill_(1.0)
-
-        self._blstm.bias_hh_l0.data.fill_(0.0)
-        self._blstm.bias_hh_l0.data[self._hidden_dim:2 * self._hidden_dim].fill_(1.0)
-
-        self._blstm.bias_hh_l1.data.fill_(0.0)
-        self._blstm.bias_hh_l1.data[self._hidden_dim:2 * self._hidden_dim].fill_(1.0)
-        # print("With bias",self._blstm)
-        # Layer normalization (weights initialized with one and bias with zero)
-        self._norm_0 = nn.LayerNorm(self._input_dim, eps=.001)
-        self._norm_1 = nn.LayerNorm(2 * self._hidden_dim, eps=.001)
-
-        # Separate linear model for forward and backward computation
-        self._wpred = nn.Linear(2 * self._hidden_dim, self._output_dim)
-        nn.init.xavier_uniform_(self._wpred.weight)
-        self._wpred.bias.data.fill_(0.0)
-
-    def _init_hidden(self, batch_size, device):
-        '''Initialize hidden states
-        :param batch_size: size of the new batch
-               device: device where new tensor is allocated
-        :return: new hidden state
-        '''
-
-        return (torch.zeros(2 * self._layers, batch_size, self._hidden_dim).to(device),
-                torch.zeros(2 * self._layers, batch_size, self._hidden_dim).to(device))
-
-    def new_sequence(self, batch_size=20, device="cpu"):
-        '''Prepare model for a new sequence
-        :param batch_size: size of the new batch
-               device: device where new tensor should be allocated
-        :return:
-        '''
-        self._hidden = self._init_hidden(batch_size, device)
-        return
-
-    def check_gradients(self):
-        '''Print gradients'''
-        print('Gradients Check')
-        for p in self.parameters():
-            print('1:', p.grad.shape)
-            print('2:', p.grad.data.norm(2))
-            print('3:', p.grad.data)
-
-    def forward(self, input, next_prediction='right', device="cpu"):
-        '''Forward computation
-        :param input: tensor (sequence length, batch size, encoding size)
-        :param next_prediction: new token is predicted for the left or right side of existing sequence
-        :param device: device where computation is executed
-        :return pred: prediction (batch size, encoding size)
-        '''
-
-        # If next prediction is appended at the left side, the sequence is inverted such that
-        # forward and backward LSTM always read the sequence along the forward and backward direction, respectively.
-        if next_prediction == 'left':
-            # Reverse copy of numpy array of given tensor
-            input = np.flip(input.cpu().numpy(), 0).copy()
-            input = torch.from_numpy(input).to(device)
-
-        # Normalization over encoding dimension
-        norm_0 = self._norm_0(input)
-        #print("NORM",norm_0,"HIDDEN",self._hidden)
-        # Compute LSTM unit
-        out, self._hidden = self._blstm(norm_0, self._hidden)
-        print("OUTPUT",out)
-        print("hidden",self._hidden)
-        # out (sequence length, batch_size, 2 * hidden dim)
-        # Get last prediction from forward (0:hidden_dim) and backward direction (hidden_dim:2*hidden_dim)
-        for_out = out[-1, :, 0:self._hidden_dim]
-        back_out = out[0, :, self._hidden_dim:]
-
-        # Combine predictions from forward and backward direction
-        bmerge = torch.cat((for_out, back_out), -1)
-
-        # Normalization over hidden dimension
-        norm_1 = self._norm_1(bmerge)
-
-        # Linear unit forward and backward prediction
-        pred = self._wpred(norm_1)
-
-        return pred
-"""
-Implementation of BIMODAL to generate SMILES
-"""
-
-
-class BIMODAL():
-
-    def __init__(self, molecule_size=20, encoding_dim=55, lr=.01, hidden_units=128):
-
-        self._molecule_size = molecule_size
-        self._input_dim = encoding_dim
-        self._output_dim = encoding_dim
-        self._layer = 2
-        self._hidden_units = hidden_units
-
-        # Learning rate
-        self._lr = lr
-
-        # Build new model
-        self._lstm = BiDirLSTM(self._input_dim, self._hidden_units, self._layer)
-        print(self._lstm)
-        # Check availability of GPUs
-        self._gpu = torch.cuda.is_available()
-        self._device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
-        print("DEVICE:",self._gpu)
-        if torch.cuda.is_available():
-            self._lstm = self._lstm.cuda()
-            print('GPU available')
-
-        # Adam optimizer
-        self._optimizer = torch.optim.Adam(self._lstm.parameters(), lr=self._lr, betas=(0.9, 0.999))
-        # Cross entropy loss
-        self._loss = nn.CrossEntropyLoss(reduction='mean')
-
-    def build(self, name=None):
-        """Build new model or load model by name
-        :param name: model name
-        """
-
-        if (name is None):
-            self._lstm = BiDirLSTM(self._input_dim, self._hidden_units, self._layer)
-
-        else:
-            self._lstm = torch.load('/content/model_fold_1_epochs_9.dat')
-        print("BUILD:",self._lstm)
-        if torch.cuda.is_available():
-            self._lstm = self._lstm.cuda()
-
-        self._optimizer = torch.optim.Adam(self._lstm.parameters(), lr=self._lr, betas=(0.9, 0.999))
-        print("Build Optimizer:",self._optimizer)
-    def print_model(self):
-        '''Print name and shape of all tensors'''
-        for name, p in self._lstm.state_dict().items():
-            print(name)
-            print(p.shape)
-
-    def train(self, data, label, epochs=1, batch_size=1):
-        '''Train the model
-        :param data: data array (n_samples, molecule_size, encoding_length)
-        :param label: label array (n_samples, molecule_size)
-        :param epochs: number of epochs for the training
-        :param batch_size: batch size for the training
-        :return statistic: array storing computed losses (epochs, batch size)
-        '''
-
-        # Number of samples
-        n_samples = data.shape[0]
-
-        # Change axes from (n_samples, molecule_size, encoding_dim) to (molecule_size, n_samples, encoding_dim)
-        data = np.swapaxes(data, 0, 1)
-
-        # Create tensor from label
-        label = torch.from_numpy(label).to(self._device)
-
-        # Calculate number of batches per epoch
-        if (n_samples % batch_size) == 0:
-            n_iter = n_samples // batch_size
-        else:
-            n_iter = n_samples // batch_size + 1
-
-        # To store losses
-        statistic = np.zeros((epochs, n_iter))
-
-        # Prepare model for training
-        self._lstm.train()
-
-        # Iteration over epochs
-        for i in range(epochs):
-
-            # Iteration over batches
-            for n in range(n_iter):
-
-                # Set gradient to zero for batch
-                self._optimizer.zero_grad()
-
-                # Store losses in list
-                losses = []
-
-                # Compute indices used as batch
-                batch_start = n * batch_size
-                batch_end = min((n + 1) * batch_size, n_samples)
-
-                # Reset model with correct batch size
-                self._lstm.new_sequence(batch_end - batch_start, self._device)
-
-                # Current batch
-                batch_data = torch.from_numpy(data[:, batch_start:batch_end, :].astype(np.float32)).to(self._device)
-
-                # Initialize start and end position of sequence read by the model
-                start = self._molecule_size // 2
-                end = start + 1
-
-                for j in range(self._molecule_size - 1):
-                    self._lstm.new_sequence(batch_end - batch_start, self._device)
-
-                    # Select direction for next prediction
-                    if j % 2 == 0:
-                        dir = 'right'
-                    else:
-                        dir = 'left'
-
-                    # Predict next token
-                    pred = self._lstm(batch_data[start:end], dir, self._device)
-
-                    # Compute loss and extend sequence read by the model
-                    if j % 2 == 0:
-                        loss = self._loss(pred, label[batch_start:batch_end, end])
-                        end += 1
-
-                    else:
-                        loss = self._loss(pred, label[batch_start:batch_end, start - 1])
-                        start -= 1
-
-                    # Append loss of current position
-                    losses.append(loss.item())
-
-                    # Accumulate gradients
-                    # (NOTE: This is more memory-efficient than summing the loss and computing the final gradient for the sum)
-                    loss.backward()
-
-                # Store statistics: loss per token (middle token not included)
-                statistic[i, n] = np.sum(losses) / (self._molecule_size - 1)
-
-                # Perform optimization step
-                self._optimizer.step()
-
-        return statistic
-
-    def validate(self, data, label, batch_size=128):
-        '''Validate the model and compute the error
-        :param data: test data (n_samples, molecule_size, encoding_size)
-        :param label: label data (n_samples, molecule_size)
-        :param batch_size: batch size for validation
-        :return: mean loss over test data
-        '''
-
-        # Use train mode to get loss consistent with training
-        self._lstm.train()
-
-        # Gradients are not computed to reduce memory requirements
-        with torch.no_grad():
-            # Compute tensor of labels
-            label = torch.from_numpy(label).to(self._device)
-
-            # Number of samples
-            n_samples = data.shape[0]
-
-            # Change axes from (n_samples, molecule_size, encoding_dim) to (molecule_size, n_samples, encoding_dim)
-            data = np.swapaxes(data, 0, 1).astype(np.float32)
-
-            # Initialize loss for complete validation set
-            tot_loss = 0
-
-            # Calculate number of batches per epoch
-            if (n_samples % batch_size) == 0:
-                n_iter = n_samples // batch_size
-            else:
-                n_iter = n_samples // batch_size + 1
-
-            for n in range(n_iter):
-
-                # Compute indices used as batch
-                batch_start = n * batch_size
-                batch_end = min((n + 1) * batch_size, n_samples)
-
-                # Data used in this batch
-                batch_data = torch.from_numpy(data[:, batch_start:batch_end, :].astype(np.float32)).to(self._device)
-
-                # Initialize loss for molecule
-                molecule_loss = 0
-
-                # Reset model with correct batch size and device
-                self._lstm.new_sequence(batch_end - batch_start, self._device)
-
-                start = self._molecule_size // 2
-                end = start + 1
-
-                for j in range(self._molecule_size - 1):
-                    self._lstm.new_sequence(batch_end - batch_start, self._device)
-
-                    # Select direction for next prediction
-                    if j % 2 == 0:
-                        dir = 'right'
-                    if j % 2 == 1:
-                        dir = 'left'
-
-                    # Predict next token
-                    pred = self._lstm(batch_data[start:end], dir, self._device)
-
-                    # Extend reading of the sequence
-                    if j % 2 == 0:
-                        loss = self._loss(pred, label[batch_start:batch_end, end])
-                        end += 1
-
-                    if j % 2 == 1:
-                        loss = self._loss(pred, label[batch_start:batch_end, start - 1])
-                        start -= 1
-
-                    # Sum loss over molecule
-                    molecule_loss += loss.item()
-
-                # Add loss per token to total loss (start token and end token not counted)
-                tot_loss += molecule_loss / (self._molecule_size - 1)
-
-        return tot_loss / n_iter
-
-    def sample(self, middle_token, T=1):
-        '''Generate new molecule
-        :param middle_token: starting sequence
-        :param T: sampling temperature
-        :return molecule: newly generated molecule (molecule_length, encoding_length)
-        '''
-
-        # Prepare model
-        self._lstm.eval()
-        print("TORCH",torch.__version__)
-        # Gradients are not computed to reduce memory requirements
-        with torch.no_grad():
-            # Output array with merged forward and backward directions
-
-            # New sequence
-            seq = np.zeros((self._molecule_size, 1, self._output_dim))
-            f=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
-            seq[self._molecule_size // 2, 0] = f
-            print("UPDATED:",seq[self._molecule_size // 2, 0])
-            # Create tensor for data and select correct device
-            seq = torch.from_numpy(seq.astype(np.float32)).to(self._device)
-            print("Tensor:",seq[self._molecule_size // 2, 0])
-            # Define start/end values for reading
-            start = self._molecule_size // 2
-            end = start + 1
-
-            for j in range(self._molecule_size - 1):
-                self._lstm.new_sequence(1, self._device)
-
-                # Select direction for next prediction
-                if j % 2 == 0:
-                    dir = 'right'
-                if j % 2 == 1:
-                    dir = 'left'
-                print("j=",j)
-                # print("Start to end reading",seq[start:end])
-                pred = self._lstm(seq[start:end], dir, self._device)
-                print("Predict",pred)
-                # Compute new token
-                token = self.sample_token(np.squeeze(pred.cpu().detach().numpy()), T)
-
-                # Set new token within sequence
-                if j % 2 == 0:
-                    seq[end, 0, token] = 1
-                    end += 1
-
-                if j % 2 == 1:
-                    seq[start - 1, 0, token] = 1
-                    start -= 1
-        print("SAMPLE OUTPUT:",seq.cpu().numpy().reshape(1, self._molecule_size, self._output_dim))
-        return seq.cpu().numpy().reshape(1, self._molecule_size, self._output_dim)
-
-    def sample_token(self, out, T=1.0):
-        '''Sample token
-        :param out: output values from model
-        :param T: sampling temperature
-        :return: index of predicted token
-        '''
-        # Explicit conversion to float64 avoiding truncation errors
-        out = out.astype(np.float64)
-
-        # Compute probabilities with specific temperature
-        out_T = out / T
-        p = np.exp(out_T) / np.sum(np.exp(out_T))
-
-        # Generate new token at random
-        char = np.random.multinomial(1, p, size=1)
-        print("CHARACTER----->",char)
-        return np.argmax(char)
-
-def clean_molecule(m):
-    '''Depending on the model, different remains from generation should be removed
-    :param m: molecule with padding
-    :param model_type: Type of the model
-    :return: cleaned molecule
-    '''
-    m = remove_right_left_padding(m, start='G', end='E')
-
-    m = remove_token([m], 'G')
-
-    return m[0]
-def check_valid(mol):
-    '''Check if SMILES is valid
-    :param mol: SMILES string
-    :return: True / False
-    '''
-    # Empty SMILES not accepted
-    if mol == '':
-        return False
-
-    # Check validity with RDKit
-    # MolFromSmiles returns None if molecule not valid
-    mol = Chem.MolFromSmiles(mol, sanitize=True)
-    if mol is None:
-        return False
-    return True
-def remove_right_left_padding(mol, start='G', end='E'):
-    '''Remove right and left padding from start to end token
-    :param mol: SMILES string
-    :param start: token where to start
-    :param end: token where to finish
-    :return: new SMILES where padding is removed
-    '''
-    # Find start and end index
-    mid_ind = mol.find(start)
-    end_ind = mol.find(end, mid_ind)
-    start_ind = len(mol) - mol[::-1].find(end, mid_ind) - 1
-
-    return mol[start_ind + 1:end_ind]
-def remove_token(mol, t='G'):
-    '''Remove specific token from SMILES
-    :param mol: SMILES string
-    :param t: token to be removed
-    :return: new SMILES string without token t
-    '''
-    mol = np.array([d.replace(t, '') for d in mol])
-    return mol
-data1 = pd.read_csv('/content/SMILES_BIMODAL_FBRNN_fixed.tar.xz', compression='xz', header=None).values[1:-1, 0]
-for i, mol_dat in enumerate(data1):
-    data1[i] = clean_molecule(mol_dat)
-c=BIMODAL()
-#encoder=SMILESEncoder()
-data = pd.read_csv('/content/SMILES_BIMODAL_FBRNN_fixed.tar.xz', compression='xz', header=None).values[1:-1]
-def smiles_to_char(data):
-
-    char_data = []
-    for i, s in enumerate(data):
-        char_data.append(np.array(list(s)))
-    # Get array from list
-    char_data = np.stack(char_data, axis=0)
-
-    return char_data
-def char_to_smiles(char_data):
-    data = []
-    for i in range(char_data.shape[0]):
-        data.append(''.join(char_data[i, :]))
-
-    return np.array(data)
-tokens = np.sort(['#', '=',
-                  '\\', '/', '%', '@', '+', '-', '.',
-                  '(', ')', '[', ']',
-                  '1', '2', '3', '4', '5', '6', '7', '8', '9', '0',
-                  'A', 'B', 'E', 'C', 'F', 'G', 'H', 'I', 'K', 'L', 'M', 'N', 'O', 'P', 'S', 'T', 'V',
-                  'Z',
-                  'a', 'b', 'c', 'd', 'e', 'g', 'i', 'l', 'n', 'o', 'p', 'r', 's', 't'
-                  ])
-encoder = OneHotEncoder(categories=[tokens], dtype=np.uint8, sparse=False)
-shape = data.shape
-data = data.reshape(-1)
-print(shape)
-# Remove empty dimensions
-data = np.squeeze(data)
-data = smiles_to_char(data)
-
-# Store dimensions and reshape to use encoder
-shape = data.shape
-data = data.reshape((-1, 1))
-
-# Encode SMILES
-data = encoder.fit_transform(data)
-print("encoded smiles",data)
-c.build()
-c.print_model()
-res_molecules = []
-T=0.7
-fold=[1]
-epoch=[9]
-valid=True
-novel=True
-unique=True
-N=5
-print('Sampling: started')
-for f in fold:
-    for e in epoch:
-        c.build()
-        new_molecules = []
-        while len(new_molecules) < N:
-            one_hot=c.sample('G', T)
-            one_hot=np.array(one_hot)
-            shape = one_hot.shape[0]
-            one_hot = one_hot.reshape((-1, len(tokens)))
-
-            # Decode SMILES
-            char_data = encoder.inverse_transform(one_hot)
-            print("DECODED :",char_data)
-            # Restore shape
-            char_data = char_data.reshape((shape, -1))
-            # Merge char to SMILES
-            smiles = char_to_smiles(char_data)
-            new_mol = smiles
-            # Remove remains from generation
-            new_mol = clean_molecule(new_mol[0])
-            print("Clean mol:",new_mol)
-            # If not valid, get new molecule
-            if valid and not check_valid(new_mol):
-                continue
-            # If not unique, get new molecule
-            if unique and (new_mol in new_molecules):
-                continue
-            # If not novel, get new molecule
-            if novel and (new_mol in data1):
-                continue
-            # If all conditions checked, add new molecule
-            new_molecules.append(new_mol)
-            mol = np.array(new_molecules).reshape(-1)
-            print("MOLECULE:",mol)
-            print("Stored")
-        res_molecules.append(new_molecules)
-print('Sampling: done ->',res_molecules)