Hi all,
I am testing the Kafka sink connector with the Blink SQL client, but writing to Kafka fails. Here are my steps.
Environment
Blink in standalone mode
1. Configure the environment and start the SQL client
2. Create the Kafka sink table
CREATE TABLE kafka_sink (
  messageKey VARBINARY,
  messageValue VARBINARY,
  PRIMARY KEY (messageKey)
) WITH (
  type = 'KAFKA011',
  topic = 'sink-topic',
  `bootstrap.servers` = '172.19.0.108:9092',
  retries = '3'
);
3. Write the query and execute it
INSERT INTO kafka_sink
SELECT CAST('123' AS VARBINARY) AS key,
CAST(CONCAT_WS(',', 'HELLO', 'WORLD') AS VARBINARY) AS msg;
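Error log (from the task executor log)
The main problem is that a class under the Kafka common package cannot be found. However, the Kafka connector jars were included when the SQL client was started, and when the job is submitted these jars are uploaded to the cluster together with the job graph, so in theory all of these classes should be loaded.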
2019-02-22 14:37:18,356 ERROR org.apache.flink.kafka011.shaded.org.apache.kafka.common.utils.KafkaThread - Uncaught exception in kafka-producer-network-thread | producer-1:
java.lang.NoClassDefFoundError: org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C
    at org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.DefaultRecordBatch.writeHeader(DefaultRecordBatch.java:468)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.MemoryRecordsBuilder.writeDefaultBatchHeader(MemoryRecordsBuilder.java:339)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.MemoryRecordsBuilder.close(MemoryRecordsBuilder.java:293)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.ProducerBatch.close(ProducerBatch.java:391)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:485)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:254)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:223)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.kafka011.shaded.org.apache.kafka.common.utils.Crc32C
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
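
To narrow this down, one thing that could be checked is whether the shaded Crc32C class is actually packaged in the connector jar that gets uploaded. Below is a minimal sketch in Python, relying only on the fact that a jar is a zip archive. The helper name `jar_contains_class` is hypothetical and not part of Blink or Flink.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar has a .class entry for the fully qualified class name.

    A jar is a zip archive, so a class such as a.b.C is stored as a/b/C.class.
    """
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

It would be called with the path to the Kafka connector jar passed to the SQL client and the class name from the log, `org.apache.flink.kafka011.shaded.org.apache.kafka.common.utils.Crc32C`. A result of False would suggest the class was dropped when the jar was shaded. A result of True would point toward a class loading problem on the task executor instead.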
--
Best Regards
Hongtao