Options to run pandas DataFrame.apply in parallel

A common use case in pandas is to want to apply a function to rows in a DataFrame. For a novice, the temptation can be to iterate through the rows in the DataFrame and pass the data to a function, but that is not a good idea. (You can read this article for a detailed explanation of why). Pandas has a method on both DataFrames and Series that applies a function to the data. Ideally, we want that to be a vectorized implementation. But in many cases a non-vectorized implementation already exists, or the solution cannot be vectorized. If the DataFrame is large enough, or the function slow enough, applying the function can be very time consuming. In those situations, a way to speed things up is to run the code in parallel on multiple CPUs. In this article, I’ll survey a number of popular options for applying functions to pandas DataFrames in parallel.
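
To make that concrete before we dive in, here is a generic sketch (a toy column, not the example we build below) contrasting row iteration, apply, and a truly vectorized operation:

import numpy as np
import pandas as pd

df = pd.DataFrame({'value': np.random.random(1_000)})

# 1. Iterating rows: easy to write, but slow and generally discouraged
doubled = [row['value'] * 2 for _, row in df.iterrows()]

# 2. apply: cleaner, but still a Python-level loop under the hood
doubled = df['value'].apply(lambda v: v * 2)

# 3. Vectorized: the whole column is processed in compiled code at once
doubled = df['value'] * 2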

An example problem

To make things more concrete, let’s consider an example where each row in a DataFrame represents a sample of data. We want to calculate a value from each row. The calculation might be slow. For demonstration purposes, we’ll just invent a CPU intensive task. It turns out calculating arctangent is one such task, so we’ll just make a function that does a lot of that. Our data will be a simple DataFrame with one data point per row, but it will be randomized so that each row is likely to be unique. We want unique data so that optimization via caching or memoization doesn’t impact our comparisons.

import pandas as pd
import numpy as np

import math

# our slow function
def slow_function(start: float) -> float:
    res = 0
    for i in range(int(math.pow(start, 7))):
        res += math.atan(i) * math.atan(i)
    return res
%timeit slow_function(5)
18.5 ms ± 465 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

We can see that this function is fairly slow, so calculating it over hundreds of values will take multiple seconds.

# make sample data
sample = pd.DataFrame({'value': np.random.random(500) + 5})
sample.tail()
        value
495  5.577242
496  5.484517
497  5.136881
498  5.174797
499  5.644561

Running apply

Now if we want to run our slow_function on each row, we can use apply. One quick note on DataFrame.apply: it applies per column by default (axis=0). This means the function will be invoked once per column, and the applied function receives a column (a Series) each time it is called, not a row. If we use axis=1, then apply will pass each row (also a Series) to the function instead. That is what we want here.
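
Here is a minimal sketch (a tiny throwaway DataFrame, not our sample data) showing the difference between the two axis values:

import pandas as pd

df = pd.DataFrame({'a': [1, 2], 'b': [10, 20]})

# axis=0 (the default): the function is called once per column and receives a Series
df.apply(lambda col: col.name)                     # one result per column: 'a', 'b'

# axis=1: the function is called once per row and receives a Series
df.apply(lambda row: row['a'] + row['b'], axis=1)  # one result per row: 11, 22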

I’m using a lambda to pick out the value column to pass into the slow_function. At the end, I turn the resulting Series back into a DataFrame.

sample[-5:].apply(lambda r: slow_function(r['value']), axis=1).to_frame(name="value")
             value
495  414125.614960
496  368264.399398
497  232842.530062
498  245144.830221
499  450413.419081

That is a little ugly though. Wouldn't it be great if we could just use a vectorized solution on the entire column instead? Well, it turns out there's a very easy way to create a vectorized solution using NumPy: just wrap the function in np.vectorize.

sample[-5:].apply(np.vectorize(slow_function))
             value
495  414125.614960
496  368264.399398
497  232842.530062
498  245144.830221
499  450413.419081

But is this an optimized vectorized solution? Unfortunately it’s not. The docs for np.vectorize point this out:

The vectorize function is provided primarily for convenience, not for
performance. The implementation is essentially a for loop.
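
To see the contrast, here's a sketch (mine, not part of the timings below) of what genuinely vectorizing the inner loop of our toy function would look like. In real workloads the function often can't be rewritten this way, which is exactly the situation the rest of this article targets.

import numpy as np

def vectorized_inner(start: float) -> float:
    # the loop over i now runs in compiled NumPy code instead of Python
    i = np.arange(int(start ** 7))
    return float(np.sum(np.arctan(i) ** 2))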

Let’s verify the speeds here with some timings. We’ll also just try running apply on the value column, which is a pandas Series. In this case, there’s only one axis, so it applies the function to each element.

%timeit sample.apply(lambda r: slow_function(r['value']), axis=1)
17.5 s ± 426 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit sample.apply(np.vectorize(slow_function))
17.6 s ± 176 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit sample['value'].apply(slow_function)
17.7 s ± 130 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

So all three of these methods are essentially doing the same thing. While the code for np.vectorize looks nice and clean, it’s not faster. Each solution is running a for loop over each row in the DataFrame (or Series), running our slow_function on each value. So let’s move on to the goal of this article: let’s run this function on multiple cores at once.

Parallel processing

Before we step into running our code on multiple cores, let's cover a few basics. Everything we've done thus far has been done in a single process. This means that the Python code is all running on one CPU core, even though my computer has multiple CPU cores available.

If we want to take advantage of multiple processes or cores at once, we have that option in Python. The basic idea is to run multiple Python processes, and have each one perform a fraction of the calculations. Then all the results are returned to the primary process. For example, if we have 4 cores available, then we should be able to have each core perform 25% of the calculations at the same time. In theory, the job will be done 4 times faster. In reality, it will be less efficient than that.

Comparing implementations

Before we move on to parallel implementations, let's set up the code we'll use to compare them.

Note that all of the code samples below are in one Python file (slow_function.py) for your convenience. You can use it to run the timings you'll see below, or run any implementation from the command line. You can access it here in my GitHub repo and follow along in your own environment.

To run this code, I created a clean virtualenv for this article using pyenv and installed Python 3.9.12. All the projects were installed in the same virtualenv.

For all of these code samples, we'll assume the following code is available:

import math
import sys
import argparse
import multiprocessing

import numpy as np

def slow_function(start: float) -> float:
    res = 0
    for i in range(int(math.pow(start, 7))):
        res += math.atan(i) * math.atan(i)
    return res


def get_sample():
    data = {'value': np.random.random(500) + 5}
    return data

Here is the default (single CPU) implementation, the same as what we ran above:

def run_default():
    import pandas as pd

    sample = pd.DataFrame(get_sample())
    sample['results'] = sample['value'].apply(slow_function)
    print("Default results:\n", sample.tail(5))

My method for timing this is to run the timeit module on the code above, like this:

python -m timeit "import slow_function; slow_function.run_default()"

Which yields

1 loop, best of 5: 17.4 sec per loop

As seen above, our base problem is about 17 seconds to run. How much can we improve on that?

Core multiprocessing

As a base parallel case, we will implement a solution with the core Python multiprocessing module. Then we will look at a number of popular libraries that make this task easier to implement. You can decide which one is easiest to understand and use for your purposes. We’ll also look at a few interesting tidbits about the projects that can help you make a decision on whether to use them.

The multiprocessing module is fairly straightforward to use. It comes with core Python, so there is no extra installation step. We only need to invoke it correctly. There are several ways to use the module, but I'll show you an example using multiprocessing.Pool.

For more details on multiprocessing, you can read my article that shows basic usage.

Note that multiprocessing doesn’t know about pandas and DataFrames, so to send each row into the pool, we have to provide either the guts of our data, or an iterable.

Some multiprocessing gotchas

FYI: when using multiprocessing, you might also have to put your slow_function in a separate Python file, since the processes launched by the multiprocessing module need access to the same functions. This tends to show up on some platforms, like Windows, or when running from Jupyter notebooks. When I ran this code in a Jupyter notebook, I saw this error when using functions defined in the notebook: AttributeError: Can't get attribute 'slow_function' on <module '__main__' (built-in)>.
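
A minimal sketch of a layout that avoids this problem (the worker function lives at module level, and pool creation sits behind the __main__ guard):

# worker.py -- a hypothetical minimal layout
import multiprocessing

def slow_function(start: float) -> float:
    return start * 2  # stand-in for the real work

if __name__ == '__main__':
    # only the parent process runs this block; workers just import the module
    with multiprocessing.Pool() as pool:
        results = pool.map(slow_function, [5.0, 5.1, 5.2])
    print(results)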

This is what a multiprocessing implementation looks like.

def run_multiprocessing():
    import pandas as pd

    sample = pd.DataFrame(get_sample())
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(slow_function, sample['value'])
    sample['results'] = results
    print("Multiprocessing results:\n", sample.tail(5))

Again, running this using timeit as follows:

python -m timeit "import slow_function; slow_function.run_multiprocessing()"

produces

1 loop, best of 5: 5.86 sec per loop

Now we can see that the multiprocessing version runs a little more than 3x faster. My machine has 4 real cores (and 8 virtual cores), so this is somewhat in line with expectations. Instead of being a full 4x faster, it has to deal with a bit of overhead for copying the data and competing with other tasks on my machine. If we had even more cores available, we could further improve the performance.

Other options

Even with a simple example it's clear that using multiprocessing is not seamless. We have to extract the data from our DataFrame to pass into the pool.map function, and the results come back as a plain list. There's also the __main__ guard boilerplate, and we had to move our function out to a separate file for Jupyter to work with it.
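
One common workaround, sketched below (not part of the timings in this article), is to split the DataFrame into chunks with np.array_split and let each worker run a regular pandas apply on its own chunk, concatenating the results at the end.

def apply_chunk(chunk):
    # each worker runs an ordinary pandas apply on its piece of the DataFrame
    return chunk['value'].apply(slow_function)

def run_chunked():
    import pandas as pd

    sample = pd.DataFrame(get_sample())
    chunks = np.array_split(sample, multiprocessing.cpu_count())
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(apply_chunk, chunks)
    sample['results'] = pd.concat(results)
    print("Chunked results:\n", sample.tail(5))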

There are a number of projects that build on top of multiprocessing, pandas, and other projects. Some of them even work directly with the concept of a DataFrame, but support distributed computing. For the rest of the article, we’ll implement this simple problem using each project. This demonstrates how each one works and the basic steps to get it running.

Joblib

Joblib is a generic set of tools for pipelining code in Python. It’s not specifically integrated with pandas, but it’s easy enough to use and has some other nice features such as disk caching of functions and memoization.

You can install joblib with pip:

pip install joblib

The example code is fairly simple:

def run_joblib():
    import pandas as pd
    from joblib import Parallel, delayed

    sample = pd.DataFrame(get_sample())
    results = Parallel(n_jobs=multiprocessing.cpu_count())(
        delayed(slow_function)(i) for i in sample['value']
    )
    sample['results'] = results
    print("joblib results:\n", sample.tail(5))

Checking the performance:

python -m timeit "import slow_function; slow_function.run_joblib()"

gives us

1 loop, best of 5: 5.77 sec per loop

For general parallel processing, joblib makes for cleaner code than raw multiprocessing. The speed is the same, and the project offers some extra tools that can be helpful.
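
For example, the disk caching mentioned above looks roughly like this (a sketch; the cache directory name is arbitrary):

from joblib import Memory

memory = Memory('./joblib_cache', verbose=0)
cached_slow_function = memory.cache(slow_function)

# the first call computes and stores the result on disk;
# later calls with the same argument are read back from the cache
cached_slow_function(5.0)
cached_slow_function(5.0)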

Now we'll look at a few projects that are more closely integrated with pandas. If you're used to working with pandas and look back at the code we've written so far, it might look a little clunky and different from the other pandas DataFrame methods you're used to. The rest of the projects will look quite a bit more like standard pandas code.

Dask

Dask is a library that scales the standard PyData tools, like pandas, NumPy, and scikit-learn. From a code perspective, it usually looks pretty similar to the code you are used to, but it's possible to scale out to multiple cores on one machine, or even to clusters of multiple machines. Even though we are only looking at processing a DataFrame that fits into memory on one machine, it's possible to run code with Dask that uses more memory than is available on the main node. But Dask works great on your local machine and provides benefits even without a full cluster.

As you see in the code below, a Dask DataFrame wraps a regular pandas DataFrame, and supplies a similar interface. The difference with Dask is that sometimes you need to supply some hints to the calculation (the meta argument to apply), and the execution is always deferred. To get the result, you call compute. But writing this code feels much the same as writing normal pandas code.

You can install it using pip:

pip install "dask[complete]"

or if you’re using conda:

conda install dask

This is a very basic intro, read the introductory docs for more complete examples.

In order for Dask to run in parallel on a local host, you'll have to start a local cluster. We do this only once.

# global variable
DASK_RUNNING = False

def run_dask():
    import pandas as pd
    import dask.dataframe as dd

    # hack for allowing our timeit code to work with one cluster
    global DASK_RUNNING

    if not DASK_RUNNING:
        # normally, you'll do this once in your code
        from dask.distributed import Client, LocalCluster
        cluster = LocalCluster()  # Launches a scheduler and workers locally
        client = Client(cluster)  # Connect to distributed cluster and override default
        print(f"Started cluster at {cluster.dashboard_link}")
        DASK_RUNNING = True

    sample = pd.DataFrame(get_sample())

    dask_sample = dd.from_pandas(sample, npartitions=multiprocessing.cpu_count())
    dask_sample['results'] = dask_sample['value'].apply(slow_function, meta=('value', 'float64')).compute()

    print("Dask results:\n", dask_sample.tail(5))

Again, we time this as follows:

python -m timeit "import slow_function; slow_function.run_dask()"

and get

1 loop, best of 5: 5.21 sec per loop

Note that when you are running a local cluster, you can access a handy dashboard for monitoring the cluster; it's available via the cluster.dashboard_link field.

On my machine, Dask performs as well as the other parallel options. It has the added benefit of monitoring and further scalability.
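
As an aside, if you don't need the dashboard or a distributed setup, Dask can also run the same computation on its built-in local schedulers without starting a LocalCluster. A minimal sketch:

def run_dask_local_scheduler():
    import pandas as pd
    import dask.dataframe as dd

    sample = pd.DataFrame(get_sample())
    dask_sample = dd.from_pandas(sample, npartitions=multiprocessing.cpu_count())

    # scheduler='processes' uses a local process pool instead of the distributed cluster
    results = dask_sample['value'].apply(
        slow_function, meta=('value', 'float64')
    ).compute(scheduler='processes')
    sample['results'] = results
    print("Dask (local scheduler) results:\n", sample.tail(5))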

Modin

Modin is a library that is built on top of Dask (and other libraries) but serves as a drop-in replacement for pandas, making it even easier to work with existing code. When using Modin, they suggest replacing the import pandas as pd line with import modin.pandas as pd. That may be the only change needed to take advantage of it. Modin will provide speed improvements out of the box, and with some configuration and use of other libraries, can continue to scale up.

You install Modin with pip:

pip install modin

But you'll need to install a backend as well; see the docs for more details. Since we just installed Dask above, I'll use that. I'll also run the Dask cluster for Modin to use.

pip install "modin[dask]"

Note that besides the imports and Dask setup, our code looks exactly like bare pandas code.

def run_modin():
    global DASK_RUNNING

    import os

    os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask

    if not DASK_RUNNING:
        from dask.distributed import Client, LocalCluster
        cluster = LocalCluster()  # Launches a scheduler and workers locally
        client = Client(cluster)  # Connect to distributed cluster and override default
        print(f"Started cluster at {cluster.dashboard_link}")
        DASK_RUNNING = True

    import modin.pandas as pd
    sample = pd.DataFrame(get_sample())
    sample['results'] = sample['value'].apply(slow_function)
    print("Modin results:\n", sample.tail(5))

Timing from

python -m timeit "import slow_function; slow_function.run_modin()"

gives us

1 loop, best of 5: 5.57 sec per loop

Modin with Dask provides the benefits of Dask, without the code differences.

Swifter

Swifter is a package that figures out the best way to apply a function to a pandas DataFrame. It can do several things, including multiprocessing and vectorization. It integrates with other libraries like Dask and Modin, and will attempt to use them in the most efficient way possible. To use it, you call Swifter's version of apply instead of the DataFrame's, as shown below.

You can install swifter with pip:

pip install swifter

To use it with Modin, just import modin before swifter (or register it with swifter.register_modin()). It’s almost the same as the base pandas version.

def run_swifter():
    global DASK_RUNNING

    import os

    os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask

    if not DASK_RUNNING:
        from dask.distributed import Client, LocalCluster
        cluster = LocalCluster()  # Launches a scheduler and workers locally
        client = Client(cluster)  # Connect to distributed cluster and override default
        print(f"Started cluster at {cluster.dashboard_link}")
        DASK_RUNNING = True

    import pandas as pd
    import swifter

    swifter.register_modin() # or could import modin.pandas as pd

    sample = pd.DataFrame(get_sample())
    sample['results'] = sample['value'].swifter.apply(slow_function)
    print("Swifter results:\n", sample.tail(5))

Double-checking the performance:

python -m timeit "import slow_function; slow_function.run_swifter()"

gives us slightly slower results:

1 loop, best of 5: 12.3 sec per loop

While there is a speed difference (Swifter is slower here), this can be explained by the fact that Swifter samples the data in order to determine whether it is worthwhile to use a parallel option. For larger calculations, this extra work will be negligible. Changing the defaults is easy through configuration; see the docs for more details.

Swifter also includes some handy progress bars in both the shell and Jupyter notebooks. For longer running jobs, that is very convenient.
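
For example, the Swifter accessor exposes a few chainable configuration methods (names taken from Swifter's README; exact options may vary by version), so you can tune partitioning or turn the progress bar off. A sketch:

results = (
    sample['value']
    .swifter
    .set_npartitions(multiprocessing.cpu_count())  # how many chunks to split the data into
    .progress_bar(False)                           # disable the progress bar
    .apply(slow_function)
)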

Pandarallel

Pandarallel is another project that integrates with pandas, similar to Swifter. You need to do a small initialization, then use the extra DataFrame methods to apply a method to a DataFrame in parallel. It has nice support for Jupyter progress bars as well, which can be a nice touch for users running it in a notebook. It doesn’t have the same level of support for distributed libraries like Dask. But it’s very simple code to write.

You install Pandarallel with pip:

pip install pandarallel

def run_pandarallel():
    from pandarallel import pandarallel

    pandarallel.initialize()

    import pandas as pd
    sample = pd.DataFrame(get_sample())
    sample['results'] = sample['value'].parallel_apply(slow_function)
    print("Pandarallel results:\n", sample.tail(5))

Checking results with

python -m timeit "import slow_function; slow_function.run_pandarallel()"

yields

1 loop, best of 5: 5.12 sec per loop

If you are only looking for a simple way to run apply in parallel, and don’t need the other improvements of the other projects, it can be a good option.
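
As a final note on Pandarallel, initialize() accepts a few options; for example (a sketch using the nb_workers and progress_bar parameters from the pandarallel docs):

from pandarallel import pandarallel

# pick the number of worker processes explicitly and show a progress bar
pandarallel.initialize(nb_workers=multiprocessing.cpu_count(), progress_bar=True)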

PySpark

PySpark is a Python interface to Apache Spark. The Spark project is a multi-language engine for executing data engineering, data science, and machine learning tasks in a clustered environment. Similar to Dask, it can scale up from single machines to entire clusters. It also supports multiple languages.

PySpark contains a pandas API, so it is possible to write pandas code that works on Spark with little effort. Note that the pandas API is not 100% complete and also has some minor differences from standard pandas. But as you'll see, there are performance benefits that might make porting code to PySpark worthwhile.

You can install pyspark with pip (I also needed to install PyArrow):

pip install pyspark pyarrow

The sample code is similar to basic pandas.

def run_pyspark():

    import pyspark.pandas as ps

    sample = ps.DataFrame(get_sample())
    sample['results'] = sample['value'].apply(slow_function)
    print("PySpark results:\n", sample.tail(5))

Testing the speed with

python -m timeit "import slow_function; slow_function.run_pyspark()"

gives

1 loop, best of 5: 2.73 sec per loop

This is quite a bit faster than the other options. But it’s worth noting here that the underlying implementation is not running the same pandas code on more CPUs, but rather running the Spark code on multiple CPUs. This is just a simple example, and there is quite a bit of configuration possible with Spark, but you can see that pandas integration makes trying it out quite easy.
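
If you already have a regular pandas DataFrame, you can also convert to and from pandas-on-Spark explicitly; a small sketch using the pyspark.pandas conversion helpers:

def run_pyspark_conversion():
    import pandas as pd
    import pyspark.pandas as ps

    pdf = pd.DataFrame(get_sample())

    psdf = ps.from_pandas(pdf)          # distribute the pandas DataFrame across Spark
    psdf['results'] = psdf['value'].apply(slow_function)

    back_to_pandas = psdf.to_pandas()   # collect the results back into plain pandas
    print("PySpark conversion results:\n", back_to_pandas.tail(5))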

Summary

We looked at a simple CPU-bound function that we applied across a DataFrame of data. This was our base case. We then used the following libraries to implement parallel versions:

  • multiprocessing
  • joblib
  • Dask
  • Modin
  • Swifter
  • Pandarallel
  • PySpark

Each of these projects offers features and conveniences beyond the raw multiprocessing version, with speedups over our single-core base case ranging from roughly 1.5x (Swifter, because of its sampling overhead on this small example) to more than 6x (PySpark). Depending on your needs, one of these projects can offer improved readability and scalability.