Fast Parallel Cheminformatics Workflows With Dask
Accelerate your workflows with a couple of lines of code.
In this post we'll take a look at how we can use Dask to parallelize and speed up cheminformatics workflows with a couple of lines of code. As an example, we'll write a script to calculate the properties from Lipinski's Rule of 5. Just for fun, we'll also add in the number of rotatable bonds. We'll start by implementing a simple serial version, then modify it to run in parallel with Dask.
import pandas as pd
from rdkit import Chem
from rdkit.Chem import Descriptors
Let's ensure that Pandas dataframes are displayed with an appropriate number of decimal places.
pd.options.display.float_format = "{:,.2f}".format
Define a function to calculate the properties.
def calc_r5(smiles):
    # Calculate MW, LogP, H-bond donors, H-bond acceptors, and rotatable bonds.
    # Invalid SMILES get a list of sentinel values.
    mol = Chem.MolFromSmiles(smiles)
    dummy = -9999.0
    res = [dummy, dummy, dummy, dummy, dummy]
    if mol:
        res = [Descriptors.MolWt(mol),Descriptors.MolLogP(mol),Descriptors.NumHDonors(mol),
               Descriptors.NumHAcceptors(mol),Descriptors.NumRotatableBonds(mol)]
    return res
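As a quick sanity check, we can try the function on a single molecule. Here's a small example using aspirin (the SMILES below is just an illustration; the exact values you see will depend on your RDKit version).
# Quick check on a single molecule (aspirin, as an illustrative example)
props = calc_r5("CC(=O)Oc1ccccc1C(=O)O")
print(props)  # [MW, LogP, HBD, HBA, Rot]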
Read a SMILES file into a Pandas dataframe.
infile_name = "https://github.com/PatWalters/datafiles/blob/main/test.smi?raw=true"
df = pd.read_csv(infile_name,sep=" ",names=["SMILES","Name"])
Calculate the properties for the SMILES in the dataframe.
df['R5'] = df.SMILES.apply(calc_r5)
Now we have all of the properties as a list in one column, which isn't quite what we want. However, we can use a simple trick to create new columns from the list.
df[["MW","LogP","HBD","HBA","Rot"]] = df.R5.to_list()
df
At this point, we no longer need the R5 column, so we can get rid of it.
df.drop("R5",axis=1,inplace=True)
Finally, we can write the results to a csv file.
df.to_csv("props.csv",index=False)
Now let's parallelize the workflow with Dask. First, we import the Dask dataframe module.
import dask.dataframe as dd
We also need to convert the Pandas dataframe to a Dask dataframe. The Dask dataframe constructor takes an argument, "npartitions", that defines the number of chunks the dataframe will be divided into for the calculation. This seems to be most efficient when we set "npartitions" to the number of processor cores.
num_cores = 8
ddf = dd.from_pandas(df,npartitions=num_cores)
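If you'd rather not hard-code the core count, the Python standard library can report it for you. Here's a minimal sketch; note that os.cpu_count() returns the number of logical cores, which may include hyperthreads.
import os

# Use the number of logical cores reported by the OS (fall back to 1 if unknown)
num_cores = os.cpu_count() or 1
ddf = dd.from_pandas(df,npartitions=num_cores)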
Parallelization with Dask requires a function that accepts a dataframe as input, so we define a small wrapper around the "apply" call from the serial version above.
def df_r5(df_in):
    # Apply the property calculation to the SMILES column of one dataframe chunk
    return df_in.SMILES.apply(calc_r5)
Now we can parallelize our workflow with a single line of code.
df["R5"] = ddf.map_partitions(df_r5,meta='float').compute(scheduler='processes')
We can use the same method we described above to split the "R5" column into multiple columns.
df[["MW","LogP","HBD","HBA","Rot"]] = df.R5.to_list()
df.drop("R5",axis=1,inplace=True)
Benchmarking
Let's take a look at how much of a speedup we can get by using Dask. To do this, we'll measure how long it takes to calculate the five properties above for 1 million molecules from the ZINC database. The input file is large, so I didn't include it in this post. Here's the code I used to run the benchmark.
import time
df = pd.read_csv("zinc_1M.smi",sep=" ",names=["SMILES","Name"])
runtime_res = []
for num_cores in range(1,9):
    ddf = dd.from_pandas(df,npartitions=num_cores)
    start = time.time()
    df["R5"] = ddf.map_partitions(df_r5,meta='float').compute(scheduler='processes')
    elapsed = time.time()-start
    df[["MW","LogP","HBD","HBA","Rot"]] = df.R5.to_list()
    df.drop("R5",axis=1,inplace=True)
    df.to_csv("props.csv",index=False,float_format="%0.2f")
    print(f"{len(df)} molecules processed in {elapsed:0.2f} sec on {num_cores} cores")
    runtime_res.append([elapsed,num_cores])
Make a dataframe so that we can view the results. We'll add a column with the ratio of the one-core runtime to the n-core runtime, i.e. the speedup.
runtime_df = pd.DataFrame(runtime_res,columns=["Runtime","Cores"])
one_core_time = runtime_df.Runtime.values[0]
runtime_df['Ratio'] = one_core_time/runtime_df.Runtime
View the results. A 5x speed-up with 8 cores, not too bad for one line of code.
runtime_df
Plot the results.
import seaborn as sns
sns.set(rc={'figure.figsize': (10, 10)})
sns.set_style('whitegrid')
sns.set_context('talk')
ax = sns.scatterplot(x="Cores",y="Runtime",data=runtime_df)
ax.set(ylabel="Runtime (sec)");
Caveats
The method described here will only work for datasets that fit into memory. You can use it to process millions of molecules, but it won't work for billions. Dask has other methods for dealing with data that won't fit into memory; we'll save that discussion for another day.
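As a quick teaser, one option is to let Dask read the file itself, so the data is loaded and processed in chunks instead of going through an in-memory Pandas dataframe first. Here's a minimal sketch, assuming the same space-separated SMILES format as above; the blocksize argument controls the chunk size.
# Sketch: read the SMILES file directly into a Dask dataframe, in chunks
ddf = dd.read_csv("zinc_1M.smi",sep=" ",names=["SMILES","Name"],blocksize="64MB")
r5 = ddf.map_partitions(df_r5,meta=("R5","object"))
# Downstream steps would also need to stay in Dask, e.g. writing results per partition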