Re:Re: Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0
感谢回复,我这边问题已经修复了,修改一下clients的版本到2.4.1就可以了 在 2022-08-26 16:20:27,"Weihua Hu" 写道: >可以尝试升级到 2.5+ > >Best, >Weihua > > >On Thu, Aug 25, 2022 at 6:41 PM gulugulucxg wrote: > >> 您好,集群版本是1.1.1,是挺低的,是这个原因吗,升级到多少合适呢 >> 在 2022-08-25 18:31:06,"Weihua Hu" 写道: >> >kafka 集群的版本是什么呢?看起来是集群版本有点低了 >> > >> >Best, >> >Weihua >> > >> > >> >On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg wrote: >> > >> >> 大佬们好: >> >> >> >> >> 我这边指定EXACTLY_ONCE写kafka后,任务直接起能起来,但是从savepoint起任务总是失败,kafka-clients版本2.5.0,flink版本及相关依赖版本均为1.12.4, >> >> >> >> 异常如下: >> >> >> >> 2022-08-25 10:42:44 >> >> >> >> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted >> to write a non-default producerId at version 0 >> >> >> >> 相关代码如下: >> >> Properties properties = new Properties(); >> >> properties.put("bootstrap.servers", >> >> KafkaConstant.bootstrap_servers_01); >> >> properties.put("transaction.timeout.ms", 15 * 60 * 1000); >> >> FlinkKafkaProducer statsLogV2Producer = new >> >> FlinkKafkaProducer<>( >> >> KafkaConstant.topic_01, >> >> new MyKafkaSerializationSchema(KafkaConstant.topic_01), >> >> properties , >> >> FlinkKafkaProducer.Semantic.EXACTLY_ONCE); >> >> >>
Re: Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0
可以尝试升级到 2.5+ Best, Weihua On Thu, Aug 25, 2022 at 6:41 PM gulugulucxg wrote: > 您好,集群版本是1.1.1,是挺低的,是这个原因吗,升级到多少合适呢 > 在 2022-08-25 18:31:06,"Weihua Hu" 写道: > >kafka 集群的版本是什么呢?看起来是集群版本有点低了 > > > >Best, > >Weihua > > > > > >On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg wrote: > > > >> 大佬们好: > >> > >> > 我这边指定EXACTLY_ONCE写kafka后,任务直接起能起来,但是从savepoint起任务总是失败,kafka-clients版本2.5.0,flink版本及相关依赖版本均为1.12.4, > >> > >> 异常如下: > >> > >> 2022-08-25 10:42:44 > >> > >> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted > to write a non-default producerId at version 0 > >> > >> 相关代码如下: > >> Properties properties = new Properties(); > >> properties.put("bootstrap.servers", > >> KafkaConstant.bootstrap_servers_01); > >> properties.put("transaction.timeout.ms", 15 * 60 * 1000); > >> FlinkKafkaProducer statsLogV2Producer = new > >> FlinkKafkaProducer<>( > >> KafkaConstant.topic_01, > >> new MyKafkaSerializationSchema(KafkaConstant.topic_01), > >> properties , > >> FlinkKafkaProducer.Semantic.EXACTLY_ONCE); > >> >
Re:Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0
您好,集群版本是1.1.1,是挺低的,是这个原因吗,升级到多少合适呢 在 2022-08-25 18:31:06,"Weihua Hu" 写道: >kafka 集群的版本是什么呢?看起来是集群版本有点低了 > >Best, >Weihua > > >On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg wrote: > >> 大佬们好: >> >> 我这边指定EXACTLY_ONCE写kafka后,任务直接起能起来,但是从savepoint起任务总是失败,kafka-clients版本2.5.0,flink版本及相关依赖版本均为1.12.4, >> >> 异常如下: >> >> 2022-08-25 10:42:44 >> >> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to >> write a non-default producerId at version 0 >> >> 相关代码如下: >> Properties properties = new Properties(); >> properties.put("bootstrap.servers", >> KafkaConstant.bootstrap_servers_01); >> properties.put("transaction.timeout.ms", 15 * 60 * 1000); >> FlinkKafkaProducer statsLogV2Producer = new >> FlinkKafkaProducer<>( >> KafkaConstant.topic_01, >> new MyKafkaSerializationSchema(KafkaConstant.topic_01), >> properties , >> FlinkKafkaProducer.Semantic.EXACTLY_ONCE); >>
Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0
kafka 集群的版本是什么呢?看起来是集群版本有点低了 Best, Weihua On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg wrote: > 大佬们好: > > 我这边指定EXACTLY_ONCE写kafka后,任务直接起能起来,但是从savepoint起任务总是失败,kafka-clients版本2.5.0,flink版本及相关依赖版本均为1.12.4, > > 异常如下: > > 2022-08-25 10:42:44 > > org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to > write a non-default producerId at version 0 > > 相关代码如下: > Properties properties = new Properties(); > properties.put("bootstrap.servers", > KafkaConstant.bootstrap_servers_01); > properties.put("transaction.timeout.ms", 15 * 60 * 1000); > FlinkKafkaProducer statsLogV2Producer = new > FlinkKafkaProducer<>( > KafkaConstant.topic_01, > new MyKafkaSerializationSchema(KafkaConstant.topic_01), > properties , > FlinkKafkaProducer.Semantic.EXACTLY_ONCE); >
Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0
大佬们好: 我这边指定EXACTLY_ONCE写kafka后,任务直接起能起来,但是从savepoint起任务总是失败,kafka-clients版本2.5.0,flink版本及相关依赖版本均为1.12.4, 异常如下: 2022-08-25 10:42:44 org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0 相关代码如下: Properties properties = new Properties(); properties.put("bootstrap.servers", KafkaConstant.bootstrap_servers_01); properties.put("transaction.timeout.ms", 15 * 60 * 1000); FlinkKafkaProducer statsLogV2Producer = new FlinkKafkaProducer<>( KafkaConstant.topic_01, new MyKafkaSerializationSchema(KafkaConstant.topic_01), properties , FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
Re: flink-kafka-sink
hi,没有效果 具体是啥? cxx <1156531...@qq.com> 于2021年1月7日周四 上午9:53写道: > 我从kafka消费一条数据,然后将消息进行切分,再发送到下游的kafka中,但是这样不能保证在一个事务里面。 > 例如:我将一条数据切分成10条,然后再第五条的时候抛出一个异常,但是前四条已经发送到下游的kafka了。 > 我设置了事务id,隔离级别,client > id,enable.idempotence,max.in.flight.requests.per.connection,retries > 但是没有效果。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, pp
flink-kafka-sink
我从kafka消费一条数据,然后将消息进行切分,再发送到下游的kafka中,但是这样不能保证在一个事务里面。 例如:我将一条数据切分成10条,然后再第五条的时候抛出一个异常,但是前四条已经发送到下游的kafka了。 我设置了事务id,隔离级别,client id,enable.idempotence,max.in.flight.requests.per.connection,retries 但是没有效果。 -- Sent from: http://apache-flink.147419.n8.nabble.com/