Pocket-Gen / utils /docking.py
Zaixi's picture
1
dcacefd
import os
import subprocess
import random
import string
from easydict import EasyDict
from rdkit import Chem
from rdkit.Chem.rdForceFieldHelpers import UFFOptimizeMolecule
from .reconstruct import reconstruct_from_generated
def get_random_id(length=30):
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(length))
def load_pdb(path):
with open(path, 'r') as f:
return f.read()
def parse_qvina_outputs(docked_sdf_path):
suppl = Chem.SDMolSupplier(docked_sdf_path)
results = []
for i, mol in enumerate(suppl):
if mol is None:
continue
line = mol.GetProp('REMARK').splitlines()[0].split()[2:]
results.append(EasyDict({
'rdmol': mol,
'mode_id': i,
'affinity': float(line[0]),
'rmsd_lb': float(line[1]),
'rmsd_ub': float(line[2]),
}))
return results
class BaseDockingTask(object):
def __init__(self, pdb_block, ligand_rdmol):
super().__init__()
self.pdb_block = pdb_block
self.ligand_rdmol = ligand_rdmol
def run(self):
raise NotImplementedError()
def get_results(self):
raise NotImplementedError()
class QVinaDockingTask(BaseDockingTask):
@classmethod
def from_generated_data(cls, data, protein_root='./data/crossdocked', **kwargs):
protein_fn = os.path.join(
os.path.dirname(data.ligand_filename),
os.path.basename(data.ligand_filename)[:10] + '.pdb'
)
protein_path = os.path.join(protein_root, protein_fn)
with open(protein_path, 'r') as f:
pdb_block = f.read()
ligand_rdmol = reconstruct_from_generated(data)
return cls(pdb_block, ligand_rdmol, **kwargs)
@classmethod
def from_original_data(cls, data, ligand_root='./data/crossdocked_pocket10', protein_root='./data/crossdocked', **kwargs):
protein_fn = os.path.join(
os.path.dirname(data.ligand_filename),
os.path.basename(data.ligand_filename)[:10] + '.pdb'
)
protein_path = os.path.join(protein_root, protein_fn)
with open(protein_path, 'r') as f:
pdb_block = f.read()
ligand_path = os.path.join(ligand_root, data.ligand_filename)
ligand_rdmol = next(iter(Chem.SDMolSupplier(ligand_path)))
return cls(pdb_block, ligand_rdmol, **kwargs)
def __init__(self, pdb_block, ligand_rdmol, conda_env='adt', tmp_dir='./tmp', use_uff=True, center=None):
super().__init__(pdb_block, ligand_rdmol)
self.conda_env = conda_env
self.tmp_dir = os.path.realpath(tmp_dir)
os.makedirs(tmp_dir, exist_ok=True)
self.task_id = get_random_id()
self.receptor_id = self.task_id + '_receptor'
self.ligand_id = self.task_id + '_ligand'
self.receptor_path = os.path.join(self.tmp_dir, self.receptor_id + '.pdb')
self.ligand_path = os.path.join(self.tmp_dir, self.ligand_id + '.sdf')
with open(self.receptor_path, 'w') as f:
f.write(pdb_block)
ligand_rdmol = Chem.AddHs(ligand_rdmol, addCoords=True)
if use_uff:
UFFOptimizeMolecule(ligand_rdmol)
sdf_writer = Chem.SDWriter(self.ligand_path)
sdf_writer.write(ligand_rdmol)
sdf_writer.close()
self.ligand_rdmol = ligand_rdmol
pos = ligand_rdmol.GetConformer(0).GetPositions()
if center is None:
self.center = (pos.max(0) + pos.min(0)) / 2
else:
self.center = center
self.proc = None
self.results = None
self.output = None
self.docked_sdf_path = None
def run(self, exhaustiveness=16):
commands = """
eval "$(conda shell.bash hook)"
conda activate {env}
cd {tmp}
# Prepare receptor (PDB->PDBQT)
prepare_receptor4.py -r {receptor_id}.pdb
# Prepare ligand
obabel {ligand_id}.sdf -O{ligand_id}.pdbqt
qvina2.1 \
--receptor {receptor_id}.pdbqt \
--ligand {ligand_id}.pdbqt \
--center_x {center_x:.4f} \
--center_y {center_y:.4f} \
--center_z {center_z:.4f} \
--size_x 20 --size_y 20 --size_z 20 \
--exhaustiveness {exhaust}
obabel {ligand_id}_out.pdbqt -O{ligand_id}_out.sdf -h
""".format(
receptor_id = self.receptor_id,
ligand_id = self.ligand_id,
env = self.conda_env,
tmp = self.tmp_dir,
exhaust = exhaustiveness,
center_x = self.center[0],
center_y = self.center[1],
center_z = self.center[2],
)
self.docked_sdf_path = os.path.join(self.tmp_dir, '%s_out.sdf' % self.ligand_id)
self.proc = subprocess.Popen(
'/bin/bash',
shell=False,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
self.proc.stdin.write(commands.encode('utf-8'))
self.proc.stdin.close()
# return commands
def run_sync(self):
self.run()
while self.get_results() is None:
pass
results = self.get_results()
print('Best affinity:', results[0]['affinity'])
return results
def get_results(self):
if self.proc is None: # Not started
return None
elif self.proc.poll() is None: # In progress
return None
else:
if self.output is None:
self.output = self.proc.stdout.readlines()
try:
self.results = parse_qvina_outputs(self.docked_sdf_path)
except:
print('[Error] Vina output error: %s' % self.docked_sdf_path)
return []
return self.results