Hi Xingbo,
Thanks for the reply, I didn't know that a table schema also needs to be
declared after the connect or but I understand now.
I have another question: how do I write the parsing schemas for a field
that itself is a valid JSON string? For example:
{
    "monitorId": 865,
    "deviceId": "94:54:93:49:96:13",
    "data":
"{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
    "state": 2,
    "time": 1593687809180
}
The field "data" is a string of valid JSON with string:number objects. I'm
currently trying using JSON schema object and DataTypes.ROW, but am getting
deserialization errors.

.with_format(
    Json()
        .json_schema(
        """
        {
    "type": "object",
    "properties": {
        "monitorId": {
            "type": "string"
        },
        "deviceId": {
            "type": "string"
        },
        "data": {
            "type": "object"
        },
        "state": {
            "type": "integer"
        },
        "time": {
            "type": "string"
        }
    }
}
    """
    )
) \
    .with_schema(
    Schema()
        .field("monitorId", DataTypes.STRING())
        .field("deviceId", DataTypes.STRING())
        .field("data", DataTypes.ROW())
)

Regards,

Manas


On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi, Manas
> You need to define the schema. You can refer to the following example:
>  t_env.connect(
>     Kafka()
>         .version('0.11')
>         .topic(INPUT_TOPIC)
>         .property("bootstrap.servers", PROD_KAFKA)
>         .property("zookeeper.connect", "localhost:2181")
>         .start_from_latest()
> ) \
>     .with_format(
>     Json()
>         .json_schema(
>         "{"
>         "  type: 'object',"
>         "  properties: {"
>         "    lon: {"
>         "      type: 'number'"
>         "    },"
>         "    rideTime: {"
>         "      type: 'string',"
>         "      format: 'date-time'"
>         "    }"
>         "  }"
>         "}"
>     )
> ) \
>     .with_schema(  # declare the schema of the table
>     Schema()
>         .field("lon", DataTypes.DECIMAL(20, 10))
>         .field("rideTime", DataTypes.TIMESTAMP(6))
> ).register_table_source(INPUT_TABLE)
>
> Best,
> Xingbo
>
> Manas Kale <manaskal...@gmail.com> 于2020年7月2日周四 下午7:59写道:
>
>> Hi,
>> I'm trying to get a simple consumer/producer running using the following
>> code referred from the provided links :
>>
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, 
>> StreamTableEnvironment
>> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>
>> t_config = TableConfig()
>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>
>> INPUT_TOPIC = 'xyz'
>> INPUT_TABLE = 'raw_message'
>> PROD_ZOOKEEPER = '...'
>> PROD_KAFKA = '...'
>>
>> OUTPUT_TOPIC = 'summary_output'
>> OUTPUT_TABLE = 'feature_summary'
>> LOCAL_ZOOKEEPER = 'localhost:2181'
>> LOCAL_KAFKA = 'localhost:9092'
>>
>>
>> t_env.connect(
>>     Kafka()
>>     .version('universal')
>>     .topic(INPUT_TOPIC)
>>     .property("bootstrap.servers", PROD_KAFKA)
>>
>>     .start_from_latest()
>> ) \
>> .with_format(
>>     Json()
>>     .json_schema(
>>         "{"
>>         "  type: 'object',"
>>         "  properties: {"
>>         "    lon: {"
>>         "      type: 'number'"
>>         "    },"
>>         "    rideTime: {"
>>         "      type: 'string',"
>>         "      format: 'date-time'"
>>         "    }"
>>         "  }"
>>         "}"
>>     )
>> ).register_table_source(INPUT_TABLE)
>>
>> t_env.connect(Kafka()
>>     .version('universal')
>>     .topic(OUTPUT_TOPIC)
>>     .property("bootstrap.servers", LOCAL_KAFKA)
>>
>>     .start_from_latest()
>>     ) \
>>     .with_format(
>>     Json()
>>     .json_schema(
>>        "{"
>>         "  type: 'object',"
>>         "  properties: {"
>>         "    lon: {"
>>         "      type: 'number'"
>>         "    },"
>>         "    rideTime: {"
>>         "      type: 'string',"
>>         "      format: 'date-time'"
>>         "    }"
>>         "  }"
>>         "}"
>>     )).register_table_sink(OUTPUT_TABLE)
>>
>> t_env.from_path(INPUT_TABLE) \
>>     .insert_into(OUTPUT_TABLE)
>>
>> t_env.execute('IU pyflink job')
>>
>> *However, I am getting the following exception : *
>>
>> Traceback (most recent call last):
>>   File 
>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>>  line 147, in deco
>>     return f(*a, **kw)
>>   File 
>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py",
>>  line 328, in get_return_value
>>     format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling 
>> o32.registerTableSource.
>> : org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>      at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>      at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
>>      at 
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>      at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>      at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.lang.reflect.Method.invoke(Method.java:498)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>      at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.table.api.ValidationException: Could not find 
>> the required schema in property 'schema'.
>>      at 
>> org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
>>      at 
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
>>      at 
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
>>      at 
>> org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>>      at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
>>      ... 13 more
>>
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in 
>> <module>
>>     ).register_table_source(INPUT_TABLE)
>>   File 
>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py",
>>  line 1295, in register_table_source
>>     self._j_connect_table_descriptor.registerTableSource(name)
>>   File 
>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py",
>>  line 1286, in __call__
>>     answer, self.gateway_client, self.target_id, self.name)
>>   File 
>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>>  line 154, in deco
>>     raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>> pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'
>>
>>
>> The relevant part seems to be *Caused by: 
>> org.apache.flink.table.api.ValidationException: Could not find the required 
>> schema in property 'schema'.*
>>
>> This is probably a basic error, but I can't figure out how I can know what's 
>> wrong with the schema. Is the schema not properly declared? Is some field 
>> missing?
>>
>> FWIW I have included the JSON and kafka connector JARs in the required 
>> location.
>>
>>
>> Regards,
>> Manas
>>
>>
>> On Tue, Jun 30, 2020 at 11:58 AM Manas Kale <manaskal...@gmail.com>
>> wrote:
>>
>>> Hi Xingbo,
>>> Thank you for the information, it certainly helps!
>>>
>>> Regards,
>>> Manas
>>>
>>> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>>>
>>>> Hi Manas,
>>>>
>>>> Since Flink 1.9, the entire architecture of PyFlink has been
>>>> redesigned. So the method described in the link won't work.
>>>> But you can use more convenient DDL[1] or descriptor[2] to read kafka
>>>> data. Besides, You can refer to the common questions about PyFlink[3]
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>> [3]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> Manas Kale <manaskal...@gmail.com> 于2020年6月29日周一 下午8:10写道:
>>>>
>>>>> Hi,
>>>>> I want to consume and write to Kafak from Flink's python API.
>>>>>
>>>>> The only way I found to do this was through this
>>>>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>
>>>>>  question
>>>>> on SO where the user essentially copies FlinkKafka connector JARs into the
>>>>> Flink runtime's lib/ directory.
>>>>>
>>>>>    - Is this the recommended method to do this? If not, what is?
>>>>>    - Is there any official documentation for using Kafka
>>>>>    with pyFlink? Is this officially supported?
>>>>>    - How does the method described in the link work? Does the Flink
>>>>>    runtime load and expose all JARs in /lib to the python script? Can I 
>>>>> write
>>>>>    custom operators in Java and use those through python?
>>>>>
>>>>> Thanks,
>>>>> Manas
>>>>>
>>>>

Reply via email to