Sadly, HDP 2.3.2 shipped with a broken OffsetCommit API (the 0.8.2-beta version). You should use the Apache releases, or upgrade to HDP 2.3.4.0 or later.
-Dana

On Tue, Jan 19, 2016 at 12:03 AM, Krzysztof Zarzycki <[email protected]> wrote:

> Hi Kafka users,
> I have an issue with saving Kafka offsets to Zookeeper through
> OffsetCommitRequest. It's the same issue I found unanswered on SO, so I
> kindly borrow the description:
>
> http://stackoverflow.com/questions/33897683/cant-save-kafka-offset-in-zookeeper
>
> "I've installed Zookeeper and Kafka from Ambari, on CentoS 7.
>
> Ambari version: 2.1.2.1
> Zookeeper version: 3.4.6.2.3
> Kafka version: 0.8.2.2.3
> Java Kafka client:kafka_2.10, 0.8.2.2
>
> I'm trying to save the Kafka offset, using the following code:
>
>     SimpleConsumer simpleConsumer = new SimpleConsumer(host, port,
>         soTimeout, bufferSize, clientId);
>     TopicAndPartition topicAndPartition =
>         new TopicAndPartition(topicName, partitionId);
>     Map<TopicAndPartition, OffsetAndMetadata> requestInfo = new HashMap<>();
>     requestInfo.put(topicAndPartition,
>         new OffsetAndMetadata(readOffset, "", ErrorMapping.NoError()));
>     OffsetCommitRequest offsetCommitRequest =
>         new OffsetCommitRequest(groupName, requestInfo, correlationId,
>             clientName, (short)0);
>     simpleConsumer.commitOffsets(offsetCommitRequest);
>     simpleConsumer.close();
>
> But when I run this, I get the following error in my client:
>
>     java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>
> Also in the Kafka logs I have the following error:
>
>     [2015-11-24 15:38:53,566] ERROR Closing socket for /192.168.186.1 because of error (kafka.network.Processor)
>     java.nio.BufferUnderflowException
>         at java.nio.Buffer.nextGetIndex(Buffer.java:498)
>         at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
>         at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:73)
>         at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:68)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.Range.foreach(Range.scala:141)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:68)
>         at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:65)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>         at scala.collection.immutable.Range.foreach(Range.scala:141)
>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>         at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:65)
>         at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
>         at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
>         at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:55)
>         at kafka.network.Processor.read(SocketServer.scala:547)
>         at kafka.network.Processor.run(SocketServer.scala:405)
>         at java.lang.Thread.run(Thread.java:745)
>
> Now I've also downloaded and installed the official Kafka 0.8.2.2 version from
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
> and it works ok; you can save the Kafka offset without any error.
>
> Can anybody give me a some directions, why is the Ambari Kafka failing to
> save the offset?
>
> P.S: I know that if versionId is 0 (in OffsetCommitRequest), than the
> offset is actually saved in Zookeeper.
> "
>
> My only difference (IMHO, irrelevant) is that I'm using HDP in version
> 2.3.2, but other than that versions are the same.
>
> Do you guys have any hints on what could be wrong? Is that something wrong
> with my use of offset committing? Or conflict of versions?
> Any hints would be highly appreciated :)
> Cheers,
> Krzysztof
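
For anyone puzzled by the broker-side trace quoted above (ByteBuffer.getLong throwing BufferUnderflowException inside OffsetCommitRequest.readFrom), here is a minimal Java sketch of that general failure mode. It assumes, purely for illustration, that the reader expects one more 8-byte field per partition than the writer actually sent. The field layout and the class and method names are hypothetical and are not Kafka's real wire format; only the java.nio calls are real.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class OffsetCommitMismatchSketch {

    // Hypothetical "client" layout: partition (int32) followed by offset (int64).
    static ByteBuffer encodeWithoutTimestamp(int partition, long offset) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 8);
        buf.putInt(partition).putLong(offset);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Hypothetical "broker" layout: same fields plus a trailing int64 timestamp.
    static long[] decodeExpectingTimestamp(ByteBuffer buf) {
        int partition = buf.getInt();
        long offset = buf.getLong();
        // Fewer than 8 bytes remain here, so getLong throws BufferUnderflowException.
        long timestamp = buf.getLong();
        return new long[] {partition, offset, timestamp};
    }

    // Returns true when the reader runs out of bytes while decoding the request.
    static boolean brokerRejects(ByteBuffer request) {
        try {
            decodeExpectingTimestamp(request);
            return false;
        } catch (BufferUnderflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ByteBuffer request = encodeWithoutTimestamp(0, 42L);
        System.out.println("decode failed: " + brokerRejects(request));
    }
}
```

In the sketch the reader simply reports the failure. In the quoted logs the broker closes the socket when its parser fails, which would be consistent with the EOFException seen on the client side.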
