Thank you Dana!
I see... The pity is that Hortonworks claims in their release notes for HDP 2.3.2 that:

    5.9. Kafka
    HDP 2.3.2 provides Kafka 0.8.2, with no additional Apache patches.
    ( https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.2/bk_HDP_RelNotes/content/patch_kafka.html )

So I assumed that Kafka would come as a stable release...

So you are saying that upgrading to HDP 2.3.4 would help? I see in the release notes that it upgrades Kafka to 0.9.0. I'm afraid of this upgrade, as I don't know whether Spark Streaming (spark-streaming-kafka) will support Kafka 0.9. What do you think? Is Kafka 0.9 completely backward compatible? I.e., will clients (both producers and consumers) that use the 0.8.2 libraries (both "kafka-clients" and plain "kafka") still work when connecting to it after the upgrade?

Thanks for your answer,
Krzysztof

On Tue, 19 Jan 2016 at 18:39, Dana Powers <[email protected]> wrote:

> Sadly HDP 2.3.2 shipped with a broken OffsetCommit api (the 0.8.2-beta
> version). You should use the apache releases, or upgrade to HDP 2.3.4.0 or
> later.
>
> -Dana
>
> On Tue, Jan 19, 2016 at 12:03 AM, Krzysztof Zarzycki <[email protected]>
> wrote:
>
> > Hi Kafka users,
> > I have an issue with saving Kafka offsets to Zookeeper through
> > OffsetCommitRequest. It's the same issue I found unanswered on SO, so I
> > kindly borrow the description:
> >
> > http://stackoverflow.com/questions/33897683/cant-save-kafka-offset-in-zookeeper
> >
> > "I've installed Zookeeper and Kafka from Ambari, on CentOS 7.
> >
> > Ambari version: 2.1.2.1
> > Zookeeper version: 3.4.6.2.3
> > Kafka version: 0.8.2.2.3
> > Java Kafka client: kafka_2.10, 0.8.2.2
> >
> > I'm trying to save the Kafka offset, using the following code:
> >
> > SimpleConsumer simpleConsumer = new SimpleConsumer(host, port,
> >         soTimeout, bufferSize, clientId);
> > TopicAndPartition topicAndPartition =
> >         new TopicAndPartition(topicName, partitionId);
> > Map<TopicAndPartition, OffsetAndMetadata> requestInfo = new HashMap<>();
> > requestInfo.put(topicAndPartition, new OffsetAndMetadata(readOffset,
> >         "", ErrorMapping.NoError()));
> > OffsetCommitRequest offsetCommitRequest =
> >         new OffsetCommitRequest(groupName, requestInfo, correlationId,
> >         clientName, (short)0);
> > simpleConsumer.commitOffsets(offsetCommitRequest);
> > simpleConsumer.close();
> >
> > But when I run this, I get the following error in my client:
> >
> > java.io.EOFException: Received -1 when reading from channel, socket
> > has likely been closed.
> >
> > Also in the Kafka logs I have the following error:
> >
> > [2015-11-24 15:38:53,566] ERROR Closing socket for /192.168.186.1
> > because of error (kafka.network.Processor)
> > java.nio.BufferUnderflowException
> >     at java.nio.Buffer.nextGetIndex(Buffer.java:498)
> >     at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
> >     at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:73)
> >     at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:68)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at scala.collection.immutable.Range.foreach(Range.scala:141)
> >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> >     at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:68)
> >     at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:65)
> >     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >     at scala.collection.immutable.Range.foreach(Range.scala:141)
> >     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> >     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> >     at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:65)
> >     at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
> >     at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
> >     at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:55)
> >     at kafka.network.Processor.read(SocketServer.scala:547)
> >     at kafka.network.Processor.run(SocketServer.scala:405)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > Now I've also downloaded and installed the official Kafka 0.8.2.2 version
> > from
> > https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
> > and it works ok; you can save the Kafka offset without any error.
> >
> > Can anybody give me some directions as to why the Ambari Kafka is failing
> > to save the offset?
> >
> > P.S: I know that if versionId is 0 (in OffsetCommitRequest), then the
> > offset is actually saved in Zookeeper.
> > "
> >
> > My only difference (IMHO, irrelevant) is that I'm using HDP in version
> > 2.3.2, but other than that the versions are the same.
> >
> > Do you guys have any hints on what could be wrong? Is there something
> > wrong with my use of offset committing? Or a conflict of versions?
> > Any hints would be highly appreciated :)
> >
> > Cheers,
> > Krzysztof
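
For what it's worth, the broker-side java.nio.BufferUnderflowException in HeapByteBuffer.getLong from the quoted trace is the kind of failure one would expect when the broker tries to decode a field that the client never sent. Below is a minimal sketch of that mechanism using only java.nio. The field layout, the extra "timestamp" field, and all names (WireMismatchSketch, encodeWithoutTimestamp, decodeWithTimestamp, underflows) are assumptions made up for illustration; this is not the real Kafka wire format, and I have not checked the actual 0.8.2-beta decoder.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WireMismatchSketch {

    // Hypothetical "client" layout: partition (int), offset (long),
    // metadata as a 2-byte length followed by UTF-8 bytes.
    static ByteBuffer encodeWithoutTimestamp(int partition, long offset, String metadata) {
        byte[] meta = metadata.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 2 + meta.length);
        buf.putInt(partition);
        buf.putLong(offset);
        buf.putShort((short) meta.length);
        buf.put(meta);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Hypothetical "broker" layout: same as above, but it expects an extra
    // 8-byte timestamp right after the offset (an assumption, see lead-in).
    static long[] decodeWithTimestamp(ByteBuffer buf) {
        int partition = buf.getInt();
        long offset = buf.getLong();
        long timestamp = buf.getLong(); // throws if fewer than 8 bytes remain
        return new long[] {partition, offset, timestamp};
    }

    // Returns true when decoding fails with BufferUnderflowException.
    static boolean underflows(ByteBuffer buf) {
        try {
            decodeWithTimestamp(buf);
            return false;
        } catch (BufferUnderflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Empty metadata, as in the quoted client code: 14 bytes on the wire,
        // so only 2 bytes are left when the decoder asks for the second long.
        ByteBuffer request = encodeWithoutTimestamp(0, 42L, "");
        System.out.println("underflow: " + underflows(request)); // prints "underflow: true"
    }
}
```

Note that with longer metadata the same mismatch would not necessarily throw; the decoder would instead silently read metadata bytes as if they were the missing field, which is arguably worse.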
