Thank you Dana!
I see...
The pity is that Hortonworks states in its release notes for HDP 2.3.2
that:
 5.9. Kafka

HDP 2.3.2 provides Kafka 0.8.2, with no additional Apache patches.
(https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.2/bk_HDP_RelNotes/content/patch_kafka.html)

So I assumed that Kafka would ship as a stable release...

So you are saying that upgrading to HDP 2.3.4 would help? I see in the
release notes that it upgrades Kafka to 0.9.0.
I'm afraid of this upgrade, as I don't know whether Spark Streaming
(spark-streaming-kafka) will support Kafka 0.9.

What do you think? Is Kafka 0.9 completely backward compatible? I.e.,
will clients (both producers and consumers) that use the 0.8.2 libraries
(both "kafka-clients" and plain "kafka") still work when connecting to it
after the upgrade?

Thanks for your answer,
Krzysztof




On Tue, 19 Jan 2016 at 18:39, Dana Powers <[email protected]> wrote:

> Sadly, HDP 2.3.2 shipped with a broken OffsetCommit API (the 0.8.2-beta
> version). You should use the Apache releases, or upgrade to HDP 2.3.4.0 or
> later.
>
> -Dana
>
> On Tue, Jan 19, 2016 at 12:03 AM, Krzysztof Zarzycki <[email protected]> wrote:
>
> > Hi Kafka users,
> > I have an issue with saving Kafka offsets to Zookeeper through
> > OffsetCommitRequest. It's the same issue I found unanswered on SO, so I
> > kindly borrow the description:
> >
> > http://stackoverflow.com/questions/33897683/cant-save-kafka-offset-in-zookeeper
> >
> > "I've installed Zookeeper and Kafka from Ambari, on CentOS 7.
> >
> > Ambari version: 2.1.2.1
> > Zookeeper version: 3.4.6.2.3
> > Kafka version: 0.8.2.2.3
> > Java Kafka client: kafka_2.10, 0.8.2.2
> >
> > I'm trying to save the Kafka offset, using the following code:
> >
> > SimpleConsumer simpleConsumer = new SimpleConsumer(host, port,
> >     soTimeout, bufferSize, clientId);
> > TopicAndPartition topicAndPartition =
> >     new TopicAndPartition(topicName, partitionId);
> > Map<TopicAndPartition, OffsetAndMetadata> requestInfo = new HashMap<>();
> > requestInfo.put(topicAndPartition,
> >     new OffsetAndMetadata(readOffset, "", ErrorMapping.NoError()));
> > OffsetCommitRequest offsetCommitRequest = new OffsetCommitRequest(
> >     groupName, requestInfo, correlationId, clientName, (short) 0);
> > simpleConsumer.commitOffsets(offsetCommitRequest);
> > simpleConsumer.close();
> >
> > But when I run this, I get the following error in my client:
> >
> > java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
> >
> > Also in the Kafka logs I have the following error:
> >
> > [2015-11-24 15:38:53,566] ERROR Closing socket for /192.168.186.1 because of error (kafka.network.Processor)
> > java.nio.BufferUnderflowException
> >     at java.nio.Buffer.nextGetIndex(Buffer.java:498)
> >     at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
> >     at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:73)
> >     at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:68)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at scala.collection.immutable.Range.foreach(Range.scala:141)
> >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> >     at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:68)
> >     at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:65)
> >     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >     at scala.collection.immutable.Range.foreach(Range.scala:141)
> >     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> >     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> >     at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:65)
> >     at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
> >     at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
> >     at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:55)
> >     at kafka.network.Processor.read(SocketServer.scala:547)
> >     at kafka.network.Processor.run(SocketServer.scala:405)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > Now I've also downloaded and installed the official Kafka 0.8.2.2 version
> > from
> > https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
> > and it works OK; you can save the Kafka offset without any error.
> >
> > Can anybody give me some directions as to why the Ambari Kafka is failing
> > to save the offset?
> >
> > P.S.: I know that if versionId is 0 (in OffsetCommitRequest), then the
> > offset is actually saved in Zookeeper.
> > "
> > The only difference in my case (IMHO irrelevant) is that I'm using HDP
> > 2.3.2; other than that, the versions are the same.
> >
> > Do you guys have any hints on what could be wrong? Is something wrong
> > with my use of offset committing? Or is it a conflict of versions?
> > Any hints would be highly appreciated :)
> > Cheers,
> > Krzysztof
> >
>
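
The BufferUnderflowException in the broker log quoted above is thrown from HeapByteBuffer.getLong inside OffsetCommitRequest.readFrom, which means the broker tried to read an 8-byte field when fewer than 8 bytes were left in the request. A minimal sketch of that failure mode follows. It assumes a made-up wire layout: the class WireMismatchSketch, its method names and the field order are hypothetical and are not Kafka's actual protocol code.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical layout for illustration only; NOT Kafka's real request format.
class WireMismatchSketch {

    // Writer: partition (int32), offset (int64), metadata (int16 length + bytes).
    static ByteBuffer encodeWithoutTimestamp(int partition, long offset, String metadata) {
        byte[] meta = metadata.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 2 + meta.length);
        buf.putInt(partition);
        buf.putLong(offset);
        buf.putShort((short) meta.length);
        buf.put(meta);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Reader that agrees with the writer's layout; returns the offset.
    static long decodeWithoutTimestamp(ByteBuffer buf) {
        buf.getInt(); // partition
        long offset = buf.getLong();
        byte[] meta = new byte[buf.getShort()];
        buf.get(meta);
        return offset;
    }

    // Reader that assumes one extra int64 field (say, a timestamp) after the
    // offset. Given the writer's shorter payload, getLong() runs out of bytes.
    static long decodeExpectingTimestamp(ByteBuffer buf) {
        buf.getInt(); // partition
        long offset = buf.getLong();
        buf.getLong(); // extra field the writer never sent
        byte[] meta = new byte[buf.getShort()];
        buf.get(meta);
        return offset;
    }

    public static void main(String[] args) {
        ByteBuffer request = encodeWithoutTimestamp(0, 42L, "");
        System.out.println("matching reader: offset="
                + decodeWithoutTimestamp(request.duplicate()));
        try {
            decodeExpectingTimestamp(request.duplicate());
        } catch (BufferUnderflowException e) {
            System.out.println("mismatched reader: " + e);
        }
    }
}
```

The sketch only shows how a reader and a writer that disagree on the layout produce this exception; it does not establish which fields differ between the HDP 2.3.2 build and the Apache 0.8.2.2 client.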
