Source code for antipasti.preprocessing.preprocessing

# -*- coding: utf-8 -*-

r"""This module contains the pre-processing class. 

:Authors:   Kevin Michalewicz <k.michalewicz22@imperial.ac.uk>

"""

import glob
import numpy as np
import pandas as pd
import subprocess
import os

from config import DATA_DIR, SCRIPTS_DIR, STRUCTURES_DIR
from utils.generic_utils import remove_abc

[docs] class Preprocessing(object): r"""Generating the residue normal mode correlation maps. Parameters ---------- data_path: str Path to the data folder. scripts_path: str Path to the scripts folder. structures_path: str Path to the PDB files. df: str Name of the database containing the PDB entries. modes: int Number of considered normal modes. chain_lengths_path: str Path to the folder containing arrays with the chain lengths. dccm_map_path: str Path to the normal mode correlation maps. residues_path: str Path to the folder containing the list of residues per entry. file_type_input: str Filename extension of input structures. selection: str Considered portion of antibody chains. pathological: list PDB identifiers of antibodies that need to be excluded. renew_maps: bool Compute all the normal mode correlation maps. renew_residues: bool Retrieve the lists of residues for each entry. cmaps: bool If ``True``, ANTIPASTI computes the contact maps of the complexes instead of the Normal Modes. cmaps_thr: float Thresholding distance for alpha (α) carbons to build the contact maps. ag_agnostic: bool If ``True``, Normal Mode correlation maps are computed in complete absence of the antigen. affinity_entries_only: bool This is ``False`` in general, but the ANTIPASTI pipeline could be used to other types of projects and thus consider data without affinity values. stage: str Choose between ``training`` and ``predicting``. test_data_path: str Path to the test data folder. test_dccm_map_path: str Path to the test normal mode correlation maps. test_residues_path: str Path to the folder containing the list of residues for a test sample. test_structures_path: str Path to the test PDB file. test_pdb_id: str Test PDB ID. alphafold: bool ``True`` the test structure was folded using ``AlphaFold``. h_offset: int Amount of absent residues between positions 1 and 25 in the heavy chain. l_offset: int Amount of absent residues between positions 1 and 23 in the light chain. """ def __init__( self, data_path=DATA_DIR, scripts_path=SCRIPTS_DIR, structures_path=STRUCTURES_DIR, df='sabdab_summary_all.tsv', modes=30, chain_lengths_path='chain_lengths/', dccm_map_path='dccm_maps/', residues_path='lists_of_residues/', file_type_input='.pdb', selection='_fv', pathological=None, renew_maps=False, renew_residues=False, cmaps=False, cmaps_thr=8.0, ag_agnostic=False, affinity_entries_only=True, stage='training', test_data_path=None, test_dccm_map_path=None, test_residues_path=None, test_structure_path=None, test_pdb_id='1t66', alphafold=False, h_offset=0, l_offset=0, ag_residues=0, ): self.data_path = data_path self.scripts_path = scripts_path self.structures_path = structures_path self.chain_lengths_path = data_path + chain_lengths_path self.dccm_map_path = data_path + dccm_map_path self.residues_path = data_path + residues_path self.modes = modes self.file_type_input = file_type_input self.selection = selection self.pathological = pathological self.cmaps = cmaps self.cmaps_thr = cmaps_thr self.ag_agnostic = ag_agnostic self.affinity_entries_only = affinity_entries_only self.stage = 'training' self.file_residues_paths = sorted(glob.glob(os.path.join(self.residues_path, '*.npy'))) self.alphafold = alphafold self.h_offset = h_offset self.l_offset = l_offset self.ag_residues = ag_residues self.df_path = data_path + df self.entries, self.affinity, self.df = self.clean_df() self.heavy, self.light, self.selected_entries = self.initialisation(renew_maps, renew_residues) self.max_res_list_h, self.max_res_list_l, self.min_res_list_h, self.min_res_list_l = self.get_max_min_chains() self.train_x, self.train_y, self.labels, self.raw_imgs = self.load_training_images() self.stage = stage if self.stage != 'training': self.test_data_path = test_data_path self.test_dccm_map_path = self.test_data_path + test_dccm_map_path self.test_residues_path = self.test_data_path + test_residues_path self.test_structure_path = self.test_data_path + test_structure_path self.test_pdb_id = test_pdb_id self.test_x = self.load_test_image()
[docs] def clean_df(self): r"""Cleans the database containing the PDB entries. Returns ------- df_pdbs: list PDB entries. df_kds: list Binding affinities. df: pandas.DataFrame Cleaned database. """ df = pd.read_csv(self.df_path, sep='\t', header=0)[['pdb', 'antigen_type', 'affinity']] df.drop_duplicates(keep='first', subset='pdb', inplace=True) df['pdb'] = df['pdb'].str.lower().str.replace('+', '') # lowercase and remove '+' signs of scientific notation df = df[(df.antigen_type.notna()) & (df.antigen_type != 'NA')][['pdb', 'affinity']] if self.affinity_entries_only: df = df[(df.affinity.notna()) & (df.affinity != 'None')] df = df[~df['pdb'].isin(self.pathological)] # Removing pathological cases return list(df['pdb']), list(df['affinity']), df
[docs] def generate_fv_pdb(self, path, keepABC=True, lresidues=False, hupsymchain=None, lupsymchain=None): r"""Generates a new PDB file containing the antigen residues and the antibody variable region. Parameters ---------- path: str Path of a Chothia-numbered PDB file. keepABC: bool Keeps residues whose name ends with a letter from 'A' to 'Z'. lresidues: bool The names of each residue are stored in ``self.residues_path``. upsymchain: int Upper limit of heavy chain residues due to a change in the numbering convention. Only useful when using ``AlphaFold``. lupsymchain: int Upper limit of light chain residues due to a change in the numbering convention. Only useful when using ``AlphaFold``. """ amino_acid_dictionary = {'CYS': 'C', 'ASP': 'D', 'SER': 'S', 'GLN': 'Q', 'LYS': 'K', 'ILE': 'I', 'PRO': 'P', 'THR': 'T', 'PHE': 'F', 'ASN': 'N', 'GLY': 'G', 'HIS': 'H', 'LEU': 'L', 'ARG': 'R', 'TRP': 'W', 'ALA': 'A', 'VAL':'V', 'GLU': 'E', 'TYR': 'Y', 'MET': 'M'} if self.stage == 'training': rpath = self.residues_path else: rpath = self.test_residues_path list_residues = ['START-Ab'] with open(path, 'r') as f: # needs to be Chothia-numbered content = f.readlines() header_lines_important = range(4) header_lines = [content[i][0]=='R' for i in range(len(content))].count(True) h_range = range(1, 114) l_range = range(1, 108) residue_type_range = slice(17, 20) start_chain = 21 chain_range = slice(start_chain, start_chain+1) res_range = slice(23, 26) res_extra_letter = 26 #sometimes includes a letter 'A', 'B', 'C', ... h_chain_key = 'HCHAIN' l_chain_key = 'LCHAIN' antigen_chain_key = 'AGCHAIN' idx_list = list(header_lines_important) idx_list_l = [] idx_list_antigen = [] antigen_chains = [] new_path = path[:-4] + self.selection + path[-4:] # Getting the names of the heavy and antigen chains line = content[header_lines_important[-1]] if line.find(h_chain_key) != -1: h_pos = line.find(h_chain_key) + len(h_chain_key) + 1 h_chain = line[h_pos:h_pos+1] antigen_pos = line.find(antigen_chain_key) + len(antigen_chain_key) + 1 antigen_chains.append(line[antigen_pos:antigen_pos+1]) for i in range(3): if line[antigen_pos+2*i+1] in [',', ';']: antigen_chains.append(line[antigen_pos+2*i+2]) # If two (or more) interacting antigen chains present else: # useful when using AlphaFold h_chain = 'A' l_chain = 'B' antigen_chains = ['C', 'D', 'E'] idx_list = [0] h_range = range(1-self.h_offset, hupsymchain-self.h_offset) l_range = range(1-self.l_offset, lupsymchain-self.l_offset) h_pos = start_chain l_pos = start_chain if line.find(l_chain_key) != -1: l_pos = line.find(l_chain_key) + len(l_chain_key) + 1 l_chain = line[l_pos:l_pos+1] elif self.alphafold is False: l_chain = None # Checking if H and L chains have the same name if l_chain is not None and h_chain.upper() == l_chain.upper(): pathologic = True h_chain = h_chain.upper() l_chain = h_chain.lower() elif antigen_chains is not None and self.affinity_entries_only is False and (h_chain.upper() in antigen_chains or (l_chain is not None and l_chain.upper() in antigen_chains)): pathologic = True h_chain = h_chain.lower() if l_chain is not None: l_chain = l_chain.lower() else: pathologic = False # Checks for matching identifiers if pathologic: if 'X' not in antigen_chains: new_hchain = 'X' else: new_hchain = 'W' if 'Y' not in antigen_chains: new_lchain = 'Y' else: new_lchain = 'Z' else: new_hchain = h_chain new_lchain = l_chain # Obtaining lines for the heavy chain variable region first for i, line in enumerate(content[header_lines:]): if line[chain_range].find(h_chain) != -1 and int(line[res_range]) in h_range: if (line[res_extra_letter] == ' ' or keepABC == True) and line.find('HETATM') == -1: idx_list.append(i+header_lines) if lresidues == True: full_res = line[res_range] + line[res_extra_letter] if pathologic: full_res = new_hchain + full_res else: full_res = line[chain_range] + full_res full_res = amino_acid_dictionary[line[residue_type_range]] + full_res if full_res != list_residues[-1]: list_residues.append(full_res) # This separation ensures that heavy chain residues are enlisted first if l_chain is not None: for i, line in enumerate(content[header_lines:]): if line[chain_range].find(l_chain) != -1 and int(line[res_range]) in l_range: if (line[res_extra_letter] == ' ' or keepABC == True) and line.find('HETATM') == -1: idx_list_l.append(i+header_lines) if lresidues == True: full_res = line[res_range] + line[res_extra_letter] if pathologic: full_res = new_lchain + full_res else: full_res = line[chain_range] + full_res full_res = amino_acid_dictionary[line[residue_type_range]] + full_res if full_res != list_residues[-1]: list_residues.append(full_res) if lresidues == True: list_residues.append('END-Ab') # Obtaining antigen(s) for i, line in enumerate(content[header_lines:]): if any(line[chain_range] in agc for agc in antigen_chains) and h_chain not in antigen_chains and l_chain not in antigen_chains: idx_list_antigen.append(i+header_lines) if lresidues == True: full_res = line[chain_range] + line[res_range] + line[res_extra_letter] if line[residue_type_range] in amino_acid_dictionary: full_res = amino_acid_dictionary[line[residue_type_range]] + full_res if full_res != list_residues[-1]: list_residues.append(full_res) # List with name of every residue is saved if selected saving_path = rpath + path[-8:-4] + '.npy' #if not os.path.exists(saving_path): np.save(saving_path, list_residues) # Creating new file with open(new_path, 'w') as f_new: f_new.writelines([content[l] for l in idx_list[:header_lines_important[-1]]]) if l_chain is not None and self.alphafold is False: f_new.writelines([content[l][:h_pos]+new_hchain+content[l][h_pos+1:l_pos]+new_lchain+content[l][l_pos+1:] for l in idx_list[header_lines_important[-1]:header_lines_important[-1]+1]]) else: f_new.writelines([content[l][:h_pos]+new_hchain+content[l][h_pos+1:] for l in idx_list[header_lines_important[-1]:header_lines_important[-1]+1]]) f_new.writelines([content[l][:start_chain-5]+' '+content[l][start_chain-4:start_chain]+new_hchain+content[l][start_chain+1:] for l in idx_list[header_lines_important[-1]+1:]]) if l_chain is not None: f_new.writelines([content[l][:start_chain-5]+' '+content[l][start_chain-4:start_chain]+new_lchain+content[l][start_chain+1:] for l in idx_list_l]) if not self.ag_agnostic: f_new.writelines([content[l] for l in idx_list_antigen]) if not self.cmaps: f_new.writelines([content[l] for l in range(len(content)) if content[l][0:6] == 'HETATM' and content[l][chain_range] in [h_chain, l_chain] and l not in idx_list+idx_list_l+idx_list_antigen])
[docs] def generate_maps(self): r"""Generates the Normal Mode correlation maps. """ for i, entry in enumerate(self.entries): file_name = entry + self.selection path = self.structures_path + file_name + self.file_type_input new_path = self.dccm_map_path + entry self.generate_fv_pdb(self.structures_path+entry+self.file_type_input, lresidues=True) if not self.cmaps: # and len(np.load(self.residues_path + path[-11:-7] + '.npy')) > 500: subprocess.call(['/usr/local/bin/RScript', str(self.scripts_path)+'pdb_to_dccm.r', str(path), str(new_path), str(self.modes)], shell=True, stdout=open(os.devnull, 'wb')) #elif not self.cmaps: # print(path[-11:-7]) # subprocess.call(['/usr/local/bin/RScript', str(self.scripts_path)+'pdb_to_dccm_aa.r', str(path), str(new_path), str(self.modes)], stdout=open(os.devnull, 'wb')) else: subprocess.call(['python', str(self.scripts_path)+'generate_contact_maps.py', str(path), str(new_path), str(self.cmaps_thr)], stdout=open(os.devnull, 'wb')) if os.path.exists(path): os.remove(path) if i % 25 == 0: print('Map ' + str(i+1) + ' out of ' + str(len(self.entries)) + ' processed.')
[docs] def get_lists_of_lengths(self, selected_entries): r"""Retrieves lists with the lengths of the heavy and light chains. Parameters ---------- selected_entries: list PDB valid entries. Returns ------- heavy: list Lengths of the heavy chains. In the context of the prediction stage, this list has one element. light: list Lengths of the light chains. In the context of the prediction stage, this list has one element. selected_entries: list PDB valid entries. In the context of the prediction stage, this list has one element. """ heavy = [] light = [] if self.stage == 'training': rpath = self.residues_path else: rpath = self.test_residues_path for entry in selected_entries: list_of_residues = list(np.load(rpath+entry+'.npy'))[1:] chain_pos = 0 if 'END-Ab' in list_of_residues: list_of_residues = list_of_residues[:list_of_residues.index('END-Ab')] chain_pos = 1 else: list_of_residues = list_of_residues[:-1] h_chain = list_of_residues[0][chain_pos] l_chain = list_of_residues[-1][chain_pos] heavy.append(len([idx for idx in list_of_residues if idx[chain_pos] == h_chain])) if h_chain != l_chain: light.append(len([idx for idx in list_of_residues if idx[chain_pos] == l_chain])) else: light.append(0) return heavy, light, selected_entries
[docs] def get_max_min_chains(self): r"""Returns the longest and shortest possible chains. """ max_res_list_h = [] max_res_list_l = [] for f in self.file_residues_paths: shift = 0 if 'END-Ab' in np.load(f): shift = 1 idx = self.selected_entries.index(f[-8:-4]) current_list_h = np.load(f)[1:self.heavy[idx]+1] current_list_l = np.load(f)[self.heavy[idx]+1:self.heavy[idx]+self.light[idx]+1] current_list_h = [x[1+shift:] for x in current_list_h] current_list_l = [x[1+shift:] for x in current_list_l] max_res_list_h += list(set(current_list_h).difference(max_res_list_h)) max_res_list_l += list(set(current_list_l).difference(max_res_list_l)) max_res_list_h = sorted(max_res_list_h, key=remove_abc) min_res_list_h = list(dict.fromkeys([x for x in max_res_list_h])) max_res_list_h = [x.strip() for x in max_res_list_h] max_res_list_l = sorted(max_res_list_l, key=remove_abc) min_res_list_l = list(dict.fromkeys([x for x in max_res_list_l])) max_res_list_l = [x.strip() for x in max_res_list_l] for f in self.file_residues_paths: shift = 0 if 'END-Ab' in f: shift = 1 idx = self.selected_entries.index(f[-8:-4]) current_list_h = np.load(f)[1:self.heavy[idx]+1] current_list_l = np.load(f)[self.heavy[idx]+1:self.heavy[idx]+self.light[idx]+1] current_list_h = [x[1+shift:] for x in current_list_h] current_list_l = [x[1+shift:] for x in current_list_l] min_res_list_h = sorted(list(set(current_list_h).intersection(min_res_list_h))) min_res_list_l = sorted(list(set(current_list_l).intersection(min_res_list_l))) min_res_list_h = [x.strip() for x in min_res_list_h] min_res_list_l = [x.strip() for x in min_res_list_l] return max_res_list_h, max_res_list_l, min_res_list_h, min_res_list_l
[docs] def initialisation(self, renew_maps, renew_residues): r"""Computes the normal mode correlation maps and retrieves lists with the lengths of the heavy and light chains. Parameters ---------- renew_maps: bool Compute all the normal mode correlation maps. renew_residues: bool Retrieve the lists of residues for each entry. Returns ------- heavy: list Lengths of the heavy chains. light: list Lengths of the light chains. selected_entries: list PDB valid entries. """ if renew_maps: self.generate_maps() dccm_paths = sorted(glob.glob(os.path.join(self.dccm_map_path, '*.npy'))) selected_entries = [dccm_paths[i][-8:-4] for i in range(len(dccm_paths))] if renew_residues: heavy, light, selected_entries = self.get_lists_of_lengths(selected_entries) np.save(self.chain_lengths_path+'heavy_lengths.npy', heavy) np.save(self.chain_lengths_path+'light_lengths.npy', light) np.save(self.chain_lengths_path+'selected_entries.npy', selected_entries) else: heavy = np.load(self.chain_lengths_path+'heavy_lengths.npy').astype(int) light = np.load(self.chain_lengths_path+'light_lengths.npy').astype(int) assert list(np.load(self.chain_lengths_path+'selected_entries.npy')) == selected_entries for entry in selected_entries: residues = list(np.load(self.residues_path+entry+'.npy')) if 'END-Ab' in residues: assert len(residues[:residues.index('END-Ab')])-1 == heavy[selected_entries.index(entry)] + light[selected_entries.index(entry)] return heavy, light, selected_entries
[docs] def generate_masked_image(self, img, idx, test_h=None, test_l=None): r"""Generates a masked normal mode correlation map Parameters ---------- img: numpy.ndarray Original array containing no blank pixels. idx: int Input index. test_h: int Length of the heavy chain of an antibody in the test set. test_l: int Length of the light chain of an antibody in the test set. Returns ------- masked: numpy.ndarray Masked normal mode correlation map. mask: numpy.ndarray Mask itself. """ if self.stage == 'training': f = self.file_residues_paths[idx] elif self.alphafold is False: f = sorted(glob.glob(os.path.join(self.test_residues_path, '*'+self.test_pdb_id+'.npy')))[0] else: f = sorted(glob.glob(os.path.join(self.test_residues_path, '*'+self.test_pdb_id[:-3]+'.npy')))[0] # removing '_af' suffix antigen_max_pixels = self.ag_residues f_res = np.load(f) # We force unique elements self.max_res_list_h = list(dict.fromkeys(self.max_res_list_h)) self.max_res_list_l = list(dict.fromkeys(self.max_res_list_l)) max_res_h = len(self.max_res_list_h) max_res_l = len(self.max_res_list_l) max_res = max_res_h + max_res_l masked = np.zeros((max_res+antigen_max_pixels, max_res+antigen_max_pixels)) mask = np.zeros((max_res+antigen_max_pixels, max_res+antigen_max_pixels)) if self.stage != 'training': h = test_h l = test_l else: current_idx = self.selected_entries.index(f[-8:-4]) h = self.heavy[current_idx] l = self.light[current_idx] current_list_h = f_res[1:h+1] shift = len(f_res[1][:f_res[1].find(' ')]) current_list_h = [x[shift:].strip() for x in current_list_h] # First letter is the type of amino acid and second letter is the chain ID current_list_l = f_res[h+1:h+l+1] current_list_l = [x[shift:].strip() for x in current_list_l] idx_list = [i for i in range(max_res_h) if self.max_res_list_h[i] in current_list_h] idx_list += [i+max_res_h for i in range(max_res_l) if self.max_res_list_l[i] in current_list_l] idx_list += [i+max_res_h+max_res_l for i in range(min(antigen_max_pixels, img.shape[-1]-(h+l)))] for k, i in enumerate(idx_list): for l, j in enumerate(idx_list): masked[i, j] = img[k, l] mask[i, j] = 1 return masked, mask
[docs] def load_training_images(self): r"""Returns the input/output pairs of the model and their corresponding labels. """ imgs = [] raw_imgs = [] kds = [] labels = [] file_paths = sorted(glob.glob(os.path.join(self.dccm_map_path, '*.npy'))) for f in file_paths: pdb_id = f[-8:-4] if pdb_id in self.selected_entries and pdb_id not in self.pathological: raw_sample = np.load(f) idx = self.entries.index(pdb_id) idx_new = self.selected_entries.index(pdb_id) labels.append(pdb_id) raw_imgs.append(raw_sample) imgs.append(self.generate_masked_image(raw_sample, idx_new)[0]) if self.affinity_entries_only: kds.append(np.log10(np.float32(self.affinity[idx]))) assert labels == [item for item in self.selected_entries if item not in self.pathological] #for pdb in self.selected_entries: # if pdb not in self.pathological and self.affinity_entries_only: # assert np.float16(10**kds[[item for item in self.selected_entries if item not in self.pathological].index(pdb)] == np.float16(self.df[self.df['pdb']==pdb]['affinity'])).all() return np.array(imgs), np.array(kds), labels, raw_imgs
[docs] def load_test_image(self): r"""Returns a test normal mode correlation map which is masked according to the existing residues in the training set. """ pdb_id = self.test_pdb_id if self.alphafold is True: h, l, _ = self.get_lists_of_lengths(selected_entries=str(pdb_id[:-3]).split()) h = h[0] l = l[0] hupsymchain = 1 + h lupsymchain = 1 + l lresidues = False else: hupsymchain = None lupsymchain = None lresidues = True # Generating the normal mode correlation map file_name = pdb_id + self.selection path = self.test_structure_path + file_name + self.file_type_input new_path = self.test_dccm_map_path + pdb_id self.generate_fv_pdb(self.test_structure_path+pdb_id+self.file_type_input, lresidues=lresidues, hupsymchain=hupsymchain, lupsymchain=lupsymchain) subprocess.call(['/usr/local/bin/RScript '+str(self.scripts_path)+'pdb_to_dccm.r '+str(path)+' '+str(new_path)+' '+str(self.modes)], shell=True, stdout=open(os.devnull, 'wb')) if os.path.exists(path): os.remove(path) # Getting lengths, residues and masking file_path = sorted(glob.glob(os.path.join(self.test_dccm_map_path, '*'+pdb_id+'.npy'))) for f in file_path: raw_sample = np.load(f) if self.alphafold is False: h, l, _ = self.get_lists_of_lengths(selected_entries=str(pdb_id).split()) h = h[0] l = l[0] return self.generate_masked_image(raw_sample, 0, test_h=int(h), test_l=int(l))[0]