Handle nullable types and empty partitions before Dask-ML predict #799

Closed: wants to merge 9 commits (changes shown from 2 commits).
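The change, in brief: CREATE MODEL now wraps the fitted estimator in dask-ml's ParallelPostFit with explicit empty output metas, and PREDICT maps a partition-wise _predict over the data so that 0-row partitions return a correctly shaped empty result instead of reaching the estimator. A minimal sketch of the failure mode being avoided, assuming scikit-learn is installed (LogisticRegression is just a stand-in for any wrapped estimator):

import numpy as np
from sklearn.linear_model import LogisticRegression

X = np.array([[0.0], [1.0], [2.0], [3.0]])
y = np.array([0, 0, 1, 1])
model = LogisticRegression().fit(X, y)

# scikit-learn rejects 0-row input, so mapping predict over dask partitions
# fails whenever a partition happens to be empty:
try:
    model.predict(np.empty((0, 1)))
except ValueError as exc:
    print(exc)  # "Found array with 0 sample(s) ... while a minimum of 1 is required"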
dask_sql/physical/rel/custom/create_model.py (9 changes: 8 additions & 1 deletion)
@@ -1,6 +1,7 @@
 import logging
 from typing import TYPE_CHECKING
 
+import numpy as np
 from dask import delayed
 
 from dask_sql.datacontainer import DataContainer
@@ -183,7 +184,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
 
             delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
             model = delayed_model[0].compute()
-            model = ParallelPostFit(estimator=model)
+            output_meta = np.array([])
+            model = ParallelPostFit(
+                estimator=model,
+                predict_meta=output_meta,
+                predict_proba_meta=output_meta,
+                transform_meta=output_meta,
+            )
 
         else:
             model.fit(X, y, **fit_kwargs)
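Why the empty metas: with predict_meta and friends left unset, dask-ml infers output meta by calling the estimator on dummy data derived from the input, which is exactly where nullable extension dtypes used to fail; an explicit empty ndarray meta skips that inference. A hedged sketch of the same construction outside dask-sql, assuming a dask-ml release in which ParallelPostFit accepts these *_meta keyword arguments (as the diff above relies on):

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask_ml.wrappers import ParallelPostFit
from sklearn.linear_model import LogisticRegression

X = pd.DataFrame({"x": [0.0, 1.0, 2.0, 3.0]})
y = np.array([0, 0, 1, 1])

output_meta = np.array([])  # declares "prediction is a 1-d ndarray"; skips inference
wrapped = ParallelPostFit(
    estimator=LogisticRegression().fit(X, y),
    predict_meta=output_meta,
    predict_proba_meta=output_meta,
    transform_meta=output_meta,
)

ddf = dd.from_pandas(X, npartitions=2)
print(wrapped.predict(ddf).compute())  # array([0, 0, 1, 1])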
dask_sql/physical/rel/custom/predict.py (56 changes: 55 additions & 1 deletion)
@@ -2,6 +2,13 @@
 import uuid
 from typing import TYPE_CHECKING
 
+import numpy as np
+
+try:
+    from dask_ml.wrappers import ParallelPostFit
+except ImportError:  # pragma: no cover
+    ParallelPostFit = None  # keep the module importable when dask-ml is missing
+
 from dask_sql.datacontainer import ColumnContainer, DataContainer
 from dask_sql.physical.rel.base import BaseRelPlugin
@@ -59,7 +66,18 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
 
         model, training_columns = context.schema[schema_name].models[model_name]
         df = context.sql(sql_select)
-        prediction = model.predict(df[training_columns])
+        part = df[training_columns]
+
+        if ParallelPostFit is not None and isinstance(model, ParallelPostFit):
+            output_meta = model.predict_meta
+            if output_meta is None:
+                output_meta = model.estimator.predict(part._meta_nonempty)
+            prediction = part.map_partitions(
+                self._predict, output_meta, model.estimator, meta=output_meta
+            )
+        else:
+            prediction = model.predict(part)
+
         predicted_df = df.assign(target=prediction)
 
         # Create a temporary context, which includes the
@@ -79,3 +97,39 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
         dc = DataContainer(predicted_df, cc)
 
         return dc
+
+    def _predict(self, part, predict_meta, estimator):
+        if part.shape[0] == 0 and predict_meta is not None:
+            empty_output = self.handle_empty_partitions(predict_meta)
+            if empty_output is not None:
+                return empty_output
+        return estimator.predict(part)
+
+    def handle_empty_partitions(self, output_meta):
+        if hasattr(output_meta, "__array_function__"):
+            if len(output_meta.shape) == 1:
+                shape = 0
+            else:
+                shape = list(output_meta.shape)
+                shape[0] = 0
+            ar = np.zeros(
+                shape=shape,
+                dtype=output_meta.dtype,
+                like=output_meta,
+            )
+            return ar
+        elif "scipy.sparse" in type(output_meta).__module__:
+            # Sparse matrices don't support `like` due to a
+            # non-implemented __array_function__.
+            # See https://github.com/scipy/scipy/issues/10362
+            # Note: the below works for both cupy and scipy sparse matrices.
+            if len(output_meta.shape) == 1:
+                shape = 0
+            else:
+                shape = list(output_meta.shape)
+                shape[0] = 0
+
+            ar = type(output_meta)(shape, dtype=output_meta.dtype)
+            return ar
+        elif hasattr(output_meta, "iloc"):
+            return output_meta.iloc[:0, :]
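The helper builds a 0-row stand-in that preserves the meta's container type and dtype, so empty partitions short-circuit before ever reaching the estimator. A simplified standalone rendition, with the scipy/cupy sparse branch omitted for brevity (np.zeros(..., like=...) needs NumPy >= 1.20):

import numpy as np
import pandas as pd

def empty_like_meta(output_meta):
    if hasattr(output_meta, "__array_function__"):  # ndarray, cupy array, ...
        shape = 0 if len(output_meta.shape) == 1 else [0, *output_meta.shape[1:]]
        # `like=` dispatches to the meta's own array library
        return np.zeros(shape=shape, dtype=output_meta.dtype, like=output_meta)
    if hasattr(output_meta, "iloc"):  # pandas Series/DataFrame
        return output_meta.iloc[:0]
    return None  # unknown meta: caller falls back to estimator.predict

print(empty_like_meta(np.zeros((4, 2))).shape)       # (0, 2)
print(empty_like_meta(pd.Series([1.0, 2.0])).shape)  # (0,)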
tests/integration/test_model.py (74 changes: 74 additions & 0 deletions)
@@ -924,3 +924,77 @@ def test_experiment_automl_regressor(c, client, training_df):
     ), "Best model was not registered"
 
     check_trained_model(c, "my_automl_exp2")
+
+
+def test_predict_with_nullable_types(c):
+    df = pd.DataFrame(
+        {
+            "rough_day_of_year": [0, 1, 2, 3],
+            "prev_day_inches_rained": [0, 1, 2, 3],
+            "rained": [False, False, False, True],
+        }
+    )
+    c.create_table("train_set", df)
+
+    model_class = "'sklearn.linear_model.LogisticRegression'"
+
+    c.sql(
+        f"""
+        CREATE OR REPLACE MODEL model WITH (
+            model_class = {model_class},
+            wrap_predict = True,
+            wrap_fit = False,
+            target_column = 'rained'
+        ) AS (
+            SELECT *
+            FROM train_set
+        )
+        """
+    )
+
+    expected = c.sql(
+        """
+        SELECT * FROM PREDICT(
+            MODEL model,
+            SELECT * FROM train_set
+        )
+        """
+    )
+
+    df = pd.DataFrame(
+        {
+            "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype="Int32"),
+            "prev_day_inches_rained": pd.Series([0, 1, 2, 3], dtype="float32"),
+            "rained": pd.Series([False, False, False, True]),
+        }
+    )
+    c.create_table("train_set", df)
+
+    c.sql(
+        f"""
+        CREATE OR REPLACE MODEL model WITH (
+            model_class = {model_class},
+            wrap_predict = True,
+            wrap_fit = False,
+            target_column = 'rained'
+        ) AS (
+            SELECT *
+            FROM train_set
+        )
+        """
+    )
+
+    result = c.sql(
+        """
+        SELECT * FROM PREDICT(
+            MODEL model,
+            SELECT * FROM train_set
+        )
+        """
+    )
+
+    assert_eq(
+        expected,
+        result,
+        check_dtype=False,
+    )
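The added test can be run on its own with pytest tests/integration/test_model.py::test_predict_with_nullable_types, assuming a dev environment with dask-ml and scikit-learn available (the CREATE MODEL statement loads sklearn.linear_model.LogisticRegression).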