Hi Manas,
If you want to return a RowType in Python UDF, you can use Row Class which
extends from python tuple.
You can use the following statement to import Row : from pyflink.table
import Row

Best,
Xingbo

Manas Kale <[email protected]> 于2020年7月6日周一 下午8:08写道:

> I also tried doing this by using a User Defined Function.
>
> class DataConverter(ScalarFunction):
>     def eval(self, str_data):
>             data = json.loads(str_data)
>             return ??     # I want to return data['0001'] in field 
> 'feature1', data['0002'] in field 'feature2' etc.
>
> t_env.register_function("data_converter", udf(DataConverter(), input_types = 
> [DataTypes.STRING()],
>                                               result_type =
>                                               DataTypes.ROW([
>                                                   DataTypes.FIELD("feature1", 
> DataTypes.STRING())
>                                               ])))
>
>
> t_env.from_path(INPUT_TABLE) \
>     .select("data_converter(data)") \ # <--- here "data" is the field "data" 
> from the previous mail
>     .insert_into(OUTPUT_TABLE)
>
>
> I used a ROW to hold multiple values but I can't figure out how I can
> return a populated ROW object from the eval() method. Where is the method
> to construct a row/field object and return it?
>
>
> Thanks!
>
>
> On Fri, Jul 3, 2020 at 12:40 PM Manas Kale <[email protected]> wrote:
>
>> Hi Xingbo,
>> Thanks for the reply, I didn't know that a table schema also needs to be
>> declared after the connect or but I understand now.
>> I have another question: how do I write the parsing schemas for a field
>> that itself is a valid JSON string? For example:
>> {
>>     "monitorId": 865,
>>     "deviceId": "94:54:93:49:96:13",
>>     "data":
>> "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
>>     "state": 2,
>>     "time": 1593687809180
>> }
>> The field "data" is a string of valid JSON with string:number objects.
>> I'm currently trying using JSON schema object and DataTypes.ROW, but am
>> getting deserialization errors.
>>
>> .with_format(
>>     Json()
>>         .json_schema(
>>         """
>>         {
>>     "type": "object",
>>     "properties": {
>>         "monitorId": {
>>             "type": "string"
>>         },
>>         "deviceId": {
>>             "type": "string"
>>         },
>>         "data": {
>>             "type": "object"
>>         },
>>         "state": {
>>             "type": "integer"
>>         },
>>         "time": {
>>             "type": "string"
>>         }
>>     }
>> }
>>     """
>>     )
>> ) \
>>     .with_schema(
>>     Schema()
>>         .field("monitorId", DataTypes.STRING())
>>         .field("deviceId", DataTypes.STRING())
>>         .field("data", DataTypes.ROW())
>> )
>>
>> Regards,
>>
>> Manas
>>
>>
>> On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang <[email protected]> wrote:
>>
>>> Hi, Manas
>>> You need to define the schema. You can refer to the following example:
>>>  t_env.connect(
>>>     Kafka()
>>>         .version('0.11')
>>>         .topic(INPUT_TOPIC)
>>>         .property("bootstrap.servers", PROD_KAFKA)
>>>         .property("zookeeper.connect", "localhost:2181")
>>>         .start_from_latest()
>>> ) \
>>>     .with_format(
>>>     Json()
>>>         .json_schema(
>>>         "{"
>>>         "  type: 'object',"
>>>         "  properties: {"
>>>         "    lon: {"
>>>         "      type: 'number'"
>>>         "    },"
>>>         "    rideTime: {"
>>>         "      type: 'string',"
>>>         "      format: 'date-time'"
>>>         "    }"
>>>         "  }"
>>>         "}"
>>>     )
>>> ) \
>>>     .with_schema(  # declare the schema of the table
>>>     Schema()
>>>         .field("lon", DataTypes.DECIMAL(20, 10))
>>>         .field("rideTime", DataTypes.TIMESTAMP(6))
>>> ).register_table_source(INPUT_TABLE)
>>>
>>> Best,
>>> Xingbo
>>>
>>> Manas Kale <[email protected]> 于2020年7月2日周四 下午7:59写道:
>>>
>>>> Hi,
>>>> I'm trying to get a simple consumer/producer running using the
>>>> following code referred from the provided links :
>>>>
>>>> from pyflink.dataset import ExecutionEnvironment
>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, 
>>>> StreamTableEnvironment
>>>> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>>>>
>>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>>
>>>> t_config = TableConfig()
>>>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>>
>>>> INPUT_TOPIC = 'xyz'
>>>> INPUT_TABLE = 'raw_message'
>>>> PROD_ZOOKEEPER = '...'
>>>> PROD_KAFKA = '...'
>>>>
>>>> OUTPUT_TOPIC = 'summary_output'
>>>> OUTPUT_TABLE = 'feature_summary'
>>>> LOCAL_ZOOKEEPER = 'localhost:2181'
>>>> LOCAL_KAFKA = 'localhost:9092'
>>>>
>>>>
>>>> t_env.connect(
>>>>     Kafka()
>>>>     .version('universal')
>>>>     .topic(INPUT_TOPIC)
>>>>     .property("bootstrap.servers", PROD_KAFKA)
>>>>
>>>>     .start_from_latest()
>>>> ) \
>>>> .with_format(
>>>>     Json()
>>>>     .json_schema(
>>>>         "{"
>>>>         "  type: 'object',"
>>>>         "  properties: {"
>>>>         "    lon: {"
>>>>         "      type: 'number'"
>>>>         "    },"
>>>>         "    rideTime: {"
>>>>         "      type: 'string',"
>>>>         "      format: 'date-time'"
>>>>         "    }"
>>>>         "  }"
>>>>         "}"
>>>>     )
>>>> ).register_table_source(INPUT_TABLE)
>>>>
>>>> t_env.connect(Kafka()
>>>>     .version('universal')
>>>>     .topic(OUTPUT_TOPIC)
>>>>     .property("bootstrap.servers", LOCAL_KAFKA)
>>>>
>>>>     .start_from_latest()
>>>>     ) \
>>>>     .with_format(
>>>>     Json()
>>>>     .json_schema(
>>>>        "{"
>>>>         "  type: 'object',"
>>>>         "  properties: {"
>>>>         "    lon: {"
>>>>         "      type: 'number'"
>>>>         "    },"
>>>>         "    rideTime: {"
>>>>         "      type: 'string',"
>>>>         "      format: 'date-time'"
>>>>         "    }"
>>>>         "  }"
>>>>         "}"
>>>>     )).register_table_sink(OUTPUT_TABLE)
>>>>
>>>> t_env.from_path(INPUT_TABLE) \
>>>>     .insert_into(OUTPUT_TABLE)
>>>>
>>>> t_env.execute('IU pyflink job')
>>>>
>>>> *However, I am getting the following exception : *
>>>>
>>>> Traceback (most recent call last):
>>>>   File 
>>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>>>>  line 147, in deco
>>>>     return f(*a, **kw)
>>>>   File 
>>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py",
>>>>  line 328, in get_return_value
>>>>     format(target_id, ".", name), value)
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling 
>>>> o32.registerTableSource.
>>>> : org.apache.flink.table.api.TableException: findAndCreateTableSource 
>>>> failed.
>>>>    at 
>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>>    at 
>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
>>>>    at 
>>>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
>>>>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>    at 
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>    at 
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>    at java.lang.reflect.Method.invoke(Method.java:498)
>>>>    at 
>>>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>>    at 
>>>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>>    at 
>>>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>>    at 
>>>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>>    at 
>>>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>    at 
>>>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>>    at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.table.api.ValidationException: Could not find 
>>>> the required schema in property 'schema'.
>>>>    at 
>>>> org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
>>>>    at 
>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
>>>>    at 
>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
>>>>    at 
>>>> org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>>>>    at 
>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
>>>>    ... 13 more
>>>>
>>>>
>>>> During handling of the above exception, another exception occurred:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in 
>>>> <module>
>>>>     ).register_table_source(INPUT_TABLE)
>>>>   File 
>>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py",
>>>>  line 1295, in register_table_source
>>>>     self._j_connect_table_descriptor.registerTableSource(name)
>>>>   File 
>>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py",
>>>>  line 1286, in __call__
>>>>     answer, self.gateway_client, self.target_id, self.name)
>>>>   File 
>>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>>>>  line 154, in deco
>>>>     raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>>>> pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'
>>>>
>>>>
>>>> The relevant part seems to be *Caused by: 
>>>> org.apache.flink.table.api.ValidationException: Could not find the 
>>>> required schema in property 'schema'.*
>>>>
>>>> This is probably a basic error, but I can't figure out how I can know 
>>>> what's wrong with the schema. Is the schema not properly declared? Is some 
>>>> field missing?
>>>>
>>>> FWIW I have included the JSON and kafka connector JARs in the required 
>>>> location.
>>>>
>>>>
>>>> Regards,
>>>> Manas
>>>>
>>>>
>>>> On Tue, Jun 30, 2020 at 11:58 AM Manas Kale <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Xingbo,
>>>>> Thank you for the information, it certainly helps!
>>>>>
>>>>> Regards,
>>>>> Manas
>>>>>
>>>>> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Manas,
>>>>>>
>>>>>> Since Flink 1.9, the entire architecture of PyFlink has been
>>>>>> redesigned. So the method described in the link won't work.
>>>>>> But you can use more convenient DDL[1] or descriptor[2] to read kafka
>>>>>> data. Besides, You can refer to the common questions about PyFlink[3]
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>> [3]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>>>>>
>>>>>> Best,
>>>>>> Xingbo
>>>>>>
>>>>>> Manas Kale <[email protected]> 于2020年6月29日周一 下午8:10写道:
>>>>>>
>>>>>>> Hi,
>>>>>>> I want to consume and write to Kafak from Flink's python API.
>>>>>>>
>>>>>>> The only way I found to do this was through this
>>>>>>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>
>>>>>>>  question
>>>>>>> on SO where the user essentially copies FlinkKafka connector JARs into 
>>>>>>> the
>>>>>>> Flink runtime's lib/ directory.
>>>>>>>
>>>>>>>    - Is this the recommended method to do this? If not, what is?
>>>>>>>    - Is there any official documentation for using Kafka
>>>>>>>    with pyFlink? Is this officially supported?
>>>>>>>    - How does the method described in the link work? Does the Flink
>>>>>>>    runtime load and expose all JARs in /lib to the python script? Can I 
>>>>>>> write
>>>>>>>    custom operators in Java and use those through python?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Manas
>>>>>>>
>>>>>>

Reply via email to