Spaces:
Sleeping
Sleeping
| import os | |
| import subprocess | |
| import random | |
| import string | |
| from easydict import EasyDict | |
| from rdkit import Chem | |
| from rdkit.Chem.rdForceFieldHelpers import UFFOptimizeMolecule | |
| from .reconstruct import reconstruct_from_generated | |
def get_random_id(length=30):
    """Return a random identifier of ``length`` lowercase ASCII letters.

    Characters are drawn one at a time with ``random.choice`` from the
    module-level ``random`` generator, so the result is reproducible after
    ``random.seed`` but is not suitable as a security token.
    """
    alphabet = string.ascii_lowercase
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
def load_pdb(path):
    """Read the text file at ``path`` and return its whole content as a string."""
    with open(path, 'r') as pdb_file:
        contents = pdb_file.read()
    return contents
def parse_qvina_outputs(docked_sdf_path):
    """Collect the docked poses stored in the SDF file at ``docked_sdf_path``.

    For every molecule RDKit can read, the first line of its ``REMARK``
    property is split on whitespace; the first two tokens are skipped and the
    next three are parsed as floats (affinity, RMSD lower bound, RMSD upper
    bound). Entries RDKit returns as ``None`` are skipped.

    Returns:
        A list of ``EasyDict`` objects with keys ``rdmol``, ``mode_id``,
        ``affinity``, ``rmsd_lb`` and ``rmsd_ub``. ``mode_id`` is the
        molecule's position in the file, so skipped entries leave gaps.
    """
    poses = []
    for position, pose in enumerate(Chem.SDMolSupplier(docked_sdf_path)):
        if pose is not None:
            remark_head = pose.GetProp('REMARK').splitlines()[0]
            fields = remark_head.split()[2:]
            affinity = float(fields[0])
            rmsd_lower = float(fields[1])
            rmsd_upper = float(fields[2])
            entry = {
                'rdmol': pose,
                'mode_id': position,
                'affinity': affinity,
                'rmsd_lb': rmsd_lower,
                'rmsd_ub': rmsd_upper,
            }
            poses.append(EasyDict(entry))
    return poses
class BaseDockingTask(object):
    """Common base for docking tasks.

    Stores the receptor PDB text and the ligand molecule; subclasses provide
    ``run`` and ``get_results``.
    """

    def __init__(self, pdb_block, ligand_rdmol):
        """Keep the receptor text and the ligand molecule on the instance."""
        super(BaseDockingTask, self).__init__()
        self.ligand_rdmol = ligand_rdmol
        self.pdb_block = pdb_block

    def get_results(self):
        """Return the docking results; must be overridden by subclasses."""
        raise NotImplementedError()

    def run(self):
        """Start the docking job; must be overridden by subclasses."""
        raise NotImplementedError()
class QVinaDockingTask(BaseDockingTask):
    """Dock one ligand against one receptor by driving ``qvina2.1`` via bash.

    Construction writes the receptor (PDB) and the hydrogenated ligand (SDF)
    into ``tmp_dir`` under a random task id. ``run`` starts a background bash
    process that converts both files to PDBQT, runs ``qvina2.1`` and converts
    the docked poses back to SDF. ``get_results`` polls that process and
    parses the output once it has exited.
    """

    @staticmethod
    def _read_protein_block(ligand_filename, protein_root):
        """Return the text of the receptor PDB associated with a ligand file.

        The receptor lives in the same sub-directory as the ligand (relative
        to ``protein_root``) and is named after the first 10 characters of
        the ligand's base name plus ``.pdb``.
        """
        protein_fn = os.path.join(
            os.path.dirname(ligand_filename),
            os.path.basename(ligand_filename)[:10] + '.pdb',
        )
        protein_path = os.path.join(protein_root, protein_fn)
        with open(protein_path, 'r') as f:
            return f.read()

    @classmethod
    def from_generated_data(cls, data, protein_root='./data/crossdocked', **kwargs):
        """Build a task for a generated sample.

        Args:
            data: Object with a ``ligand_filename`` attribute; it is also
                passed to ``reconstruct_from_generated`` to obtain the ligand.
            protein_root: Directory under which the receptor PDB is looked up.
            **kwargs: Forwarded to the constructor.
        """
        pdb_block = cls._read_protein_block(data.ligand_filename, protein_root)
        ligand_rdmol = reconstruct_from_generated(data)
        return cls(pdb_block, ligand_rdmol, **kwargs)

    @classmethod
    def from_original_data(cls, data, ligand_root='./data/crossdocked_pocket10', protein_root='./data/crossdocked', **kwargs):
        """Build a task for a reference ligand stored on disk.

        Args:
            data: Object with a ``ligand_filename`` attribute.
            ligand_root: Directory under which ``data.ligand_filename`` is
                resolved; the first molecule of that SDF file is used.
            protein_root: Directory under which the receptor PDB is looked up.
            **kwargs: Forwarded to the constructor.
        """
        pdb_block = cls._read_protein_block(data.ligand_filename, protein_root)
        ligand_path = os.path.join(ligand_root, data.ligand_filename)
        ligand_rdmol = next(iter(Chem.SDMolSupplier(ligand_path)))
        return cls(pdb_block, ligand_rdmol, **kwargs)

    def __init__(self, pdb_block, ligand_rdmol, conda_env='adt', tmp_dir='./tmp', use_uff=True, center=None):
        """Write the docking inputs to ``tmp_dir`` and pick the search-box center.

        Args:
            pdb_block: Receptor structure as PDB text.
            ligand_rdmol: Ligand molecule; conformer 0 supplies the coordinates.
            conda_env: Name of the conda environment activated by ``run``.
            tmp_dir: Working directory for all intermediate files (created
                if missing).
            use_uff: If true, run ``UFFOptimizeMolecule`` on the hydrogenated
                ligand before writing it.
            center: Search-box center as an indexable of three numbers. When
                ``None``, the midpoint of the ligand's coordinate bounding
                box is used.
        """
        super().__init__(pdb_block, ligand_rdmol)
        self.conda_env = conda_env
        self.tmp_dir = os.path.realpath(tmp_dir)
        # Create the resolved path so it is the same directory the files
        # below are written to.
        os.makedirs(self.tmp_dir, exist_ok=True)

        self.task_id = get_random_id()
        self.receptor_id = self.task_id + '_receptor'
        self.ligand_id = self.task_id + '_ligand'

        self.receptor_path = os.path.join(self.tmp_dir, self.receptor_id + '.pdb')
        self.ligand_path = os.path.join(self.tmp_dir, self.ligand_id + '.sdf')

        with open(self.receptor_path, 'w') as f:
            f.write(pdb_block)

        ligand_rdmol = Chem.AddHs(ligand_rdmol, addCoords=True)
        if use_uff:
            UFFOptimizeMolecule(ligand_rdmol)
        sdf_writer = Chem.SDWriter(self.ligand_path)
        try:
            sdf_writer.write(ligand_rdmol)
        finally:
            # Always release the file handle, even if writing fails.
            sdf_writer.close()
        self.ligand_rdmol = ligand_rdmol

        pos = ligand_rdmol.GetConformer(0).GetPositions()
        if center is None:
            self.center = (pos.max(0) + pos.min(0)) / 2
        else:
            self.center = center

        self.proc = None             # Popen handle, set by run()
        self.results = None          # parsed poses, set by get_results()
        self.output = None           # stdout lines of the finished process
        self.docked_sdf_path = None  # path of the docked SDF, set by run()

    def run(self, exhaustiveness=16):
        """Start the docking pipeline in a background bash process.

        The script is fed to ``/bin/bash`` through stdin and the call returns
        immediately; use ``get_results`` (or ``run_sync``) to collect the
        outcome.

        Args:
            exhaustiveness: Value passed to ``qvina2.1 --exhaustiveness``.
        """
        # The backslash-newline pairs below are consumed by the Python string
        # literal, so the qvina2.1 invocation reaches bash as a single line.
        commands = """
eval "$(conda shell.bash hook)"
conda activate {env}
# Prepare receptor (PDB->PDBQT)
prepare_receptor4.py -r {receptor_id}.pdb
# Prepare ligand
obabel {ligand_id}.sdf -O{ligand_id}.pdbqt
qvina2.1 \
    --receptor {receptor_id}.pdbqt \
    --ligand {ligand_id}.pdbqt \
    --center_x {center_x:.4f} \
    --center_y {center_y:.4f} \
    --center_z {center_z:.4f} \
    --size_x 20 --size_y 20 --size_z 20 \
    --exhaustiveness {exhaust}
obabel {ligand_id}_out.pdbqt -O{ligand_id}_out.sdf -h
""".format(
            receptor_id=self.receptor_id,
            ligand_id=self.ligand_id,
            env=self.conda_env,
            exhaust=exhaustiveness,
            center_x=self.center[0],
            center_y=self.center[1],
            center_z=self.center[2],
        )

        self.docked_sdf_path = os.path.join(self.tmp_dir, '%s_out.sdf' % self.ligand_id)

        # Forget anything cached from a previous run of this task.
        self.results = None
        self.output = None

        # The working directory is set through `cwd` rather than an unquoted
        # `cd` inside the script, so paths containing spaces or shell
        # metacharacters work.
        self.proc = subprocess.Popen(
            '/bin/bash',
            shell=False,
            cwd=self.tmp_dir,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        self.proc.stdin.write(commands.encode('utf-8'))
        self.proc.stdin.close()

    def run_sync(self):
        """Run the docking job, block until it exits and return the results.

        Returns:
            The list produced by ``get_results``; empty if the docked output
            could not be parsed.
        """
        self.run()
        # Block in the OS instead of spinning on poll().
        # NOTE: stdout/stderr are pipes that are not drained while waiting,
        # so a tool producing more output than the pipe buffer holds would
        # stall here.
        self.proc.wait()
        results = self.get_results()
        if results:
            print('Best affinity:', results[0]['affinity'])
        return results

    def get_results(self):
        """Return the parsed poses, or ``None`` while they are not available.

        Returns:
            ``None`` if ``run`` has not been called or the process is still
            running. Otherwise the list from ``parse_qvina_outputs``, or an
            empty list if the docked SDF could not be parsed. The outcome is
            computed once per run and cached.
        """
        if self.proc is None or self.proc.poll() is None:
            # Not started, or still in progress.
            return None

        if self.output is None:
            self.output = self.proc.stdout.readlines()

        if self.results is None:
            try:
                self.results = parse_qvina_outputs(self.docked_sdf_path)
            except Exception:
                print('[Error] Vina output error: %s' % self.docked_sdf_path)
                # Cache the failure so repeated calls return the same value.
                self.results = []
        return self.results