我这边在用 PyFlink 连接 Elasticsearch（以下简称 ES）跑一个例子。使用的 ES 版本为 5.4.1，PyFlink 版本为 1.10.1，ES 连接器 jar 包用的是
flink-sql-connector-elasticsearch5_2.11-1.10.1.jar；Kafka 和 JSON 的连接器 jar 包也都已下载，连接 Kafka 的测试已经成功。
但连接 ES 的时候报错：findAndCreateTableSink failed。
请问是不是 ES 连接器 jar 包的原因造成的？哪位遇到过类似问题还请指导一下，感谢。
Caused by: Could not find a suitable factory for
'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
Reason: Required context properties mismatch.
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.descriptors import (
    Elasticsearch,
    Json,
    Kafka,
    Rowtime,
    Schema,
)
def area_cnts():
    """Count rides per taxi and write the running counts to the sink table.

    Sets up an event-time streaming environment with parallelism 1 on the
    Blink planner, registers the ``source`` (Kafka) and ``sink``
    (Elasticsearch, upsert mode) tables, and submits a job that groups the
    source rows by ``taxiId`` and inserts ``taxiId, count(1) as cnt`` into
    ``sink``. The job is submitted as "6-write_with_elasticsearch".
    """
    stream_env = StreamExecutionEnvironment.get_execution_environment()
    stream_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    stream_env.set_parallelism(1)

    # Streaming mode on the Blink planner.
    blink_settings = (
        EnvironmentSettings.new_instance()
        .in_streaming_mode()
        .use_blink_planner()
        .build()
    )
    table_env = StreamTableEnvironment.create(
        stream_env, environment_settings=blink_settings)

    # Tables must be registered before the query can reference them by name.
    register_rides_source(table_env)
    register_cnt_sink(table_env)

    rides = table_env.from_path("source")
    counts_per_taxi = rides.group_by("taxiId").select("taxiId, count(1) as cnt")
    counts_per_taxi.insert_into("sink")

    table_env.execute("6-write_with_elasticsearch")
def register_rides_source(st_env):
    """Register the append-only Kafka table ``source`` in ``st_env``.

    Reads the Kafka topic "Rides" from the earliest offset through the
    "universal" Kafka connector and decodes each record as JSON, with
    ``fail_on_missing_field(True)``. The table's ``rideTime`` column is an
    event-time (rowtime) attribute taken from the JSON field ``eventTime``,
    with periodic watermarks bounded by 60000 (milliseconds, per the Flink
    descriptor API).
    """
    kafka_connector = (
        Kafka()
        .version("universal")
        .topic("Rides")
        .start_from_earliest()
        .property("zookeeper.connect", "zookeeper:2181")
        .property("bootstrap.servers", "kafka:9092")
    )

    # Physical layout of a JSON record on the topic (order matters).
    json_fields = [
        ("rideId", DataTypes.BIGINT()),
        ("isStart", DataTypes.BOOLEAN()),
        ("eventTime", DataTypes.TIMESTAMP()),
        ("lon", DataTypes.FLOAT()),
        ("lat", DataTypes.FLOAT()),
        ("psgCnt", DataTypes.INT()),
        ("taxiId", DataTypes.BIGINT()),
    ]
    json_format = (
        Json()
        .fail_on_missing_field(True)
        .schema(DataTypes.ROW(
            [DataTypes.FIELD(name, dtype) for name, dtype in json_fields]))
    )

    # Logical table columns. rideTime must be added last, because rowtime()
    # below applies to the most recently declared field.
    table_columns = (
        ("rideId", DataTypes.BIGINT()),
        ("taxiId", DataTypes.BIGINT()),
        ("isStart", DataTypes.BOOLEAN()),
        ("lon", DataTypes.FLOAT()),
        ("lat", DataTypes.FLOAT()),
        ("psgCnt", DataTypes.INT()),
        ("rideTime", DataTypes.TIMESTAMP()),
    )
    table_schema = Schema()
    for name, dtype in table_columns:
        table_schema = table_schema.field(name, dtype)
    table_schema = table_schema.rowtime(
        Rowtime()
        .timestamps_from_field("eventTime")
        .watermarks_periodic_bounded(60000))

    (
        st_env.connect(kafka_connector)
        .with_format(json_format)
        .with_schema(table_schema)
        .in_append_mode()
        .register_table_source("source")
    )
def register_cnt_sink(st_env, *, es_version="6", host="elasticsearch",
                      port=9200, protocol="http"):
    """Register the upsert-mode Elasticsearch table ``sink`` in ``st_env``.

    The sink has two BIGINT columns, ``taxiId`` and ``cnt``. Rows are written
    as JSON to the index "taxiid-cnts" with document type "taxiidcnt", and
    "$" is used as the key delimiter for upsert keys.

    ``es_version`` is only a descriptor property. When the job is planned,
    Flink searches the classpath for a TableSinkFactory whose supported
    ``connector.version`` equals this value. The Elasticsearch SQL connector
    jar on the classpath therefore has to match it: "6" needs the
    elasticsearch6 connector jar and an Elasticsearch 6.x cluster. A jar for
    another major version makes the lookup fail with "Could not find a
    suitable factory for 'org.apache.flink.table.factories.TableSinkFactory'
    ... Required context properties mismatch".
    NOTE(review): the Flink 1.10 Table API documents Elasticsearch connectors
    for versions 6 and 7 only. Confirm before relying on any other value,
    e.g. an Elasticsearch 5 cluster.

    :param st_env: Table environment in which the sink is registered.
    :param es_version: Elasticsearch connector version string. Defaults to "6".
    :param host: Elasticsearch host name. Defaults to "elasticsearch".
    :param port: Elasticsearch port. Defaults to 9200.
    :param protocol: Connection protocol. Defaults to "http".
    """
    es_connector = (
        Elasticsearch()
        .version(es_version)
        .host(host, port, protocol)
        .index("taxiid-cnts")
        .document_type('taxiidcnt')
        .key_delimiter("$")
    )
    (
        st_env.connect(es_connector)
        .with_schema(
            Schema()
            .field("taxiId", DataTypes.BIGINT())
            .field("cnt", DataTypes.BIGINT()))
        # Derive the JSON layout from the table schema declared above.
        .with_format(
            Json()
            .derive_schema())
        # The grouped count emits updates per taxiId, hence upsert mode.
        .in_upsert_mode()
        .register_table_sink("sink")
    )
# Build and submit the streaming job only when this file is run as a script,
# not when it is imported.
if __name__ == '__main__':
    area_cnts()