Il y a quelques semaines, nous avons commencé à parler des projets qui sont devenus lauréats de l'École de programmation pratique et d'analyse de données de l'École supérieure d'économie de Saint-Pétersbourg et de JetBrains.
La deuxième place a été prise par une équipe d'élèves de onzième année du MSU SSCC. Les gars ont mis en place un modèle qui prédit la solubilité des substances sur la base de la représentation SMILES des molécules. Ce qu'est cette représentation, quelles méthodes d'apprentissage automatique peuvent être utilisées pour cette tâche et si les résultats obtenus concordent avec de vraies expériences chimiques : les auteurs du projet l'expliquent dans cet article.
Équipe
Notre équipe était composée de quatre membres : Andrey Shandybin, Artyom Vlasov, Vladimir Sverdlov et Zakhar Kravchuk. Nous sommes tous diplômés de SSC MSU cette année et avons déjà eu une expérience en apprentissage automatique dans un cours spécial de ML à SSC et sur des projets parallèles. Notre curatrice était Alisa Alenicheva, chercheuse au laboratoire de recherche Machine Learning Applications et Deep Learning JetBrains.
Formulation du problème
, . . , , SMILES ( ) . .
, , â . Root Mean Square Error (RMSE), :
:
baseline ;
(GCN);
;
.
ESOL , 1128 . SMILES. , , - .
, SMILES. SMILES (Simplified Molecular Input Line Entry System) â ASCII. open-source RDKit SMILES .
class DatasetsHolder:
    """Namespace for dataset-loading helpers."""

    @staticmethod
    def read_datasets(inp_folder_path):
        """Read a CSV file and return its contents as a pandas DataFrame.

        ``inp_folder_path`` is forwarded unchanged to ``pd.read_csv``.
        """
        return pd.read_csv(inp_folder_path)
class StandardizeDatasets:
    """Standardise the SMILES strings of a CSV dataset with RDKit."""

    @staticmethod
    def standardize_smiles(smi: str) -> Optional[str]:
        """Create the typical standardization of one SMILES string.

        The input is parsed with ``Chem.MolFromSmiles`` and written back with
        ``Chem.MolToSmiles``.

        Returns:
            The re-serialised SMILES, or ``None`` when RDKit cannot parse
            ``smi`` (``MolFromSmiles`` yields ``None`` in that case, and
            ``MolToSmiles`` must not be called on it).
        """
        mol = Chem.MolFromSmiles(smi)
        if mol is None:
            return None
        return Chem.MolToSmiles(mol)

    @logger.catch()
    def standardize(self, inp_path: Path, out_path: Path, processes: int = 10):
        """Apply standardization to all SMILES of a CSV file.

        Args:
            inp_path: CSV file to read; it must contain a ``smiles`` column.
            out_path: Destination CSV, written without the index.
            processes: Number of worker processes in the pool (default 10,
                the value that used to be hard-coded).

        Returns:
            The loaded DataFrame with an added ``standardize_smiles`` column.
        """
        df = DatasetsHolder.read_datasets(inp_path)
        with Pool(processes) as pool:
            # imap preserves input order, so results line up with df rows;
            # tqdm only wraps the iterator to show progress.
            df['standardize_smiles'] = list(
                tqdm(pool.imap(self.standardize_smiles, df.smiles), total=len(df))
            )
        df.to_csv(out_path, index=False)
        return df
class StandardizeTautomers(StandardizeDatasets):
    """Standardisation variant that applies ``TautomerCanonicalizer``."""

    @staticmethod
    def standardize_smiles(smi: str) -> Optional[str]:
        """Apply ``TautomerCanonicalizer()`` to the standardization of one SMILES.

        Returns:
            The SMILES of the canonical tautomer, or ``None`` when RDKit
            cannot parse ``smi`` (instead of handing ``None`` to the
            canonicalizer).
        """
        mol = Chem.MolFromSmiles(smi)
        if mol is None:
            return None
        canonical_tautomer = TautomerCanonicalizer().canonicalize(mol)
        return Chem.MolToSmiles(canonical_tautomer)
Baseline
baseline- (XGBoost). , , SMILES.
from descriptastorus.descriptors import rdDescriptors
from rdkit import Chem
import logging
from descriptastorus.descriptors import rdNormalizedDescriptors
# Shared descriptor generator: its `process` method and `columns` attribute
# are used by the feature helpers defined in this file.
generator = rdNormalizedDescriptors.RDKit2DNormalized()
def rdkit_2d_features(smiles: str):
    """Compute the descriptor values for one SMILES string.

    Args:
        smiles: Molecule to featurise.

    Returns:
        The output of ``generator.process`` without its leading status flag,
        or ``None`` (after printing a notice) when processing failed.
    """
    features = generator.process(smiles)
    # A failure shows up either as no result at all or as a falsy status
    # flag in the first position; indexing a None result would raise.
    if features is None or not features[0]:
        print(f'{smiles} were not processed correctly')
        return None
    return features[1:]
def create_feature_dataframe(df):
    """Build a descriptor table with one row per molecule of ``df``.

    Args:
        df: DataFrame holding the SMILES in column ``SMILES_COLUMN`` and the
            label in ``'measured log solubility in mols per litre'``.

    Returns:
        A DataFrame whose columns are the generator's descriptor names
        followed by ``'target'``. Rows stay aligned with ``df``: a molecule
        for which ``generator.process`` returns nothing keeps its target and
        gets NaN descriptors instead of aborting the whole conversion.
    """
    feature_names = [column[0] for column in generator.columns]
    rows = []
    # Walk the two needed columns in lockstep instead of building a row
    # Series twice per molecule with df.iloc[i].
    molecules = zip(df[SMILES_COLUMN], df['measured log solubility in mols per litre'])
    for smiles, target_value in molecules:
        features = generator.process(smiles)
        if features is None:
            logging.warning('%s could not be featurized; descriptors left empty', smiles)
            row = {}
        else:
            # features[0] is the status flag; the rest matches feature_names.
            row = dict(zip(feature_names, features[1:]))
        row['target'] = target_value
        rows.append(row)
    # Explicit columns keep the schema stable even for empty input.
    return pd.DataFrame(rows, columns=[*feature_names, 'target'])
, , , , ( ).
from xgboost import XGBRegressor
# Separate the descriptor columns (X) from the 'target' column (y) for both splits.
X_train, y_train = train_data.drop('target', axis=1), train_data['target']
X_test, y_test = test_data.drop('target', axis=1), test_data['target']

# Baseline regressor with default hyper-parameters, fitted on the training split only.
model = XGBRegressor()
model.fit(X_train, y_train)
, . SMILES . , , â . : , , , , , , .
RDkit
def get_atom_features(mol):
    """Collect eight numeric features for every atom of ``mol``.

    Returns a tensor with one row per atom and the columns, in order:
    atomic number, total hydrogen count, total degree, formal charge,
    chiral tag, hybridization, aromaticity flag (0/1) and mass * 0.01.
    """
    atoms = list(mol.GetAtoms())
    # One list per feature; stacking them gives (8, num_atoms), which is
    # transposed below to (num_atoms, 8).
    feature_columns = [
        [atom.GetAtomicNum() for atom in atoms],
        [atom.GetTotalNumHs(includeNeighbors=True) for atom in atoms],
        [atom.GetTotalDegree() for atom in atoms],
        [atom.GetFormalCharge() for atom in atoms],
        [int(atom.GetChiralTag()) for atom in atoms],
        [int(atom.GetHybridization()) for atom in atoms],
        [1 if atom.GetIsAromatic() else 0 for atom in atoms],
        [atom.GetMass() * 0.01 for atom in atoms],
    ]
    return torch.tensor(feature_columns).t()
def get_edge_index(mol):
    """Return the bonds of ``mol`` as a ``[2, 2 * num_bonds]`` long tensor.

    Every bond contributes two columns, one per direction, so the resulting
    edge list is symmetric.
    """
    sources, targets = [], []
    for bond in mol.GetBonds():
        begin = bond.GetBeginAtomIdx()
        end = bond.GetEndAtomIdx()
        # Forward edge first, then the reverse one.
        sources.extend((begin, end))
        targets.extend((end, begin))
    return torch.tensor([sources, targets], dtype=torch.long)
Fingerprints
? â , , â , . .
Morgan Fingerprints VS Neural Fingerprints
â , -.
. -, , - . -, , , . Neural Fingerprint. , . , .
Message passing
. Message passing.
Message passing â , . .
, , . .
PyTorch Geometric.
Message Passing
class GCNConv(MessagePassing):
    """Graph-convolution layer: linear projection, then normalised sum over neighbours."""

    def __init__(self, in_channels, out_channels):
        # Incoming messages are combined with "add" aggregation.
        super().__init__(aggr='add')
        self.lin = torch.nn.Linear(in_channels, out_channels)

    def forward(self, x, edge_index):
        """Propagate node features along the edges once.

        Args:
            x: Node feature matrix of shape [N, in_channels].
            edge_index: Edge list of shape [2, E].
        """
        num_nodes = x.size(0)

        # Connect every node to itself as well.
        edge_index, _ = add_self_loops(edge_index, num_nodes=num_nodes)

        # Project the node features to out_channels.
        x = self.lin(x)

        # Per-edge coefficient 1 / sqrt(deg(source) * deg(target)).
        source, target = edge_index
        node_degree = degree(target, num_nodes, dtype=x.dtype)
        inv_sqrt_degree = node_degree.pow(-0.5)
        norm = inv_sqrt_degree[source] * inv_sqrt_degree[target]

        # Build the messages and aggregate them.
        return self.propagate(edge_index, x=x, norm=norm)

    def message(self, x_j, norm):
        """Scale each neighbour feature row (x_j is [E, out_channels]) by its edge coefficient."""
        return norm.view(-1, 1) * x_j
Neural loop
class NeuralFP(nn.Module):
    """Neural fingerprint: graph-convolution outputs summed per batch index."""

    def __init__(self, atom_features=52, fp_size=50):
        """
        Args:
            atom_features: Number of input features per node.
            fp_size: Length of the produced fingerprint vector.
        """
        super().__init__()
        self.atom_features = atom_features
        self.fp_size = fp_size
        self.loop1 = GCNConv(atom_features, fp_size)
        # loop1 stays registered under both names so the parameter keys of
        # existing checkpoints do not change.
        self.loops = nn.ModuleList([self.loop1])

    def forward(self, data):
        """Return the fingerprints for a batch.

        ``data`` must expose ``x``, ``edge_index`` and ``batch``; rows are
        summed with ``scatter_add`` according to ``data.batch``
        (presumably the node-to-graph assignment; verify against the caller).
        """
        node_features = data.x
        # Allocate the accumulator next to the input instead of relying on a
        # module-level `device` global, which breaks when the two differ.
        fingerprint = torch.zeros(
            (data.batch.shape[0], self.fp_size),
            dtype=torch.float,
            device=node_features.device,
        )
        for loop in self.loops:
            fingerprint += loop(node_features, data.edge_index)
        return scatter_add(fingerprint, data.batch, dim=0)
â GCN (Graph Convolutional Network)
, . .
, .
import torch.nn.functional as F
class MLP_Regressor(nn.Module):
    """Regression head applied to the output of a fingerprint module.

    Pipeline: neural_fp -> Linear(fp_size, hidden_size) -> Dropout(0.2)
    -> Linear(hidden_size, 1) -> LeakyReLU(0.2).

    NOTE(review): the only non-linearity sits after the output layer, and
    ``atom_features`` is accepted but never used. Confirm both are intended.
    """

    def __init__(self, neural_fp, atom_features=2, fp_size=50, hidden_size=100):
        super().__init__()
        self.neural_fp = neural_fp
        self.lin1 = nn.Linear(fp_size, hidden_size)
        self.leakyrelu = nn.LeakyReLU(0.2)
        self.lin2 = nn.Linear(hidden_size, 1)
        self.dropout = nn.Dropout(0.2)

    def forward(self, batch):
        """Map a batch to one predicted value per fingerprint row."""
        fingerprint = self.neural_fp(batch)
        projected = self.lin1(fingerprint)
        regularised = self.dropout(projected)
        return self.leakyrelu(self.lin2(regularised))
, .
, GCN SMILES, . , . 200 , . .
MLP regressor
class MLP_Regressor(nn.Module):
    """Regression head over a fingerprint concatenated with extra descriptors.

    Pipeline: cat(neural_fp output, additional_features)
    -> Linear(fp_size + num_additional_features, hidden_size) -> Dropout(0.2)
    -> Linear(hidden_size, 1) -> LeakyReLU(0.2).

    NOTE(review): the only non-linearity sits after the output layer, and
    ``atom_features`` is accepted but never used. Confirm both are intended.
    """

    def __init__(self, neural_fp, atom_features=2, fp_size=100, hidden_size=300, num_additional_features = 207):
        super().__init__()
        self.neural_fp = neural_fp
        self.lin1 = nn.Linear(fp_size + num_additional_features, hidden_size)
        self.leakyrelu = nn.LeakyReLU(0.2)
        self.lin2 = nn.Linear(hidden_size, 1)
        self.dropout = nn.Dropout(0.2)

    def forward(self, batch, additional_features):
        """Predict one value per row from the fingerprint plus ``additional_features``.

        ``additional_features`` is joined to the fingerprint along dim 1, so
        both need the same number of rows.
        """
        fingerprint = self.neural_fp(batch)
        combined = torch.cat((fingerprint, additional_features), dim=1)
        regularised = self.dropout(self.lin1(combined))
        return self.leakyrelu(self.lin2(regularised))
, , , .
10% , â 90%. . , â RMSE.
, , - . , , .
, . , : , , - , . , .
, , . Directed Message Passing (DMPNN), Message Passing. , DMPNN, . DMPNN.
. , ESOL, , â , . .
. , .
, , .
: