Marginally Stable

Dask and Scikit-Learn -- Model Parallelism

Mon 11 July 2016 — under

This is the first of a series of posts discussing some recent experiments combining dask and scikit-learn. A small (and extremely alpha) library has been built up from these experiments, and can be found here.

Before we start, I would like to make the following caveats:

There are several ways of parallelizing algorithms in machine learning. Some algorithms can be made to be data-parallel (either across features or across samples). In this post we'll look instead at model-parallelism (use same data across different models), and dive into a daskified implementation of GridSearchCV.

What is grid search?

Many machine learning algorithms have hyperparameters which can be tuned to improve the performance of the resulting estimator. A grid search is one way of optimizing these parameters — it works by doing a parameter sweep across a cartesian product of a subset of these parameters (the "grid"), and then choosing the best resulting estimator. Since this is fitting many independent estimators across the same set of data, it can be fairly easily parallelized.

Grid search with scikit-learn

In scikit-learn, a grid search is performed using the GridSearchCV class, and can (optionally) be automatically parallelized using joblib.

This is best illustrated with an example. First we'll make an example dataset for doing classification against:

from sklearn.datasets import make_classification

X, y = make_classification(n_samples=10000,
                           n_features=500,
                           n_classes=2,
                           n_redundant=250,
                           random_state=42)

To solve this classification problem, we'll create a pipeline of a PCA and a LogisticRegression:

from sklearn import linear_model, decomposition
from sklearn.pipeline import Pipeline

logistic = linear_model.LogisticRegression()
pca = decomposition.PCA()
pipe = Pipeline(steps=[('pca', pca),
                       ('logistic', logistic)])

Both of these classes take several hyperparameters, we'll do a grid-search across only a few of them:

#Parameters of pipelines can be set using ‘__’ separated parameter names:
grid = dict(pca__n_components=[50, 100, 250],
            logistic__C=[1e-4, 1.0, 1e4],
            logistic__penalty=['l1', 'l2'])

Finally, we can create an instance of GridSearchCV, and perform the grid search. The parameter n_jobs=-1 tells joblib to use as many processes as I have cores (8).

from sklearn.grid_search import GridSearchCV

estimator = GridSearchCV(pipe, grid, n_jobs=-1)

%time estimator.fit(X, y)
CPU times: user 5.3 s, sys: 243 ms, total: 5.54 s
Wall time: 21.6 s

What happened here was:

The corresponding best score, parameters, and estimator can all be found as attributes on the resulting object:

estimator.best_score_
0.89290000000000003
estimator.best_params_
{'logistic__C': 0.0001, 'logistic__penalty': 'l2', 'pca__n_components': 50}
estimator.best_estimator_
Pipeline(steps=[('pca', PCA(copy=True, n_components=50, whiten=False)), ('logistic', LogisticRegression(C=0.0001, class_weight=None, dual=False,
        fit_intercept=True, intercept_scaling=1, max_iter=100,
        multi_class='ovr', n_jobs=1, penalty='l2', random_state=None,
        solver='liblinear', tol=0.0001, verbose=0, warm_start=False))])<div class=md_output>

    {'logistic__C': 0.0001, 'logistic__penalty': 'l2', 'pca__n_components': 50}

Grid search with dask-learn

Here we'll repeat the same fit using dask-learn. I've tried to match the scikit-learn interface as much as possible, although not everything is implemented. Here the only thing that really changes is the GridSearchCV import. We don't need the n_jobs keyword, as this will be parallelized across all cores by default.

from dklearn.grid_search import GridSearchCV as DaskGridSearchCV

destimator = DaskGridSearchCV(pipe, grid)

%time destimator.fit(X, y)
CPU times: user 16.3 s, sys: 1.89 s, total: 18.2 s
Wall time: 5.63 s

As before, the best score, parameters, and estimator can all be found as attributes on the object. Here we'll just show that they're equivalent:

destimator.best_score_ == estimator.best_score_
True
destimator.best_params_ == estimator.best_params_
True
destimator.best_estimator_
Pipeline(steps=[('pca', PCA(copy=True, n_components=50, whiten=False)), ('logistic', LogisticRegression(C=0.0001, class_weight=None, dual=False,
        fit_intercept=True, intercept_scaling=1, max_iter=100,
        multi_class='ovr', n_jobs=1, penalty='l2', random_state=None,
        solver='liblinear', tol=0.0001, verbose=0, warm_start=False))])<div class=md_output>

    {'logistic__C': 0.0001, 'logistic__penalty': 'l2', 'pca__n_components': 50}

Why is the dask version faster?

If you look at the times above, you'll note that the dask version was ~4X faster than the scikit-learn version. This is not because we have optimized any of the pieces of the Pipeline, or that there's a significant amount of overhead to joblib (on the contrary, joblib does some pretty amazing things, and I had to construct a contrived example to beat it this badly). The reason is simply that the dask version is doing less work.

This maybe best explained in pseudocode. The scikit-learn version of the above (in serial) looks something like (pseudocode):

for X_train, X_test, y_train, y_test in cv:
    for n in grid['pca__n_components']:
        for C in grid['logistic__C']:
            for penalty in grid['logistic__penalty']:
                # Create and fit a PCA on the input data
                pca = PCA(n_components=n).fit(X_train, y_train)
                # Transform both the train and test data
                X_train2 = pca.transform(X_train)
                X_test2 = pca.transform(X_test)
                # Create and fit a LogisticRegression on the transformed data
                logistic = LogisticRegression(C=C, penalty=penalty)
                logistic.fit(X_train2, y_train)
                # Score the total pipeline
                score = logistic.score(X_test2, y_test)
                # Save the score and parameters
                scores_and_params.append((score, n, C))

# Find the best set of parameters (for some definition of best)
find_best_parameters(scores)

This is looping through a cartesian product of the cross-validation sets and all the parameter combinations, and then creating and fitting a new estimator for each combination. While embarassingly parallel, this can also result in repeated work, as earlier stages in the pipeline are refit multiple times on the same parameter + data combinations.

In contrast, the dask version hashes all inputs (forming a sort of Merkle DAG), resulting in the intermediate results being shared. Keeping with the pseudocode above, the dask version might look like:

for X_train, X_test, y_train, y_test in cv:
    for n in grid['pca__n_components']:
        # Create and fit a PCA on the input data
        pca = PCA(n_components=n).fit(X_train, y_train)
        # Transform both the train and test data
        X_train2 = pca.transform(X_train)
        X_test2 = pca.transform(X_test)
        for C in grid['logistic__C']:
            for penalty in grid['logistic__penalty']:
                # Create and fit a LogisticRegression on the transformed data
                logistic = LogisticRegression(C=C, penalty=penalty)
                logistic.fit(X_train2, y_train)
                # Score the total pipeline
                score = logistic.score(X_test2, y_test)
                # Save the score and parameters
                scores_and_params.append((score, n, C, penalty))

# Find the best set of parameters (for some definition of best)
find_best_parameters(scores)

This can still be parallelized, but in a less straightforward manner - the graph is a bit more complicated than just a simple map-reduce pattern. Thankfully the dask schedulers are well equipped to handle arbitrary graph topologies. Below is a GIF showing how the dask scheduler (the threaded scheduler specifically) executed the grid search performed above. Each rectangle represents data, and each circle represents a task. Each is categorized by color:

Dask Graph Execution

Looking at the trace, a few things stand out:

Distributed grid search using dask-learn

The schedulers used in dask are configurable. The default (used above) is the threaded scheduler, but we can just as easily swap it out for the distributed scheduler. Here I've just spun up two local workers to demonstrate, but this works equally well across multiple machines.

from distributed import Executor

# Create an Executor, and set it as the default scheduler
exc = Executor('10.0.0.3:8786', set_as_default=True)
exc
<Executor: scheduler="10.0.0.3:8786" processes=2 cores=8>
%time destimator.fit(X, y)
CPU times: user 1.69 s, sys: 433 ms, total: 2.12 s
Wall time: 7.66 s
(destimator.best_score_ == estimator.best_score_ and
 destimator.best_params_ == estimator.best_params_)
True

Note that this is slightly slower than the threaded execution, so it doesn't make sense for this workload, but for others it might.

What worked well

Caveats and what could be better

Help

I am not a machine learning expert. Is any of this useful? Do you have suggestions for improvements (or better yet PRs for improvements :))? Please feel free to reach out in the comments below, or on github.

This work is supported by Continuum Analytics and the XDATA program as part of the Blaze Project.

comments powered by Disqus

All content copyright 2014-2017 Jim Crist unless otherwise noted. Licensed under Creative Commons.

Find me on Twitter, GitHub, Speaker Deck, or shoot me an email.