From 5febca76fa5c80804e817173a53d21fbec74e70c Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Thu, 22 Sep 2022 12:53:18 -0700 Subject: [PATCH 1/8] new pr --- dask_sql/physical/rel/custom/create_model.py | 9 ++- dask_sql/physical/rel/custom/predict.py | 53 +++++++++++++++- tests/integration/test_model.py | 64 ++++++++++++++++++++ 3 files changed, 124 insertions(+), 2 deletions(-) diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py index 2e6cdeb0a..d75c56ba9 100644 --- a/dask_sql/physical/rel/custom/create_model.py +++ b/dask_sql/physical/rel/custom/create_model.py @@ -1,6 +1,7 @@ import logging from typing import TYPE_CHECKING +import numpy as np from dask import delayed from dask_sql.datacontainer import DataContainer @@ -183,7 +184,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)] model = delayed_model[0].compute() - model = ParallelPostFit(estimator=model) + output_meta = np.array([]) + model = ParallelPostFit( + estimator=model, + predict_meta=output_meta, + predict_proba_meta=output_meta, + transform_meta=output_meta, + ) else: model.fit(X, y, **fit_kwargs) diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py index eb5e4b69f..8f70aed23 100644 --- a/dask_sql/physical/rel/custom/predict.py +++ b/dask_sql/physical/rel/custom/predict.py @@ -2,6 +2,12 @@ import uuid from typing import TYPE_CHECKING +import numpy as np +try: + from dask_ml.wrappers import ParallelPostFit +except ImportError: # pragma: no cover + raise ValueError("Wrapping requires dask-ml to be installed.") + from dask_sql.datacontainer import ColumnContainer, DataContainer from dask_sql.physical.rel.base import BaseRelPlugin @@ -59,7 +65,16 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai model, training_columns = context.schema[schema_name].models[model_name] df = context.sql(sql_select) - prediction = model.predict(df[training_columns]) + part = df[training_columns] + + if isinstance(model, ParallelPostFit): + output_meta = model.predict_meta + if output_meta is None: + output_meta = model.estimator.predict(part._meta_nonempty) + prediction = part.map_partitions(self._predict, output_meta, model.estimator, meta=output_meta) + else: + prediction = model.predict(part) + predicted_df = df.assign(target=prediction) # Create a temporary context, which includes the @@ -79,3 +94,39 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai dc = DataContainer(predicted_df, cc) return dc + + def _predict(self, part, predict_meta, estimator): + if part.shape[0] == 0 and predict_meta is not None: + empty_output = self.handle_empty_partitions(predict_meta) + if empty_output is not None: + return empty_output + return estimator.predict(part) + + def handle_empty_partitions(self, output_meta): + if hasattr(output_meta, "__array_function__"): + if len(output_meta.shape) == 1: + shape = 0 + else: + shape = list(output_meta.shape) + shape[0] = 0 + ar = np.zeros( + shape=shape, + dtype=output_meta.dtype, + like=output_meta, + ) + return ar + elif "scipy.sparse" in type(output_meta).__module__: + # sparse matrices don't support + # `like` due to non implimented __array_function__ + # Refer https://github.com/scipy/scipy/issues/10362 + # Note below works for both cupy and scipy sparse matrices + if len(output_meta.shape) == 1: + shape = 0 + else: + shape = list(output_meta.shape) + shape[0] = 0 + + ar = type(output_meta)(shape, dtype=output_meta.dtype) + return ar + elif hasattr(output_meta, "iloc"): + return output_meta.iloc[:0, :] diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py index 044a56fcc..4032116e1 100644 --- a/tests/integration/test_model.py +++ b/tests/integration/test_model.py @@ -924,3 +924,67 @@ def test_experiment_automl_regressor(c, client, training_df): ), "Best model was not registered" check_trained_model(c, "my_automl_exp2") + + +def test_predict_with_nullable_types(c): + df = pd.DataFrame({ + "rough_day_of_year": [0, 1, 2, 3], + "prev_day_inches_rained": [0, 1, 2, 3], + "next_day_inches_rained": [0, 1, 2, 3], + "rained": [False, False, False, True] + }) + c.create_table("train_set", df) + + model_class = "'sklearn.linear_model.LogisticRegression'" + + c.sql(f""" + CREATE OR REPLACE MODEL model WITH ( + model_class = {model_class}, + wrap_predict = True, + wrap_fit = False, + target_column = 'rained' + ) AS ( + SELECT * + FROM train_set + ) + """) + + expected = c.sql(""" + SELECT * FROM PREDICT( + MODEL model, + SELECT * FROM train_set + ) + """) + + df = pd.DataFrame({ + "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype='Int32'), + "prev_day_inches_rained": pd.Series([0, 1, 2, 3], dtype='float32'), + "rained": pd.Series([False, False, False, True]) + }) + c.create_table("train_set", df) + + c.sql(f""" + CREATE OR REPLACE MODEL model WITH ( + model_class = {model_class}, + wrap_predict = True, + wrap_fit = False, + target_column = 'rained' + ) AS ( + SELECT * + FROM train_set + ) + """) + + result = c.sql(""" + SELECT * FROM PREDICT( + MODEL model, + SELECT * FROM train_set + ) + """) + + assert_eq( + expected, + result, + check_dtype=False, + check_names=False, + ) From 55533fb1309a0bbfdb6b8afdbf060157527e69e9 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Thu, 22 Sep 2022 16:36:26 -0700 Subject: [PATCH 2/8] format fix --- dask_sql/physical/rel/custom/predict.py | 15 ++-- tests/integration/test_model.py | 94 ++++++++++++++----------- 2 files changed, 61 insertions(+), 48 deletions(-) diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py index 8f70aed23..818eb8af2 100644 --- a/dask_sql/physical/rel/custom/predict.py +++ b/dask_sql/physical/rel/custom/predict.py @@ -3,6 +3,7 @@ from typing import TYPE_CHECKING import numpy as np + try: from dask_ml.wrappers import ParallelPostFit except ImportError: # pragma: no cover @@ -71,7 +72,9 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai output_meta = model.predict_meta if output_meta is None: output_meta = model.estimator.predict(part._meta_nonempty) - prediction = part.map_partitions(self._predict, output_meta, model.estimator, meta=output_meta) + prediction = part.map_partitions( + self._predict, output_meta, model.estimator, meta=output_meta + ) else: prediction = model.predict(part) @@ -96,11 +99,11 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai return dc def _predict(self, part, predict_meta, estimator): - if part.shape[0] == 0 and predict_meta is not None: - empty_output = self.handle_empty_partitions(predict_meta) - if empty_output is not None: - return empty_output - return estimator.predict(part) + if part.shape[0] == 0 and predict_meta is not None: + empty_output = self.handle_empty_partitions(predict_meta) + if empty_output is not None: + return empty_output + return estimator.predict(part) def handle_empty_partitions(self, output_meta): if hasattr(output_meta, "__array_function__"): diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py index 4032116e1..8e18d7fd7 100644 --- a/tests/integration/test_model.py +++ b/tests/integration/test_model.py @@ -927,64 +927,74 @@ def test_experiment_automl_regressor(c, client, training_df): def test_predict_with_nullable_types(c): - df = pd.DataFrame({ - "rough_day_of_year": [0, 1, 2, 3], - "prev_day_inches_rained": [0, 1, 2, 3], - "next_day_inches_rained": [0, 1, 2, 3], - "rained": [False, False, False, True] - }) + df = pd.DataFrame( + { + "rough_day_of_year": [0, 1, 2, 3], + "prev_day_inches_rained": [0, 1, 2, 3], + "rained": [False, False, False, True], + } + ) c.create_table("train_set", df) model_class = "'sklearn.linear_model.LogisticRegression'" - c.sql(f""" - CREATE OR REPLACE MODEL model WITH ( - model_class = {model_class}, - wrap_predict = True, - wrap_fit = False, - target_column = 'rained' - ) AS ( - SELECT * - FROM train_set + c.sql( + f""" + CREATE OR REPLACE MODEL model WITH ( + model_class = {model_class}, + wrap_predict = True, + wrap_fit = False, + target_column = 'rained' + ) AS ( + SELECT * + FROM train_set + ) + """ ) - """) - expected = c.sql(""" - SELECT * FROM PREDICT( - MODEL model, - SELECT * FROM train_set + expected = c.sql( + """ + SELECT * FROM PREDICT( + MODEL model, + SELECT * FROM train_set + ) + """ ) - """) - df = pd.DataFrame({ - "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype='Int32'), - "prev_day_inches_rained": pd.Series([0, 1, 2, 3], dtype='float32'), - "rained": pd.Series([False, False, False, True]) - }) + df = pd.DataFrame( + { + "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype='Int32'), + "prev_day_inches_rained": pd.Series([0, 1, 2, 3], dtype='float32'), + "rained": pd.Series([False, False, False, True]), + } + ) c.create_table("train_set", df) - c.sql(f""" - CREATE OR REPLACE MODEL model WITH ( - model_class = {model_class}, - wrap_predict = True, - wrap_fit = False, - target_column = 'rained' - ) AS ( - SELECT * - FROM train_set + c.sql( + f""" + CREATE OR REPLACE MODEL model WITH ( + model_class = {model_class}, + wrap_predict = True, + wrap_fit = False, + target_column = 'rained' + ) AS ( + SELECT * + FROM train_set + ) + """ ) - """) - result = c.sql(""" - SELECT * FROM PREDICT( - MODEL model, - SELECT * FROM train_set + result = c.sql( + """ + SELECT * FROM PREDICT( + MODEL model, + SELECT * FROM train_set + ) + """ ) - """) assert_eq( expected, result, check_dtype=False, - check_names=False, ) From 58f95f1ea003ac606c1c645ced830408001b7840 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Fri, 23 Sep 2022 12:47:34 -0700 Subject: [PATCH 3/8] dask_ml_flag and gpu test --- dask_sql/physical/rel/custom/predict.py | 24 +++++++++++++++--------- tests/integration/test_model.py | 16 ++++++++++------ 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py index 818eb8af2..a61436897 100644 --- a/dask_sql/physical/rel/custom/predict.py +++ b/dask_sql/physical/rel/custom/predict.py @@ -3,11 +3,11 @@ from typing import TYPE_CHECKING import numpy as np - try: from dask_ml.wrappers import ParallelPostFit + dask_ml_flag = True except ImportError: # pragma: no cover - raise ValueError("Wrapping requires dask-ml to be installed.") + dask_ml_flag = False from dask_sql.datacontainer import ColumnContainer, DataContainer from dask_sql.physical.rel.base import BaseRelPlugin @@ -68,13 +68,19 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai df = context.sql(sql_select) part = df[training_columns] - if isinstance(model, ParallelPostFit): - output_meta = model.predict_meta - if output_meta is None: - output_meta = model.estimator.predict(part._meta_nonempty) - prediction = part.map_partitions( - self._predict, output_meta, model.estimator, meta=output_meta - ) + if dask_ml_flag: + if isinstance(model, ParallelPostFit): + output_meta = model.predict_meta + if output_meta is None: + output_meta = model.estimator.predict(part._meta_nonempty) + prediction = part.map_partitions( + self._predict, + output_meta, + model.estimator, + meta=output_meta + ) + else: + prediction = model.predict(part) else: prediction = model.predict(part) diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py index 8e18d7fd7..b988be1e8 100644 --- a/tests/integration/test_model.py +++ b/tests/integration/test_model.py @@ -926,7 +926,8 @@ def test_experiment_automl_regressor(c, client, training_df): check_trained_model(c, "my_automl_exp2") -def test_predict_with_nullable_types(c): +@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) +def test_predict_with_nullable_types(c, gpu): df = pd.DataFrame( { "rough_day_of_year": [0, 1, 2, 3], @@ -934,9 +935,12 @@ def test_predict_with_nullable_types(c): "rained": [False, False, False, True], } ) - c.create_table("train_set", df) + c.create_table("train_set", df, gpu=gpu) - model_class = "'sklearn.linear_model.LogisticRegression'" + if gpu: + model_class = "'cuml.linear_model.LogisticRegression'" + else: + model_class = "'sklearn.linear_model.LogisticRegression'" c.sql( f""" @@ -963,12 +967,12 @@ def test_predict_with_nullable_types(c): df = pd.DataFrame( { - "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype='Int32'), - "prev_day_inches_rained": pd.Series([0, 1, 2, 3], dtype='float32'), + "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype="Int32"), + "prev_day_inches_rained": pd.Series([0, 1, 2, 3], dtype="float32"), "rained": pd.Series([False, False, False, True]), } ) - c.create_table("train_set", df) + c.create_table("train_set", df, gpu=gpu) c.sql( f""" From 74fd5c0edc99ca4348a3837e886dca7903fc26e3 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Fri, 23 Sep 2022 13:31:21 -0700 Subject: [PATCH 4/8] sklearn vs cuml --- dask_sql/physical/rel/custom/create_model.py | 5 ++++- dask_sql/physical/rel/custom/predict.py | 7 +++---- tests/integration/test_model.py | 2 ++ 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py index d75c56ba9..c930a3fe6 100644 --- a/dask_sql/physical/rel/custom/create_model.py +++ b/dask_sql/physical/rel/custom/create_model.py @@ -184,7 +184,10 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)] model = delayed_model[0].compute() - output_meta = np.array([]) + if "sklearn" in model_class: + output_meta = np.array([]) + else: + output_meta = None model = ParallelPostFit( estimator=model, predict_meta=output_meta, diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py index a61436897..8687bc83b 100644 --- a/dask_sql/physical/rel/custom/predict.py +++ b/dask_sql/physical/rel/custom/predict.py @@ -3,8 +3,10 @@ from typing import TYPE_CHECKING import numpy as np + try: from dask_ml.wrappers import ParallelPostFit + dask_ml_flag = True except ImportError: # pragma: no cover dask_ml_flag = False @@ -74,10 +76,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai if output_meta is None: output_meta = model.estimator.predict(part._meta_nonempty) prediction = part.map_partitions( - self._predict, - output_meta, - model.estimator, - meta=output_meta + self._predict, output_meta, model.estimator, meta=output_meta ) else: prediction = model.predict(part) diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py index b988be1e8..5f733260d 100644 --- a/tests/integration/test_model.py +++ b/tests/integration/test_model.py @@ -926,6 +926,8 @@ def test_experiment_automl_regressor(c, client, training_df): check_trained_model(c, "my_automl_exp2") +# TODO - many ML tests fail on clusters without sklearn - can we avoid this? +@skip_if_external_scheduler @pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) def test_predict_with_nullable_types(c, gpu): df = pd.DataFrame( From 832c608bea22f3c3dfcd588eb9ea0fab372bbc88 Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Tue, 27 Sep 2022 09:18:30 -0700 Subject: [PATCH 5/8] Update create_model.py --- dask_sql/physical/rel/custom/create_model.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py index c930a3fe6..1a8f89846 100644 --- a/dask_sql/physical/rel/custom/create_model.py +++ b/dask_sql/physical/rel/custom/create_model.py @@ -186,14 +186,14 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai model = delayed_model[0].compute() if "sklearn" in model_class: output_meta = np.array([]) + model = ParallelPostFit( + estimator=model, + predict_meta=output_meta, + predict_proba_meta=output_meta, + transform_meta=output_meta, + ) else: - output_meta = None - model = ParallelPostFit( - estimator=model, - predict_meta=output_meta, - predict_proba_meta=output_meta, - transform_meta=output_meta, - ) + model = ParallelPostFit(estimator=model) else: model.fit(X, y, **fit_kwargs) From 37d15a3f4803788c31021ce63f8b052fe6bae128 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Tue, 27 Sep 2022 09:38:00 -0700 Subject: [PATCH 6/8] work around cpu/gpu logic --- dask_sql/physical/rel/custom/predict.py | 12 +++++++++--- tests/integration/test_model.py | 4 ++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py index 8687bc83b..dfdf29a25 100644 --- a/dask_sql/physical/rel/custom/predict.py +++ b/dask_sql/physical/rel/custom/predict.py @@ -75,9 +75,15 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai output_meta = model.predict_meta if output_meta is None: output_meta = model.estimator.predict(part._meta_nonempty) - prediction = part.map_partitions( - self._predict, output_meta, model.estimator, meta=output_meta - ) + try: + prediction = part.map_partitions( + self._predict, + output_meta, + model.estimator, + meta=output_meta, + ) + except ValueError: + prediction = model.predict(part) else: prediction = model.predict(part) else: diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py index 5f733260d..cb6bd377f 100644 --- a/tests/integration/test_model.py +++ b/tests/integration/test_model.py @@ -937,7 +937,7 @@ def test_predict_with_nullable_types(c, gpu): "rained": [False, False, False, True], } ) - c.create_table("train_set", df, gpu=gpu) + c.create_table("train_set", df) if gpu: model_class = "'cuml.linear_model.LogisticRegression'" @@ -974,7 +974,7 @@ def test_predict_with_nullable_types(c, gpu): "rained": pd.Series([False, False, False, True]), } ) - c.create_table("train_set", df, gpu=gpu) + c.create_table("train_set", df) c.sql( f""" From 5110e9094f4d0fdb16e8a336d32eb5c9219aa96a Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Tue, 27 Sep 2022 10:43:32 -0700 Subject: [PATCH 7/8] Update test_model.py --- tests/integration/test_model.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py index cb6bd377f..eaaa50044 100644 --- a/tests/integration/test_model.py +++ b/tests/integration/test_model.py @@ -932,8 +932,8 @@ def test_experiment_automl_regressor(c, client, training_df): def test_predict_with_nullable_types(c, gpu): df = pd.DataFrame( { - "rough_day_of_year": [0, 1, 2, 3], - "prev_day_inches_rained": [0, 1, 2, 3], + "rough_day_of_year": [0.0, 1.0, 2.0, 3.0], + "prev_day_inches_rained": [0.0, 1.0, 2.0, 3.0], "rained": [False, False, False, True], } ) @@ -969,8 +969,8 @@ def test_predict_with_nullable_types(c, gpu): df = pd.DataFrame( { - "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype="Int32"), - "prev_day_inches_rained": pd.Series([0, 1, 2, 3], dtype="float32"), + "rough_day_of_year": pd.Series([0.0, 1.0, 2.0, 3.0], dtype="Float32"), + "prev_day_inches_rained": pd.Series([0.0, 1.0, 2.0, 3.0], dtype="Float32"), "rained": pd.Series([False, False, False, True]), } ) From 929538e7adef6f9d140a71453c95c4fbcd2f3455 Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Tue, 27 Sep 2022 11:05:47 -0700 Subject: [PATCH 8/8] Update test_model.py --- tests/integration/test_model.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py index eaaa50044..8c97b770c 100644 --- a/tests/integration/test_model.py +++ b/tests/integration/test_model.py @@ -928,21 +928,17 @@ def test_experiment_automl_regressor(c, client, training_df): # TODO - many ML tests fail on clusters without sklearn - can we avoid this? @skip_if_external_scheduler -@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) -def test_predict_with_nullable_types(c, gpu): +def test_predict_with_nullable_types(c): df = pd.DataFrame( { - "rough_day_of_year": [0.0, 1.0, 2.0, 3.0], + "rough_day_of_year": [0, 1, 2, 3], "prev_day_inches_rained": [0.0, 1.0, 2.0, 3.0], "rained": [False, False, False, True], } ) c.create_table("train_set", df) - if gpu: - model_class = "'cuml.linear_model.LogisticRegression'" - else: - model_class = "'sklearn.linear_model.LogisticRegression'" + model_class = "'sklearn.linear_model.LogisticRegression'" c.sql( f""" @@ -969,7 +965,7 @@ def test_predict_with_nullable_types(c, gpu): df = pd.DataFrame( { - "rough_day_of_year": pd.Series([0.0, 1.0, 2.0, 3.0], dtype="Float32"), + "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype="Int32"), "prev_day_inches_rained": pd.Series([0.0, 1.0, 2.0, 3.0], dtype="Float32"), "rained": pd.Series([False, False, False, True]), }