How to get async commit callbacks in the rebalance listener?
Hi,

I am trying to do async commits from the rebalance listener, to be precise in the method onPartitionsRevoked. The idea is to wait until all commits for the current epoch are done before the rebalance barrier, and by doing so prevent duplicate processing. It is not hard to call commitAsync(offsets, callback), but which method should be used so that the Kafka client gets a chance to call the callback? I tried the following:

1. Call commitAsync(Collections.emptyMap(), callback).
   Unfortunately, when you call commitAsync with an empty offsets map, it does not invoke the callbacks of previous commits. There is a PR from 2020 that would fix this issue: https://github.com/apache/kafka/pull/9111. This PR was closed without merging. Should it be reconsidered?

2. Pause all partitions and call poll(0).
   Doesn't work; you get a "KafkaConsumer is not safe for multi-threaded access" exception.

3. Call commitSync(Collections.emptyMap()) (commitSync takes no callback parameter).
   Behaves the same as under point 1.

4. Repeatedly call commitAsync(offsets, callback).
   Here we keep calling commitAsync with the same offsets until those offsets are committed. Unfortunately, this never ends, either because commitAsync does not invoke the callbacks, or because each call just stacks up more commits to complete.

I looked at the other methods on the consumer API, but I did not find anything that looks suitable for this use case.

So, to repeat the question: which method should I invoke (from the onPartitionsRevoked callback) to make the Kafka client invoke the callbacks of earlier async commits?

Some context: I am working on zio-kafka, a completely async library that provides a concurrent streaming layer on top of the Java client.

Thanks,
    Erik.

--
Erik van Oosten
e.vanoos...@grons.nl
https://day-to-day-stuff.blogspot.com
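P.S. For concreteness, here is a minimal sketch (Scala) of what I am trying to do. The class name CommitBarrierListener and the pendingOffsets map are my own inventions for illustration; only the consumer calls are real API:

import java.util.{Collection => JCollection, Map => JMap}
import java.util.concurrent.CountDownLatch
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

// Illustrative listener: block in onPartitionsRevoked until the callbacks of
// our async commits have fired, so no commit crosses the rebalance barrier.
class CommitBarrierListener(
    consumer: KafkaConsumer[_, _],
    pendingOffsets: JMap[TopicPartition, OffsetAndMetadata] // offsets gathered elsewhere
) extends ConsumerRebalanceListener {

  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = {
    val done = new CountDownLatch(1)
    // Issuing the commit is the easy part.
    consumer.commitAsync(pendingOffsets, new OffsetCommitCallback {
      override def onComplete(
          offsets: JMap[TopicPartition, OffsetAndMetadata],
          exception: Exception
      ): Unit = done.countDown()
    })
    // But nothing at this point drives the client's network layer, so the
    // callback may never be invoked before the rebalance completes. This
    // await is exactly the part I do not know how to make progress on.
    done.await()
  }

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
}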
Re: MBeans, dashes, underscores, and KAFKA-1481
Hi Jun,

The quotes are because of a regression in Metrics 2.2.0. IMHO Metrics 2.2.0 should not be used because of this. Just downgrade to Metrics 2.1.5 and you are good. Of course, upgrading to Metrics 3 would also do the trick.

Kind regards,
    Erik.

Jun Rao wrote on 17-10-14 at 20:54:
> Hi, everyone,
>
> We are fixing the mbean names in kafka-1482, by adding separate explicit
> tags in the name for things like clientId and topic. Another thing that
> some people have complained about before is that we use quotes in the JMX
> name. Should we also just get rid of the quotes as part of kafka-1482?
> So, instead of
>
>   kafka.server:type=BrokerTopicMetrics,name=topic-1-BytesInPerSec
>
> we will have
>
>   kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=topic-1
>
> Thanks,
> Jun

--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.com/
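P.S. The explicit tags would also make the beans much easier to query. A small Scala sketch, assuming the kafka-1482 naming goes through as proposed:

import java.lang.management.ManagementFactory
import javax.management.ObjectName

object MBeanQuery extends App {
  // With name and topic as separate properties, one JMX pattern matches the
  // BytesInPerSec bean of every topic. With the old form the topic has to be
  // parsed out of the concatenated `name` value instead.
  val pattern = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*")
  val server  = ManagementFactory.getPlatformMBeanServer
  server.queryNames(pattern, null).forEach(n => println(n))
}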
Re: How to produce and consume events in 2 DCs?
Hi Steven,

That doesn't work. In your proposal, MirrorMaker in one DC would copy messages from topic A to topic A in the other DC. However, the other DC runs a MirrorMaker that does the same, creating a loop: messages would be duplicated, triplicated, and so on, in a never-ending cycle.

Mirroring to another topic would work (but MirrorMaker doesn't support that), and so would mirroring to another cluster. Neha's proposal would also work, but I assume it is a lot more work for the Kafka internals and therefore, IMHO, wouldn't meet the KISS principle.

Kind regards,
    Erik.

Steven Wu wrote on 22-10-14 at 01:48:
> I think it doesn't have to be two more clusters. It can be just two more
> topics. MirrorMaker can copy from source topics in both regions into one
> aggregate topic.
>
> On Tue, Oct 21, 2014 at 1:54 AM, Erik van oosten
> e.vanoos...@grons.nl.invalid wrote:
>
>> Thanks Neha,
>>
>> Unfortunately, the maintenance overhead of 2 more clusters is not
>> acceptable to us.
>>
>> Would you accept a pull request on mirror maker that would rename topics
>> on the fly? For example by accepting the parameter rename:
>>
>>   --rename src1/dest1,src2/dest2
>>
>> or, extended with RE support:
>>
>>   --rename old_(.*)/new_\1
>>
>> Kind regards,
>>     Erik.
>>
>> On 20 Oct 2014, at 16:43, Neha Narkhede neha.narkh...@gmail.com wrote:
>>
>>> Another way to set up this kind of mirroring is by deploying 2 clusters
>>> in each DC - a local Kafka cluster and an aggregate Kafka cluster. The
>>> mirror maker copies data from both the DCs' local clusters into the
>>> aggregate clusters. So if you want access to a topic with data from
>>> both DCs, you subscribe to the aggregate cluster.
>>>
>>> Thanks,
>>> Neha
>>>
>>> On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten
>>> e.vanoos...@grons.nl.invalid wrote:
>>>
>>>> Hi,
>>>>
>>>> We have 2 data centers that produce events. Each DC has to process
>>>> events from both DCs. I had the following in mind:
>>>>
>>>>        DC 1           |           DC 2
>>>>       events          |          events
>>>>      +  +  +          |         +  +  +
>>>>      |  |  |          |         |  |  |
>>>>      v  v  v          |         v  v  v
>>>> +----------------+    |    +----------------+
>>>> | Receiver topic |    |    | Receiver topic |
>>>> +----------------+    |    +----------------+
>>>>    |     |            |            |     |
>>>>    |     |        mirroring        |     |
>>>>    |     +-------------------------|--+  |
>>>>    |  +----------------------------+  |  |
>>>>    |  |               |               |  |
>>>>    v  v               |               v  v
>>>> +----------------+    |    +----------------+
>>>> | Consumer topic |    |    | Consumer topic |
>>>> +----------------+    |    +----------------+
>>>>      +  +  +          |         +  +  +
>>>>      |  |  |          |         |  |  |
>>>>      v  v  v          |         v  v  v
>>>>     consumers         |        consumers
>>>>
>>>> As each DC has a single Kafka cluster, on each DC the receiver topic
>>>> and consumer topic need to be on the same cluster.
>>>>
>>>> Unfortunately, mirror maker does not seem to support mirroring to a
>>>> topic with another name. Is there another tool we could use? Or is
>>>> there another approach for producing and consuming from 2 DCs?
>>>>
>>>> Kind regards,
>>>>     Erik.
>>>>
>>>> --
>>>> Erik van Oosten
>>>> http://www.day-to-day-stuff.blogspot.nl/

--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.com/
Re: How to produce and consume events in 2 DCs?
Thanks Neha,

Unfortunately, the maintenance overhead of 2 more clusters is not acceptable to us.

Would you accept a pull request on mirror maker that would rename topics on the fly? For example by accepting the parameter rename:

  --rename src1/dest1,src2/dest2

or, extended with RE support:

  --rename old_(.*)/new_\1

Kind regards,
    Erik.

On 20 Oct 2014, at 16:43, Neha Narkhede neha.narkh...@gmail.com wrote:

> Another way to set up this kind of mirroring is by deploying 2 clusters in
> each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror
> maker copies data from both the DCs' local clusters into the aggregate
> clusters. So if you want access to a topic with data from both DCs, you
> subscribe to the aggregate cluster.
>
> Thanks,
> Neha
>
> On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten
> e.vanoos...@grons.nl.invalid wrote:
>
>> Hi,
>>
>> We have 2 data centers that produce events. Each DC has to process
>> events from both DCs. I had the following in mind:
>>
>>        DC 1           |           DC 2
>>       events          |          events
>>      +  +  +          |         +  +  +
>>      |  |  |          |         |  |  |
>>      v  v  v          |         v  v  v
>> +----------------+    |    +----------------+
>> | Receiver topic |    |    | Receiver topic |
>> +----------------+    |    +----------------+
>>    |     |            |            |     |
>>    |     |        mirroring        |     |
>>    |     +-------------------------|--+  |
>>    |  +----------------------------+  |  |
>>    |  |               |               |  |
>>    v  v               |               v  v
>> +----------------+    |    +----------------+
>> | Consumer topic |    |    | Consumer topic |
>> +----------------+    |    +----------------+
>>      +  +  +          |         +  +  +
>>      |  |  |          |         |  |  |
>>      v  v  v          |         v  v  v
>>     consumers         |        consumers
>>
>> As each DC has a single Kafka cluster, on each DC the receiver topic and
>> consumer topic need to be on the same cluster.
>>
>> Unfortunately, mirror maker does not seem to support mirroring to a
>> topic with another name. Is there another tool we could use? Or is there
>> another approach for producing and consuming from 2 DCs?
>>
>> Kind regards,
>>     Erik.
>>
>> --
>> Erik van Oosten
>> http://www.day-to-day-stuff.blogspot.nl/
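P.S. To make the proposal concrete, the rename rules would behave roughly like this sketch (the rule syntax and the TopicRenamer class are only my illustration of the idea, not existing MirrorMaker code):

import scala.util.matching.Regex

// First matching src/dest rule wins; unmatched topics keep their name.
class TopicRenamer(rules: Seq[(Regex, String)]) {
  def rename(topic: String): String =
    rules
      .collectFirst { case (re, dest) if re.pattern.matcher(topic).matches() =>
        re.replaceAllIn(topic, dest)
      }
      .getOrElse(topic)
}

// new TopicRenamer(Seq("old_(.*)".r -> "new_$1")).rename("old_events")
// yields "new_events" (Scala regexes write the back-reference as $1 where
// the flag example above uses \1).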
Re: refactoring ZK so it is plugable, would this make sense?
You can run with a single-node ZooKeeper "cluster" as well. See
http://zookeeper.apache.org/doc/r3.3.4/zookeeperStarted.html#sc_InstallingSingleMode

Cheers,
    Erik.

On 9 Oct 2014, at 22:52, S Ahmed sahmed1...@gmail.com wrote:

> I want kafka features (w/o the redundancy) but don't want to have to run 3
> zookeeper instances to save $$.
>
> On Thu, Oct 9, 2014 at 2:59 PM, Jun Rao jun...@gmail.com wrote:
>
>> This may not be easy since you have to implement things like watcher
>> callbacks. What's your main concern with the ZK dependency?
>>
>> Thanks,
>> Jun
>>
>> On Thu, Oct 9, 2014 at 8:20 AM, S Ahmed sahmed1...@gmail.com wrote:
>>
>>> Hi,
>>>
>>> I was wondering: if the zookeeper library (ZkUtils.scala etc.) were
>>> designed in a more modular way, would it make it possible to run a
>>> leaner version of kafka?
>>>
>>> The idea is that I want to run kafka with less emphasis on it being
>>> durable with failover, and more on it being a replacement for a standard
>>> queue like kestrel. This way you could still take advantage of the other
>>> aspects of Kafka (the permanent log, etc.).
>>>
>>> I was just thinking: what if the zookeeper access was wrapped in
>>> something like
>>>
>>>   class DiscoveryService
>>>     def electLeader ..
>>>     def getFollower ...
>>>
>>> (I'm just making those methods up, but you get the point: they are
>>> simply the same calls ZkUtils etc. will be making to connect to
>>> zookeeper.)
>>>
>>> Now the idea is, if you don't want to dedicate 3 servers to run
>>> zookeeper, you could create your own implementation that e.g. returns
>>> data based on a static configuration file instead of a discovery service
>>> like zookeeper.
>>>
>>> Would wrapping the zookeeper calls in a pluggable/swappable service make
>>> sense and allow you to still use Kafka at a smaller scale, or would this
>>> not work for other reasons that I am overlooking?
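P.S. Standalone mode needs only a three-line config file. From the getting-started guide linked above (pick your own dataDir):

  # zoo.cfg -- minimal standalone ZooKeeper configuration
  tickTime=2000
  dataDir=/var/lib/zookeeper
  clientPort=2181

Then start it with bin/zkServer.sh start and point Kafka's zookeeper.connect at that single host.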
How to produce and consume events in 2 DCs?
Hi,

We have 2 data centers that produce events. Each DC has to process events from both DCs. I had the following in mind:

       DC 1           |           DC 2
      events          |          events
     +  +  +          |         +  +  +
     |  |  |          |         |  |  |
     v  v  v          |         v  v  v
+----------------+    |    +----------------+
| Receiver topic |    |    | Receiver topic |
+----------------+    |    +----------------+
   |     |            |            |     |
   |     |        mirroring        |     |
   |     +-------------------------|--+  |
   |  +----------------------------+  |  |
   |  |               |               |  |
   v  v               |               v  v
+----------------+    |    +----------------+
| Consumer topic |    |    | Consumer topic |
+----------------+    |    +----------------+
     +  +  +          |         +  +  +
     |  |  |          |         |  |  |
     v  v  v          |         v  v  v
    consumers         |        consumers

As each DC has a single Kafka cluster, on each DC the receiver topic and consumer topic need to be on the same cluster.

Unfortunately, mirror maker does not seem to support mirroring to a topic with another name. Is there another tool we could use? Or is there another approach for producing and consuming from 2 DCs?

Kind regards,
    Erik.

--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.nl/
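P.S. To show the kind of tool I am after: a trivial copier that consumes from the other DC's receiver topic and produces into the local consumer topic under a different name, so the two mirrors cannot feed each other. This is only a sketch; broker addresses and topic names are made up, and I am using the Java client API for brevity (details differ per client version):

import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object RenamingCopier extends App {
  val consumerProps = new Properties()
  consumerProps.put("bootstrap.servers", "dc2-kafka:9092") // remote DC
  consumerProps.put("group.id", "dc1-mirror")
  consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

  val producerProps = new Properties()
  producerProps.put("bootstrap.servers", "dc1-kafka:9092") // local DC
  producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
  val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
  consumer.subscribe(Seq("receiver-topic").asJava)

  while (true) {
    val records = consumer.poll(Duration.ofMillis(500))
    records.asScala.foreach { r =>
      // The rename: read from "receiver-topic", write to "consumer-topic".
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("consumer-topic", r.key(), r.value()))
    }
    consumer.commitSync() // at-least-once is fine for this sketch
  }
}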
Re: running on scala 2.11
Hi Joe,

I am afraid you misread the article you are referring to. Scala 2.11 /compiles/ code that compiles with 2.10. Binary compatibility is only guaranteed between micro versions.

Kind regards,
    Erik.

Joe Stein wrote on 01-07-14 at 21:01:
> Looping back around here (for posterity). I didn't update the ticket, but
> Scala 2.11 works with 2.10 binaries:
> http://www.scala-lang.org/news/2.11.0
>
> If there are issues, folks can please open a new JIRA with the specific
> issue, as it may not be related. Thanks!
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ***/
>
> On Fri, May 30, 2014 at 3:13 AM, Laszlo Fogas las...@falconsocial.com
> wrote:
>
>> cool. thanks, Joe.
>>
>> On Fri, May 30, 2014 at 6:54 AM, Joe Stein joe.st...@stealth.ly wrote:
>>
>>> It is possible that when I tried this ticket initially there was
>>> something environmental in my build process causing it to appear to
>>> work. I reopened the ticket. I will go back through it again on a few
>>> different environments and in more detail and post results either way.
>>>
>>> /***
>>>  Joe Stein
>>>  Founder, Principal Consultant
>>>  Big Data Open Source Security LLC
>>>  http://www.stealth.ly
>>>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>>> ***/
>>>
>>> On Thu, May 29, 2014 at 7:20 AM, Laszlo Fogas las...@falconsocial.com
>>> wrote:
>>>
>>>> I just simply tried the artifact kafka_2.10:0.8.1.1 as suggested in
>>>> https://issues.apache.org/jira/browse/KAFKA-1454 but got the error
>>>> written above.
>>>>
>>>> Now I've cloned the repo and compiled:
>>>>
>>>>   ./gradlew jar                        => success
>>>>   ./gradlew -PscalaVersion=2.10.0 jar  => success
>>>>   ./gradlew -PscalaVersion=2.11.0 jar  => FAILURE
>>>>
>>>> both on trunk and the 0.8.1.1 tag. The compile error is the same as
>>>> here:
>>>> https://github.com/davidB/scala-maven-plugin/issues/145#issuecomment-39189898
>>>>
>>>> I've tried disabling zinc (got classpath errors at another point) and
>>>> upgrading zinc (had other version incompatibilities). Seems like there
>>>> are issues with scala 2.11.
>>>>
>>>> On Wed, May 28, 2014 at 6:41 PM, Guozhang Wang wangg...@gmail.com
>>>> wrote:
>>>>
>>>>> Hello Laszlo,
>>>>>
>>>>> Have you built Kafka with scala 2.11? You may read the README file to
>>>>> check compiling Kafka with different scala versions.
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Wed, May 28, 2014 at 5:45 AM, Laszlo Fogas
>>>>> las...@falconsocial.com wrote:
>>>>>
>>>>>> Hello folks,
>>>>>>
>>>>>> anybody running kafka with scala 2.11.0? KAFKA-1454 says it's
>>>>>> possible.. I'm having problems though when running the basic
>>>>>> producer example from the wiki. The message is
>>>>>>
>>>>>>   NoClassDefFoundError: scala/collection/GenTraversableOnce$class
>>>>>>
>>>>>> Thanks
>>>>>> Laszlo
>>>>>
>>>>> --
>>>>> -- Guozhang

--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.com/
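P.S. This is exactly why sbt appends the Scala binary version to artifact names. In build.sbt terms (coordinates taken from the thread above):

  // lets sbt pick the suffix (kafka_2.10, kafka_2.11, ...) matching your scalaVersion:
  libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"

  // the same dependency with the suffix spelled out by hand:
  libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"

Depending on kafka_2.10 from a Scala 2.11 project may compile, but it fails at runtime with errors like the NoClassDefFoundError quoted above.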
Re: trouble building 0.8
That's a classic: just delete your ~/.ivy2 as well, and be prepared to download the internet again :(

    Erik.

On 06-08-13 05:36, Rob Withers wrote:
> I deleted all the sbt project and target stuff, for both ~/.sbt and
> kafka/project/build. I had previously had pgp stuff in my global sbt
> config. This resolved the issue, but now I am having another… I get the
> following:
>
>   [warn] Multiple resolvers having different access mechanism configured
>   with same name 'sbt-plugin-releases'. To avoid conflict, Remove
>   duplicate project resolvers (`resolvers`) or rename publishing resolver
>   (`publishTo`).
>
> I also get this failure to resolve a dependency:
>
>   [info] Resolving org.scalatest#scalatest_2.10;1.8 ...
>   [warn] module not found: org.scalatest#scalatest_2.10;1.8
>   [warn] local: tried
>   [warn]   /Users/reefedjib/.ivy2/local/org.scalatest/scalatest_2.10/1.8/ivys/ivy.xml
>   [warn] SonaType ScalaTest repo: tried
>   [warn]   https://oss.sonatype.org/content/groups/public/org/scalatest/org/scalatest/scalatest_2.10/1.8/scalatest_2.10-1.8.pom
>   [warn] public: tried
>   [warn]   http://repo1.maven.org/maven2/org/scalatest/scalatest_2.10/1.8/scalatest_2.10-1.8.pom
>
> I looked at http://repo1.maven.org/maven2/org/scalatest/scalatest_2.10
> and there is no 1.8, but a 1.9… So the issue is in core/build.sbt, where
> scalatest should be 1.9.1. I changed it and it works. This was changed
> today, so perhaps they lost 1.8?
>
> thanks, rob
>
> On Aug 5, 2013, at 8:19 PM, Rob Withers reefed...@gmail.com wrote:
>
>> Well, I changed something, as it was working yesterday. Here's my
>> attempt at updating…
>>
>>   Robs-MacBook-Pro:kafka reefedjib$ sbt ++2.10.2 update
>>   [info] Loading global plugins from /Users/reefedjib/.sbt/plugins
>>   [info] Loading project definition from
>>     /Users/reefedjib/Desktop/rob/comp/workspace-frameworks/kafka/project
>>   [warn] Multiple resolvers having different access mechanism configured
>>     with same name 'sbt-plugin-releases'. To avoid conflict, Remove
>>     duplicate project resolvers (`resolvers`) or rename publishing
>>     resolver (`publishTo`).
>>   [error] AttributeKey ID collisions detected for: 'pgp-signer'
>>     (sbt.Task[com.jsuereth.pgp.sbtplugin.PgpSigner],
>>     sbt.Task[com.typesafe.sbt.pgp.PgpSigner]), 'pgp-verifier'
>>     (sbt.Task[com.jsuereth.pgp.sbtplugin.PgpVerifier],
>>     sbt.Task[com.typesafe.sbt.pgp.PgpVerifier]), 'check-pgp-signatures'
>>     (sbt.Task[com.typesafe.sbt.pgp.SignatureCheckReport],
>>     sbt.Task[com.jsuereth.pgp.sbtplugin.SignatureCheckReport]),
>>     'signatures-module' (sbt.Task[com.typesafe.sbt.pgp.GetSignaturesModule],
>>     sbt.Task[com.jsuereth.pgp.sbtplugin.GetSignaturesModule])
>>   [error] Use 'last' for the full log.
>>
>> So here's my /Users/reefedjib/.sbt/plugins:
>>
>>   resolvers += Classpaths.typesafeResolver
>>   addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
>>
>> and I just cloned 0.8, no changes. Is there anywhere else I need to look
>> for sbt plugin configs, outside kafka?
>>
>> thanks, rob

--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.com/
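P.S. For anyone hitting this later: the fix Rob describes is a one-line change in core/build.sbt. The exact original line is my reconstruction, but it would look something like:

  // core/build.sbt -- bump scalatest to a version that exists for Scala 2.10:
  libraryDependencies += "org.scalatest" %% "scalatest" % "1.9.1" % "test"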