大家好!


我正在测试Blink sql client kafka sink connector ,但是发现写入失败,以下是我的步骤


环境配置
blink standalone 模式




1. 配置environment 启动sql client


2. 创建kafka sink table
CREATETABLEkafka_sink(
   messageKeyVARBINARY,
   messageValueVARBINARY,
   PRIMARYKEY(messageKey))
with(
   type='KAFKA011',
   topic='sink-topic',
   `bootstrap.servers`='172.19.0.108:9092',
   retries='3'
);


3. 创建查询语句并执行
INSERT INTO kafka_sink
SELECT CAST('123' AS VARBINARY) AS key,
CAST(CONCAT_WS(',', 'HELLO', 'WORLD') AS VARBINARY) AS msg;




错误日志(from task executor log)


主要是找不到kafka common package下面的一个类, 但是启动sql client 时候已经把kafka connector 
相关的jar包包括在内 在提交job时候 也会把这些jars 和 jobgraph一并上传到cluster,理论上这些class都会被加载






2019-02-22 14:37:18,356 ERROR 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.utils.KafkaThread  - 
Uncaught exception in kafka-producer-network-thread | producer-1:
java.lang.NoClassDefFoundError: 
org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C
        at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.DefaultRecordBatch.writeHeader(DefaultRecordBatch.java:468)
        at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.MemoryRecordsBuilder.writeDefaultBatchHeader(MemoryRecordsBuilder.java:339)
        at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.MemoryRecordsBuilder.close(MemoryRecordsBuilder.java:293)
        at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.ProducerBatch.close(ProducerBatch.java:391)
        at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:485)
        at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:254)
        at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:223)
        at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.utils.Crc32C
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)




--
  Best Regards
  Hongtao

回复