I guess it's because the ES version specified in the job is `6`, however, the 
jar used is `5`.

> 在 2020年6月16日,下午1:47,jack <wslyk...@163.com> 写道:
> 
> 我这边使用的是pyflink连接es的一个例子,我这边使用的es为5.4.1的版本,pyflink为1.10.1,连接jar包我使用的是 
> flink-sql-connector-elasticsearch5_2.11-1.10.1.jar,kafka,json的连接包也下载了,连接kafka测试成功了。
> 连接es的时候报错,findAndCreateTableSink   failed。 
> 是不是es的连接jar包原因造成的?哪位遇到过类似问题还请指导一下,感谢。
> 
> Caused by Could not find a suitable  factory for   
> ‘org.apache.flink.table.factories.TableSinkFactory’ in the classpath.
> Reason: Required context properties mismatch
> 
> 
> 
> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, DataTypes, 
> EnvironmentSettings
> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, 
> Elasticsearch
> 
> 
> def area_cnts():
>     s_env = StreamExecutionEnvironment.get_execution_environment()
>     s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>     s_env.set_parallelism(1)
> 
>     # use blink table planner
>     st_env = StreamTableEnvironment \
>         .create(s_env, environment_settings=EnvironmentSettings
>                 .new_instance()
>                 .in_streaming_mode()
>                 .use_blink_planner().build())
> 
>     # register source and sink
>     register_rides_source(st_env)
>     register_cnt_sink(st_env)
> 
>     # query
>     st_env.from_path("source")\
>         .group_by("taxiId")\
>         .select("taxiId, count(1) as cnt")\
>         .insert_into("sink")
> 
>     # execute
>     st_env.execute("6-write_with_elasticsearch")
> 
> 
> def register_rides_source(st_env):
>     st_env \
>         .connect(  # declare the external system to connect to
>         Kafka()
>             .version("universal")
>             .topic("Rides")
>             .start_from_earliest()
>             .property("zookeeper.connect", "zookeeper:2181")
>             .property("bootstrap.servers", "kafka:9092")) \
>         .with_format(  # declare a format for this system
>         Json()
>             .fail_on_missing_field(True)
>             .schema(DataTypes.ROW([
>             DataTypes.FIELD("rideId", DataTypes.BIGINT()),
>             DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
>             DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
>             DataTypes.FIELD("lon", DataTypes.FLOAT()),
>             DataTypes.FIELD("lat", DataTypes.FLOAT()),
>             DataTypes.FIELD("psgCnt", DataTypes.INT()),
>             DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
>         .with_schema(  # declare the schema of the table
>         Schema()
>             .field("rideId", DataTypes.BIGINT())
>             .field("taxiId", DataTypes.BIGINT())
>             .field("isStart", DataTypes.BOOLEAN())
>             .field("lon", DataTypes.FLOAT())
>             .field("lat", DataTypes.FLOAT())
>             .field("psgCnt", DataTypes.INT())
>             .field("rideTime", DataTypes.TIMESTAMP())
>             .rowtime(
>             Rowtime()
>                 .timestamps_from_field("eventTime")
>                 .watermarks_periodic_bounded(60000))) \
>         .in_append_mode() \
>         .register_table_source("source")
> 
> 
> def register_cnt_sink(st_env):
>     st_env.connect(
>         Elasticsearch()
>             .version("6")
>             .host("elasticsearch", 9200, "http")
>             .index("taxiid-cnts")
>             .document_type('taxiidcnt')
>             .key_delimiter("$")) \
>         .with_schema(
>             Schema()
>                 .field("taxiId", DataTypes.BIGINT())
>                 .field("cnt", DataTypes.BIGINT())) \
>         .with_format(
>            Json()
>                .derive_schema()) \
>         .in_upsert_mode() \
>         .register_table_sink("sink")
> 
> 
> if __name__ == '__main__':
>     area_cnts()
> 

Reply via email to