Hi Xingbo,

Thanks for the reply. I didn't know that a table schema also needs to be declared after the connector, but I understand now.

I have another question: how do I write the parsing schema for a field that is itself a valid JSON string? For example:

{
    "monitorId": 865,
    "deviceId": "94:54:93:49:96:13",
    "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
    "state": 2,
    "time": 1593687809180
}

The field "data" is a string of valid JSON containing string:number pairs. I'm currently trying a JSON schema object together with DataTypes.ROW(), but I am getting deserialization errors.
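In case the JSON format can't deserialize that nested string directly, one fallback I'm considering is to declare "data" as a plain STRING in both schemas and decode it in a Python UDF. This is only an untested sketch on my end; the extract_reading name, the key argument, and the DOUBLE result type are all my own assumptions:

from pyflink.table import DataTypes
from pyflink.table.udf import udf
import json

# Hypothetical helper: parse the JSON string held in "data" and pull out a
# single reading by key, e.g. extract_reading(data, "0001") -> 105.0.
# Returns None (SQL NULL) when the key is absent.
@udf(input_types=[DataTypes.STRING(), DataTypes.STRING()],
     result_type=DataTypes.DOUBLE())
def extract_reading(data, key):
    return json.loads(data).get(key)

If that direction makes sense, I assume I'd register it with t_env.register_function("extract_reading", extract_reading) and select the keys I need. For reference, here is the declaration that's currently failing for me: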
.with_format(
    Json()
    .json_schema(
        """
        {
            "type": "object",
            "properties": {
                "monitorId": {"type": "string"},
                "deviceId": {"type": "string"},
                "data": {"type": "object"},
                "state": {"type": "integer"},
                "time": {"type": "string"}
            }
        }
        """
    )
) \
.with_schema(
    Schema()
    .field("monitorId", DataTypes.STRING())
    .field("deviceId", DataTypes.STRING())
    .field("data", DataTypes.ROW())
)

Regards,
Manas

On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi, Manas
> You need to define the schema. You can refer to the following example:
>
> t_env.connect(
>     Kafka()
>     .version('0.11')
>     .topic(INPUT_TOPIC)
>     .property("bootstrap.servers", PROD_KAFKA)
>     .property("zookeeper.connect", "localhost:2181")
>     .start_from_latest()
> ) \
> .with_format(
>     Json()
>     .json_schema(
>         "{"
>         "  type: 'object',"
>         "  properties: {"
>         "    lon: {"
>         "      type: 'number'"
>         "    },"
>         "    rideTime: {"
>         "      type: 'string',"
>         "      format: 'date-time'"
>         "    }"
>         "  }"
>         "}"
>     )
> ) \
> .with_schema(  # declare the schema of the table
>     Schema()
>     .field("lon", DataTypes.DECIMAL(20, 10))
>     .field("rideTime", DataTypes.TIMESTAMP(6))
> ).register_table_source(INPUT_TABLE)
>
> Best,
> Xingbo
>
> On Thu, Jul 2, 2020 at 7:59 PM Manas Kale <manaskal...@gmail.com> wrote:
>
>> Hi,
>> I'm trying to get a simple consumer/producer running using the following
>> code, referred from the links you provided:
>>
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, StreamTableEnvironment
>> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>
>> t_config = TableConfig()
>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>
>> INPUT_TOPIC = 'xyz'
>> INPUT_TABLE = 'raw_message'
>> PROD_ZOOKEEPER = '...'
>> PROD_KAFKA = '...'
>>
>> OUTPUT_TOPIC = 'summary_output'
>> OUTPUT_TABLE = 'feature_summary'
>> LOCAL_ZOOKEEPER = 'localhost:2181'
>> LOCAL_KAFKA = 'localhost:9092'
>>
>> t_env.connect(
>>     Kafka()
>>     .version('universal')
>>     .topic(INPUT_TOPIC)
>>     .property("bootstrap.servers", PROD_KAFKA)
>>     .start_from_latest()
>> ) \
>> .with_format(
>>     Json()
>>     .json_schema(
>>         "{"
>>         "  type: 'object',"
>>         "  properties: {"
>>         "    lon: {"
>>         "      type: 'number'"
>>         "    },"
>>         "    rideTime: {"
>>         "      type: 'string',"
>>         "      format: 'date-time'"
>>         "    }"
>>         "  }"
>>         "}"
>>     )
>> ).register_table_source(INPUT_TABLE)
>>
>> t_env.connect(
>>     Kafka()
>>     .version('universal')
>>     .topic(OUTPUT_TOPIC)
>>     .property("bootstrap.servers", LOCAL_KAFKA)
>>     .start_from_latest()
>> ) \
>> .with_format(
>>     Json()
>>     .json_schema(
>>         "{"
>>         "  type: 'object',"
>>         "  properties: {"
>>         "    lon: {"
>>         "      type: 'number'"
>>         "    },"
>>         "    rideTime: {"
>>         "      type: 'string',"
>>         "      format: 'date-time'"
>>         "    }"
>>         "  }"
>>         "}"
>>     )
>> ).register_table_sink(OUTPUT_TABLE)
>>
>> t_env.from_path(INPUT_TABLE) \
>>     .insert_into(OUTPUT_TABLE)
>>
>> t_env.execute('IU pyflink job')
>>
>> *However, I am getting the following exception:*
>>
>> Traceback (most recent call last):
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>     return f(*a, **kw)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>>     format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling o32.registerTableSource.
>> : org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
>>     at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
>>     at org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
>>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
>>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
>>     at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
>>     ... 13 more
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in <module>
>>     ).register_table_source(INPUT_TABLE)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py", line 1295, in register_table_source
>>     self._j_connect_table_descriptor.registerTableSource(name)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>     answer, self.gateway_client, self.target_id, self.name)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
>>     raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>> pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'
>>
>> The relevant part seems to be: *Caused by:
>> org.apache.flink.table.api.ValidationException: Could not find the required
>> schema in property 'schema'.*
>>
>> This is probably a basic error, but I can't figure out what's wrong with
>> the schema. Is the schema not properly declared? Is some field missing?
>>
>> FWIW I have included the JSON and Kafka connector JARs in the required
>> location.
>>
>> Regards,
>> Manas
>>
>> On Tue, Jun 30, 2020 at 11:58 AM Manas Kale <manaskal...@gmail.com> wrote:
>>
>>> Hi Xingbo,
>>> Thank you for the information, it certainly helps!
>>>
>>> Regards,
>>> Manas
>>>
>>> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>>>
>>>> Hi Manas,
>>>>
>>>> Since Flink 1.9, the entire architecture of PyFlink has been redesigned,
>>>> so the method described in the link won't work. You can instead use the
>>>> more convenient DDL[1] or descriptor[2] API to read Kafka data. You can
>>>> also refer to the common questions about PyFlink[3].
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> On Mon, Jun 29, 2020 at 8:10 PM Manas Kale <manaskal...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I want to consume from and write to Kafka using Flink's Python API.
>>>>>
>>>>> The only way I found to do this was through this question on SO
>>>>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>,
>>>>> where the user essentially copies FlinkKafka connector JARs into the
>>>>> Flink runtime's lib/ directory.
>>>>>
>>>>> - Is this the recommended way to do this? If not, what is?
>>>>> - Is there any official documentation for using Kafka with PyFlink?
>>>>>   Is this officially supported?
>>>>> - How does the method described in the link work? Does the Flink
>>>>>   runtime load and expose all JARs in /lib to the Python script? Can
>>>>>   I write custom operators in Java and use them through Python?
>>>>>
>>>>> Thanks,
>>>>> Manas
>>>>>
>>>>
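>>>> P.S. To make the DDL option concrete, a Kafka source defined that way
>>>> might look roughly like the sketch below. This is an untested sketch:
>>>> the table name, columns, topic, and server address are all placeholders
>>>> you would replace with your own.
>>>>
>>>> t_env.sql_update("""
>>>>     CREATE TABLE my_source (
>>>>         user_id BIGINT,
>>>>         amount DOUBLE
>>>>     ) WITH (
>>>>         'connector.type' = 'kafka',
>>>>         'connector.version' = 'universal',
>>>>         'connector.topic' = 'my_topic',
>>>>         'connector.properties.bootstrap.servers' = 'localhost:9092',
>>>>         'connector.startup-mode' = 'latest-offset',
>>>>         'format.type' = 'json',
>>>>         'format.derive-schema' = 'true',
>>>>         'update-mode' = 'append'
>>>>     )
>>>> """)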