Re: PyFlink Vectorized UDF throws NullPointerException
Sure, I have replied. Let's discuss it in that thread.

> On Apr 16, 2021, at 11:40 AM, Yik San Chan wrote:
Re: PyFlink Vectorized UDF throws NullPointerException
Hi Dian,

Thank you so much for tracking the issue!

I ran into another NullPointerException when running the pandas UDF, but this time I added a unit test to ensure the input and output types already ... And the new issue looks even more odd ... Do you mind taking a look?
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PyFlink-called-already-closed-and-NullPointerException-td42997.html

Thank you!

Best,
Yik San

> On Fri, Apr 16, 2021 at 11:05 AM, Dian Fu wrote:
Re: PyFlink Vectorized UDF throws NullPointerException
Definitely agree with you. I have created https://issues.apache.org/jira/browse/FLINK-22297 as a follow-up.

> On Apr 16, 2021, at 7:10 AM, Yik San Chan wrote:
Re: PyFlink Vectorized UDF throws NullPointerException
Hi Dian,

I wonder if we can improve the error tracing and message so that it becomes more obvious where the problem is? To me, an NPE really says very little.

Best,
Yik San

> On Thu, Apr 15, 2021 at 11:07 AM, Dian Fu wrote:
Re: PyFlink Vectorized UDF throws NullPointerException
Great! Thanks for letting me know~

> On Apr 15, 2021, at 11:01 AM, Yik San Chan wrote:
Re: PyFlink Vectorized UDF throws NullPointerException
Hi Dian,

Thanks for the reminder. Yes, the original UDF implementation does not satisfy the input and output type requirement. After adding a unit test, I was able to find what was wrong and fix my UDF implementation. Here is the new implementation, FYI:

```python
@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def predict(users, items):
    n_users, n_items = 943, 1682
    model = MatrixFactorization(n_users, n_items)
    model.load_state_dict(torch.load("model.pth"))
    return pd.Series(model(users, items).detach().numpy())
```

And here is the unit test:

```python
def test_predict():
    f = predict._func
    users = pd.Series([1, 2, 3])
    items = pd.Series([1, 4, 9])
    preds = f(users, items)
    assert isinstance(preds, pd.Series)
    assert len(preds) == 3
```

Thank you so much!

Best,
Yik San

> On Wed, Apr 14, 2021 at 11:03 PM, Dian Fu wrote:
Re: PyFlink Vectorized UDF throws NullPointerException
Hi Yik San,

1) There are two kinds of Python UDFs in PyFlink:
- General Python UDFs, which process input elements on a row basis. That is, they process one row at a time.
- Pandas UDFs, which process input elements on a batch basis.
So you are correct that you need to use a Pandas UDF for your requirements.

2) For a Pandas UDF, the input type of each input argument is pandas.Series, and the result type should also be pandas.Series. Besides, the length of the result should be the same as that of the inputs. Could you check whether this is the case for your Pandas UDF implementation?

Regards,
Dian

> On Wed, Apr 14, 2021 at 9:44 PM, Yik San Chan wrote:
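The series-in/series-out contract in point (2) can be sanity-checked with plain pandas, outside Flink. This is an illustrative sketch of mine, not from the thread: `predict_rowwise` and `predict_vectorized` are hypothetical names, and the multiplication stands in for a real model.

```python
import pandas as pd

def predict_rowwise(user, item):
    # General Python UDF style: one scalar per row in, one scalar out.
    return float(user * item)

def predict_vectorized(users, items):
    # Pandas UDF style: pandas.Series in, pandas.Series out,
    # with the output length equal to the input batch length.
    return (users * items).astype(float)

users = pd.Series([1, 2, 3])
items = pd.Series([1, 4, 9])

preds = predict_vectorized(users, items)
assert isinstance(preds, pd.Series)
assert len(preds) == len(users)  # the length requirement from point (2)
```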
PyFlink Vectorized UDF throws NullPointerException
The question is cross-posted on Stack Overflow: https://stackoverflow.com/questions/67092978/pyflink-vectorized-udf-throws-nullpointerexception.

I have an ML model that takes two numpy.ndarray - `users` and `items` - and returns a numpy.ndarray `predictions`. In normal Python code, I would do:

```python
model = load_model()

df = load_data()  # the DataFrame includes 4 columns: user_id, movie_id, rating, and timestamp

users = df.user_id.values
items = df.movie_id.values

predictions = model(users, items)
```

I am looking into porting this code to Flink to leverage its distributed nature. My assumption is: by distributing the prediction workload across multiple Flink nodes, I should be able to run the whole prediction faster.

So I compose a PyFlink job. Note that I implement a UDF called `predict` to run the prediction.

```python
# batch_prediction.py

model = load_model()

settings = EnvironmentSettings.new_instance().use_blink_planner().build()
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)

SOURCE_DDL = """
CREATE TABLE source (
    user_id INT,
    movie_id INT,
    rating TINYINT,
    event_ms BIGINT
) WITH (
    'connector' = 'filesystem',
    'format' = 'csv',
    'csv.field-delimiter' = '\t',
    'path' = 'ml-100k/u1.test'
)
"""

SINK_DDL = """
CREATE TABLE sink (
    prediction DOUBLE
) WITH (
    'connector' = 'print'
)
"""

t_env.execute_sql(SOURCE_DDL)
t_env.execute_sql(SINK_DDL)
t_env.execute_sql(
    "INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
).wait()
```

Here is the UDF.

```python
# batch_prediction.py (cont)

@udf(result_type=DataTypes.DOUBLE())
def predict(user, item):
    return model([user], [item]).item()

t_env.create_temporary_function("predict", predict)
```

The job runs fine. However, the prediction actually runs on each and every row of the `source` table, which is not performant. Instead, I want to split the 80,000 (user_id, movie_id) pairs into, let's say, 100 batches, with each batch having 800 rows. The job would then trigger the `model(users, items)` function 100 times (= the number of batches), where both `users` and `items` have 800 elements.

I couldn't find a way to do this. By looking at the [docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html), vectorized user-defined functions may work.

```python
# batch_prediction.py (snippet)

# I add the func_type="pandas"
@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def predict(user, item):
    ...
```

Unfortunately, it doesn't.

```
> python batch_prediction.py
...
Traceback (most recent call last):
  File "batch_prediction.py", line 55, in <module>
    "INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
  File "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/pyflink/table/table_result.py", line 76, in wait
    get_method(self._j_table_result, "await")()
  File "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o51.await.
: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:119)
    at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python
```
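An aside not covered in the thread: with a pandas UDF, PyFlink itself splits the input into Arrow batches, and the batch size is configurable rather than fixed per job. A sketch, assuming Flink 1.12's `python.fn-execution.arrow.batch.size` option and the `t_env` created in the job above:

```python
# Cap each pandas-UDF batch at 800 rows.
t_env.get_config().get_configuration().set_string(
    "python.fn-execution.arrow.batch.size", "800"
)
```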