Hi Tom,
The following statement is incorrect, because it references the class MyUDF rather than a function object created with udf():
```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;
```
You should define it as follows:
custom_udf_2.py
```
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes


class MyUDF(ScalarFunction):

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, x, y):
        self.counter.inc()
        return x + y


my_udf = udf(MyUDF(), result_type=DataTypes.BIGINT())
```
Then use it in SQL as follows:
```
CREATE FUNCTION add AS 'custom_udf_2.my_udf'
LANGUAGE PYTHON;
```
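If you want to sanity-check the open/eval logic locally before running it through the SQL client, something like the following may help. It is only a sketch and does not import pyflink: FakeCounter, FakeMetricGroup, FakeFunctionContext and run_local_check are hypothetical stand-ins that mimic just the calls used above (get_metric_group(), counter(name), inc()), and MyUDFLogic mirrors MyUDF without the ScalarFunction base class.

```python
# Hypothetical stubs, not PyFlink classes. They mimic only the calls
# that custom_udf_2.py makes on the function context.
class FakeCounter:
    def __init__(self):
        self.count = 0

    def inc(self, n=1):
        self.count += n


class FakeMetricGroup:
    def __init__(self):
        self.counters = {}

    def counter(self, name):
        # Return the same counter object when a name is requested again.
        return self.counters.setdefault(name, FakeCounter())


class FakeFunctionContext:
    def __init__(self):
        self.metric_group = FakeMetricGroup()

    def get_metric_group(self):
        return self.metric_group


class MyUDFLogic:
    """Same body as MyUDF, minus the ScalarFunction base class."""

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, x, y):
        self.counter.inc()
        return x + y


def run_local_check():
    context = FakeFunctionContext()
    func = MyUDFLogic()
    func.open(context)  # open() has to run before the first eval()
    # Same value ranges as the datagen table: a = 1..8, b = 4..11.
    results = [func.eval(a, b) for a, b in zip(range(1, 9), range(4, 12))]
    return results, context.metric_group.counters["my_counter"].count
```

Calling run_local_check() returns the list of sums together with the counter value. This only exercises the Python logic; it does not confirm that Flink registers or reports the metric.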
Regards,
Dian
On Fri, May 19, 2023 at 6:23 AM tom yang <[email protected]> wrote:
> Hi
>
>
> I am trying to create a Flink SQL program that uses a Python UDF with
> metrics. This is my sample Python file:
>
> custom_udf_2.py
>
> ```
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes
>
> class MyUDF(ScalarFunction):
>
>     def __init__(self):
>         self.counter = None
>
>     def open(self, function_context):
>         self.counter = function_context.get_metric_group().counter("my_counter")
>
>     def eval(self, x, y):
>         self.counter.inc()
>         return x + y
>
> ```
>
> This is my SQL script:
>
> ```
> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
> LANGUAGE PYTHON;
>
> CREATE TABLE datagen (
> a BIGINT,
> b BIGINT
> ) WITH (
> 'connector' = 'datagen',
> 'fields.a.kind'='sequence',
> 'fields.a.start'='1',
> 'fields.a.end'='8',
> 'fields.b.kind'='sequence',
> 'fields.b.start'='4',
> 'fields.b.end'='11'
> );
>
> CREATE TABLE print_sink (
> `sum` BIGINT
> ) WITH (
> 'connector' = 'print'
> );
>
>
> INSERT into print_sink (
> select add(a,b) FROM datagen
> );
>
> ```
>
> When I try to execute this program, I get the following:
>
>
> ```
> /bin/sql-client.sh -f ~/python_udf_lab.sql
> --pyFiles ~/custom_udf_2.py
>
> Flink SQL> [INFO] Execute statement succeed.
>
> Flink SQL>
> > CREATE TABLE datagen (
> > a BIGINT,
> > b BIGINT
> > ) WITH (
> > 'connector' = 'datagen',
> > 'fields.a.kind'='sequence',
> > 'fields.a.start'='1',
> > 'fields.a.end'='8',
> > 'fields.b.kind'='sequence',
> > 'fields.b.start'='4',
> > 'fields.b.end'='11'
> > )[INFO] Execute statement succeed.
>
> Flink SQL>
> > CREATE TABLE print_sink (
> > `sum` BIGINT
> > ) WITH (
> > 'connector' = 'print'
> > )[INFO] Execute statement succeed.
>
> Flink SQL>
> >
> > INSERT into print_sink (
> > select add(a,b) FROM datagen
> > )[ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Instantiating python function
> 'custom_udf_2.MyUDF' failed.
> ```
>
>
> I've tried multiple variations, such as:
> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
> LANGUAGE PYTHON;
>
> CREATE FUNCTION add AS 'MyUDF'
> LANGUAGE PYTHON;
>
>
> FYI, this is on Flink 1.16.1 and Python 3.9.13.
>
> Admittedly, I haven't found anything in the official documentation
> about this usage. Is this use case currently supported?
> I know that it works with SQL if I change the add function to:
>
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
>      result_type=DataTypes.BIGINT())
> def add(i, j):
>     return i + j
>
> but then it doesn't create any metrics.
>
> Does anyone have any idea how I can get this to work, specifically
> Flink SQL with Python UDF metrics?
>
> Thanks,
> Tom
>