Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-16 Thread Dian Fu
Sure. I have replied. Let’s discuss it in that thread.


Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-15 Thread Yik San Chan
Hi Dian,

Thank you so much for tracking the issue!

I ran into another NullPointerException when running the pandas UDF, but this
time I added a unit test to ensure the input and output types are correct ... And
the new issue looks even more odd ... Do you mind taking a look?
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PyFlink-called-already-closed-and-NullPointerException-td42997.html

Thank you!

Best,
Yik San


Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-15 Thread Dian Fu
Definitely agree with you. I have created
https://issues.apache.org/jira/browse/FLINK-22297 as a follow-up.


Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-15 Thread Yik San Chan
Hi Dian,

I wonder if we can improve the error tracing and message so that it becomes
more obvious where the problem is? To me, an NPE really says very little.

Best,
Yik San


Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-14 Thread Dian Fu
Great! Thanks for letting me know~


Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-14 Thread Yik San Chan
Hi Dian,

Thanks for the reminder. Yes, the original UDF implementation does not
satisfy the input and output type requirements. After adding a unit test, I
was able to find what's wrong and fix my UDF implementation. Here is the
new implementation, FYI.

@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def predict(users, items):
    n_users, n_items = 943, 1682
    model = MatrixFactorization(n_users, n_items)
    model.load_state_dict(torch.load("model.pth"))
    return pd.Series(model(users, items).detach().numpy())

And here is the unit test.

def test_predict():
    f = predict._func
    users = pd.Series([1, 2, 3])
    items = pd.Series([1, 4, 9])
    preds = f(users, items)
    assert isinstance(preds, pd.Series)
    assert len(preds) == 3

Thank you so much!

Best,
Yik San


Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-14 Thread Dian Fu
Hi Yik San,

1) There are two kinds of Python UDFs in PyFlink:
- General Python UDFs, which process input elements on a row basis, that is,
one row at a time.
- Pandas UDFs, which process input elements on a batch basis.
So you are correct that you need to use a Pandas UDF for your requirements.

2) For a Pandas UDF, each input argument arrives as a pandas.Series, and the
result should also be a pandas.Series. Besides, the length of the result
should be the same as that of the inputs. Could you check whether this is the
case for your Pandas UDF implementation?

Regards,
Dian

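[Editor's note] The contract described above can be sketched without a Flink
cluster at all: a pandas UDF body is just a function from pandas.Series
inputs to a pandas.Series result of the same length. The names below
(`predict_batch` and its toy computation) are purely illustrative; in PyFlink
the body would be wrapped with `@udf(result_type=..., func_type="pandas")`.

```python
import pandas as pd

# Sketch of the pandas UDF contract only (no Flink needed): each input
# argument arrives as a pandas.Series holding one batch of rows, and the
# function must return a pandas.Series of the same length.
def predict_batch(users: pd.Series, items: pd.Series) -> pd.Series:
    # one vectorized computation over the whole batch
    return (users * items).astype("float64")

users = pd.Series([1, 2, 3])
items = pd.Series([4, 5, 6])
preds = predict_batch(users, items)

assert isinstance(preds, pd.Series)
assert len(preds) == len(users)  # result length must match the inputs
```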


PyFlink Vectorized UDF throws NullPointerException

2021-04-14 Thread Yik San Chan
The question is cross-posted on Stack Overflow:
https://stackoverflow.com/questions/67092978/pyflink-vectorized-udf-throws-nullpointerexception

I have an ML model that takes two numpy.ndarrays, `users` and `items`, and
returns a numpy.ndarray `predictions`. In normal Python code, I would do:

```python
model = load_model()

# the DataFrame includes 4 columns: user_id, movie_id, rating, and timestamp
df = load_data()

users = df.user_id.values
items = df.movie_id.values

predictions = model(users, items)
```

I am looking into porting this code to Flink to leverage its distributed
nature. My assumption is: by distributing the prediction workload across
multiple Flink nodes, I should be able to run the whole prediction faster.

So I compose a PyFlink job. Note that I implement a UDF called `predict` to
run the prediction.

```python
# batch_prediction.py

model = load_model()

settings = EnvironmentSettings.new_instance().use_blink_planner().build()
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)

SOURCE_DDL = """
CREATE TABLE source (
    user_id INT,
    movie_id INT,
    rating TINYINT,
    event_ms BIGINT
) WITH (
    'connector' = 'filesystem',
    'format' = 'csv',
    'csv.field-delimiter' = '\t',
    'path' = 'ml-100k/u1.test'
)
"""

SINK_DDL = """
CREATE TABLE sink (
    prediction DOUBLE
) WITH (
    'connector' = 'print'
)
"""

t_env.execute_sql(SOURCE_DDL)
t_env.execute_sql(SINK_DDL)
t_env.execute_sql(
    "INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
).wait()
```

Here is the UDF.

```python
# batch_prediction.py (cont)

@udf(result_type=DataTypes.DOUBLE())
def predict(user, item):
    return model([user], [item]).item()

t_env.create_temporary_function("predict", predict)
```

The job runs fine. However, the prediction actually runs on each and every
row of the `source` table, which is not performant. Instead, I want to
split the 80,000 (user_id, movie_id) pairs into, let's say, 100 batches,
with each batch having 800 rows. The job would then trigger the
`model(users, items)` function 100 times (= the number of batches), where
both `users` and `items` have 800 elements.
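[Editor's note] The speedup being assumed here is the usual vectorization
win, which can be sketched with plain numpy. The `model` below is a
hypothetical stand-in for the real one, not the model from the post:

```python
import numpy as np

# Hypothetical stand-in for the real model: any function that is
# vectorized over whole arrays of user and item ids.
def model(users, items):
    return np.asarray(users) * 0.1 + np.asarray(items) * 0.01

user_ids = np.arange(800)
movie_ids = np.arange(800)

# Row-at-a-time, what a general Python UDF does: 800 separate calls.
row_preds = [model([u], [i]).item() for u, i in zip(user_ids, movie_ids)]

# Batch-at-a-time, what a pandas UDF enables: one vectorized call.
batch_preds = model(user_ids, movie_ids)

# Same predictions either way; only the number of calls differs.
assert np.allclose(row_preds, batch_preds)
```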

I couldn't find a way to do this. Looking at the [docs](
https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html),
vectorized user-defined functions may work.

```python
# batch_prediction.py (snippet)

# I add the func_type="pandas"
@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def predict(user, item):
    ...
```

Unfortunately, it doesn't.

```
> python batch_prediction.py
...
Traceback (most recent call last):
  File "batch_prediction.py", line 55, in <module>
    "INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
  File "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/pyflink/table/table_result.py", line 76, in wait
    get_method(self._j_table_result, "await")()
  File "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o51.await.
: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:119)
    at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python