Hi Manas,

If you want to return a RowType from a Python UDF, you can use the Row class, which extends Python's tuple. You can import it with the following statement:

from pyflink.table import Row
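For example, here is a rough, untested sketch based on the UDF in your mail below. I am assuming the values inside "data" are doubles and that you only need two of them; adjust the field names and types to match your actual payload:

import json

from pyflink.table import DataTypes, Row
from pyflink.table.udf import ScalarFunction, udf


class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        # Build the row positionally; the order must match the order of
        # the fields declared in result_type below.
        return Row(data['0001'], data['0002'])


# t_env is the StreamTableEnvironment you created earlier.
t_env.register_function(
    "data_converter",
    udf(DataConverter(),
        input_types=[DataTypes.STRING()],
        result_type=DataTypes.ROW([
            DataTypes.FIELD("feature1", DataTypes.DOUBLE()),
            DataTypes.FIELD("feature2", DataTypes.DOUBLE())])))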
Best,
Xingbo

Manas Kale <[email protected]> wrote on Mon, Jul 6, 2020 at 8:08 PM:

> I also tried doing this by using a user-defined function.
>
> class DataConverter(ScalarFunction):
>     def eval(self, str_data):
>         data = json.loads(str_data)
>         # I want to return data['0001'] in field 'feature1',
>         # data['0002'] in field 'feature2', etc.
>         return ??
>
> t_env.register_function(
>     "data_converter",
>     udf(DataConverter(),
>         input_types=[DataTypes.STRING()],
>         result_type=DataTypes.ROW([
>             DataTypes.FIELD("feature1", DataTypes.STRING())
>         ])))
>
> # "data" below is the field "data" from the previous mail.
> t_env.from_path(INPUT_TABLE) \
>     .select("data_converter(data)") \
>     .insert_into(OUTPUT_TABLE)
>
> I used a ROW to hold multiple values, but I can't figure out how to return a populated ROW object from the eval() method. Where is the method to construct a row/field object and return it?
>
> Thanks!
>
> On Fri, Jul 3, 2020 at 12:40 PM Manas Kale <[email protected]> wrote:
>
>> Hi Xingbo,
>> Thanks for the reply. I didn't know that a table schema also needs to be declared after the connector, but I understand now.
>> I have another question: how do I write the parsing schema for a field that is itself a valid JSON string? For example:
>>
>> {
>>   "monitorId": 865,
>>   "deviceId": "94:54:93:49:96:13",
>>   "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
>>   "state": 2,
>>   "time": 1593687809180
>> }
>>
>> The field "data" is a string of valid JSON with string:number pairs. I'm currently trying a JSON schema object and DataTypes.ROW, but am getting deserialization errors.
>>
>> .with_format(
>>     Json()
>>     .json_schema(
>>         """
>>         {
>>             "type": "object",
>>             "properties": {
>>                 "monitorId": {
>>                     "type": "string"
>>                 },
>>                 "deviceId": {
>>                     "type": "string"
>>                 },
>>                 "data": {
>>                     "type": "object"
>>                 },
>>                 "state": {
>>                     "type": "integer"
>>                 },
>>                 "time": {
>>                     "type": "string"
>>                 }
>>             }
>>         }
>>         """
>>     )
>> ) \
>> .with_schema(
>>     Schema()
>>     .field("monitorId", DataTypes.STRING())
>>     .field("deviceId", DataTypes.STRING())
>>     .field("data", DataTypes.ROW())
>> )
>>
>> Regards,
>> Manas
>>
>> On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang <[email protected]> wrote:
>>
>>> Hi Manas,
>>> You need to define the schema.
>>> You can refer to the following example:
>>>
>>> t_env.connect(
>>>     Kafka()
>>>     .version('0.11')
>>>     .topic(INPUT_TOPIC)
>>>     .property("bootstrap.servers", PROD_KAFKA)
>>>     .property("zookeeper.connect", "localhost:2181")
>>>     .start_from_latest()
>>> ) \
>>> .with_format(
>>>     Json()
>>>     .json_schema(
>>>         "{"
>>>         "  type: 'object',"
>>>         "  properties: {"
>>>         "    lon: {"
>>>         "      type: 'number'"
>>>         "    },"
>>>         "    rideTime: {"
>>>         "      type: 'string',"
>>>         "      format: 'date-time'"
>>>         "    }"
>>>         "  }"
>>>         "}"
>>>     )
>>> ) \
>>> .with_schema(  # declare the schema of the table
>>>     Schema()
>>>     .field("lon", DataTypes.DECIMAL(20, 10))
>>>     .field("rideTime", DataTypes.TIMESTAMP(6))
>>> ).register_table_source(INPUT_TABLE)
>>>
>>> Best,
>>> Xingbo
>>>
>>> Manas Kale <[email protected]> wrote on Thu, Jul 2, 2020 at 7:59 PM:
>>>
>>>> Hi,
>>>> I'm trying to get a simple consumer/producer running using the following code, put together from the provided links:
>>>>
>>>> from pyflink.dataset import ExecutionEnvironment
>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, StreamTableEnvironment
>>>> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>>>>
>>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>>
>>>> t_config = TableConfig()
>>>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>>
>>>> INPUT_TOPIC = 'xyz'
>>>> INPUT_TABLE = 'raw_message'
>>>> PROD_ZOOKEEPER = '...'
>>>> PROD_KAFKA = '...'
>>>>
>>>> OUTPUT_TOPIC = 'summary_output'
>>>> OUTPUT_TABLE = 'feature_summary'
>>>> LOCAL_ZOOKEEPER = 'localhost:2181'
>>>> LOCAL_KAFKA = 'localhost:9092'
>>>>
>>>> t_env.connect(
>>>>     Kafka()
>>>>     .version('universal')
>>>>     .topic(INPUT_TOPIC)
>>>>     .property("bootstrap.servers", PROD_KAFKA)
>>>>     .start_from_latest()
>>>> ) \
>>>> .with_format(
>>>>     Json()
>>>>     .json_schema(
>>>>         "{"
>>>>         "  type: 'object',"
>>>>         "  properties: {"
>>>>         "    lon: {"
>>>>         "      type: 'number'"
>>>>         "    },"
>>>>         "    rideTime: {"
>>>>         "      type: 'string',"
>>>>         "      format: 'date-time'"
>>>>         "    }"
>>>>         "  }"
>>>>         "}"
>>>>     )
>>>> ).register_table_source(INPUT_TABLE)
>>>>
>>>> t_env.connect(
>>>>     Kafka()
>>>>     .version('universal')
>>>>     .topic(OUTPUT_TOPIC)
>>>>     .property("bootstrap.servers", LOCAL_KAFKA)
>>>>     .start_from_latest()
>>>> ) \
>>>> .with_format(
>>>>     Json()
>>>>     .json_schema(
>>>>         "{"
>>>>         "  type: 'object',"
>>>>         "  properties: {"
>>>>         "    lon: {"
>>>>         "      type: 'number'"
>>>>         "    },"
>>>>         "    rideTime: {"
>>>>         "      type: 'string',"
>>>>         "      format: 'date-time'"
>>>>         "    }"
>>>>         "  }"
>>>>         "}"
>>>>     )
>>>> ).register_table_sink(OUTPUT_TABLE)
>>>>
>>>> t_env.from_path(INPUT_TABLE) \
>>>>     .insert_into(OUTPUT_TABLE)
>>>>
>>>> t_env.execute('IU pyflink job')
>>>>
>>>> However, I am getting the following exception:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>>>     return f(*a, **kw)
>>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>>>>     format(target_id, ".", name), value)
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o32.registerTableSource.
>>>> : org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
>>>>     at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
>>>>     at org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
>>>>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
>>>>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
>>>>     at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
>>>>     ... 13 more
>>>>
>>>> During handling of the above exception, another exception occurred:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in <module>
>>>>     ).register_table_source(INPUT_TABLE)
>>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py", line 1295, in register_table_source
>>>>     self._j_connect_table_descriptor.registerTableSource(name)
>>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>>>     answer, self.gateway_client, self.target_id, self.name)
>>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
>>>>     raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>>>> pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'
>>>>
>>>> The relevant part seems to be *Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.*
>>>>
>>>> This is probably a basic error, but I can't figure out how I can know what's wrong with the schema. Is the schema not properly declared? Is some field missing?
>>>>
>>>> FWIW I have included the JSON and Kafka connector JARs in the required location.
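>>>>
>>>> Reading the stack trace, the validation fails inside SchemaValidator, so my guess is that I'm missing an explicit .with_schema(...) call before register_table_source. Something like the following sketch, which is untested, and where the field types are a guess on my part taken from the connector docs:
>>>>
>>>> t_env.connect(
>>>>     Kafka()
>>>>     .version('universal')
>>>>     .topic(INPUT_TOPIC)
>>>>     .property("bootstrap.servers", PROD_KAFKA)
>>>>     .start_from_latest()
>>>> ) \
>>>> .with_format(
>>>>     Json()
>>>>     .json_schema(
>>>>         "{"
>>>>         "  type: 'object',"
>>>>         "  properties: {"
>>>>         "    lon: { type: 'number' },"
>>>>         "    rideTime: { type: 'string', format: 'date-time' }"
>>>>         "  }"
>>>>         "}"
>>>>     )
>>>> ) \
>>>> .with_schema(  # the part that seems to be missing in my code above
>>>>     Schema()
>>>>     .field("lon", DataTypes.DECIMAL(20, 10))
>>>>     .field("rideTime", DataTypes.TIMESTAMP(6))
>>>> ).register_table_source(INPUT_TABLE)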
>>>>
>>>> Regards,
>>>> Manas
>>>>
>>>> On Tue, Jun 30, 2020 at 11:58 AM Manas Kale <[email protected]> wrote:
>>>>
>>>>> Hi Xingbo,
>>>>> Thank you for the information, it certainly helps!
>>>>>
>>>>> Regards,
>>>>> Manas
>>>>>
>>>>> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang <[email protected]> wrote:
>>>>>
>>>>>> Hi Manas,
>>>>>>
>>>>>> Since Flink 1.9, the entire architecture of PyFlink has been redesigned, so the method described in the link won't work. You can instead use the more convenient DDL [1] or descriptor [2] APIs to read Kafka data. You can also refer to the common questions about PyFlink [3].
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>>>>>
>>>>>> Best,
>>>>>> Xingbo
>>>>>>
>>>>>> Manas Kale <[email protected]> wrote on Mon, Jun 29, 2020 at 8:10 PM:
>>>>>>
>>>>>>> Hi,
>>>>>>> I want to consume from and write to Kafka using Flink's Python API.
>>>>>>>
>>>>>>> The only way I found to do this was through this question on SO <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>, where the user essentially copies FlinkKafka connector JARs into the Flink runtime's lib/ directory.
>>>>>>>
>>>>>>> - Is this the recommended method to do this? If not, what is?
>>>>>>> - Is there any official documentation for using Kafka with PyFlink? Is this officially supported?
>>>>>>> - How does the method described in the link work? Does the Flink runtime load and expose all JARs in /lib to the Python script? Can I write custom operators in Java and use them through Python?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Manas
