Hi,
I'm trying to get a simple consumer/producer running using the following
code, based on the links provided earlier in this thread:

from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, StreamTableEnvironment
from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema

exec_env = StreamExecutionEnvironment.get_execution_environment()

t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

INPUT_TOPIC = 'xyz'
INPUT_TABLE = 'raw_message'
PROD_ZOOKEEPER = '...'
PROD_KAFKA = '...'

OUTPUT_TOPIC = 'summary_output'
OUTPUT_TABLE = 'feature_summary'
LOCAL_ZOOKEEPER = 'localhost:2181'
LOCAL_KAFKA = 'localhost:9092'


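# Source: read JSON messages from the production Kafka topic.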
t_env.connect(
    Kafka()
    .version('universal')
    .topic(INPUT_TOPIC)
    .property("bootstrap.servers", PROD_KAFKA)

    .start_from_latest()
) \
.with_format(
    Json()
    .json_schema(
        "{"
        "  type: 'object',"
        "  properties: {"
        "    lon: {"
        "      type: 'number'"
        "    },"
        "    rideTime: {"
        "      type: 'string',"
        "      format: 'date-time'"
        "    }"
        "  }"
        "}"
    )
).register_table_source(INPUT_TABLE)

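# Sink: write JSON messages to the local Kafka topic.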
t_env.connect(
    Kafka()
    .version('universal')
    .topic(OUTPUT_TOPIC)
    .property("bootstrap.servers", LOCAL_KAFKA)
    .start_from_latest()
) \
.with_format(
    Json()
    .json_schema(
        "{"
        "  type: 'object',"
        "  properties: {"
        "    lon: {"
        "      type: 'number'"
        "    },"
        "    rideTime: {"
        "      type: 'string',"
        "      format: 'date-time'"
        "    }"
        "  }"
        "}"
    )
).register_table_sink(OUTPUT_TABLE)

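# Copy every row from the source table into the sink and run the job.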
t_env.from_path(INPUT_TABLE) \
    .insert_into(OUTPUT_TABLE)

t_env.execute('IU pyflink job')

*However, I am getting the following exception:*

Traceback (most recent call last):
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o32.registerTableSource.
: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
        at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
        at org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
        at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
        at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
        at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
        ... 13 more


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in <module>
    ).register_table_source(INPUT_TABLE)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py", line 1295, in register_table_source
    self._j_connect_table_descriptor.registerTableSource(name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'


The relevant part seems to be: *Caused by:
org.apache.flink.table.api.ValidationException: Could not find the
required schema in property 'schema'.*

This is probably a basic error, but I can't figure out what is wrong
with the schema. Is the schema not properly declared? Is some field
missing?
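
Rereading the connector docs, I wonder whether the problem is simply that
I never call with_schema() on the descriptor, i.e. the table schema (as
opposed to the JSON *format* schema) is never declared. Is something like
the following what the validator wants? This is an untested sketch, and
the field types are only my guesses for what matches the JSON schema above:

t_env.connect(
    Kafka()
    .version('universal')
    .topic(INPUT_TOPIC)
    .property("bootstrap.servers", PROD_KAFKA)
    .start_from_latest()
) \
.with_format(
    Json()
    .json_schema("...")  # same JSON format schema string as above
) \
.with_schema(  # <- the part my code is missing?
    Schema()
    .field('lon', DataTypes.DOUBLE())  # guessing DOUBLE for the JSON 'number'
    .field('rideTime', DataTypes.TIMESTAMP(3))  # guessing TIMESTAMP for 'date-time'
) \
.register_table_source(INPUT_TABLE)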

FWIW, I have included the JSON and Kafka connector JARs in the required location.
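Specifically, I double-checked their placement with a quick snippet like
the one below, assuming PyFlink's bundled lib directory is the right
place (as the common questions page suggests):

import os
import pyflink

# List the jars PyFlink ships with; the flink-json and Kafka connector
# jars should show up here if I copied them to the right place.
lib_dir = os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), 'lib')
print(sorted(os.listdir(lib_dir)))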


Regards,
Manas


On Tue, Jun 30, 2020 at 11:58 AM Manas Kale <[email protected]> wrote:

> Hi Xingbo,
> Thank you for the information, it certainly helps!
>
> Regards,
> Manas
>
> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang <[email protected]> wrote:
>
>> Hi Manas,
>>
>> Since Flink 1.9, the entire architecture of PyFlink has been redesigned.
>> So the method described in the link won't work.
>> But you can use the more convenient DDL[1] or descriptor[2] API to read Kafka
>> data. Besides, you can refer to the common questions about PyFlink[3].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>
>> Best,
>> Xingbo
>>
>> On Mon, Jun 29, 2020 at 8:10 PM Manas Kale <[email protected]> wrote:
>>
>>> Hi,
>>> I want to consume from and write to Kafka using Flink's Python API.
>>>
>>> The only way I found to do this was through this question on SO
>>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>,
>>> where the user essentially copies the FlinkKafka connector JARs into the
>>> Flink runtime's lib/ directory.
>>>
>>>    - Is this the recommended method to do this? If not, what is?
>>>    - Is there any official documentation for using Kafka with PyFlink?
>>>    Is this officially supported?
>>>    - How does the method described in the link work? Does the Flink
>>>    runtime load and expose all JARs in /lib to the Python script? Can I
>>>    write custom operators in Java and use those through Python?
>>>
>>> Thanks,
>>> Manas
>>>
>>
