How to get async commit callbacks in the rebalance listener?

2023-04-26 Thread Erik van Oosten


I am trying to do async commits from the rebalance listener, to be 
precise, in method onPartitionsRevoked. The idea is to wait until all 
commits for the current epoch are done before the rebalance barrier and 
by doing so prevent duplicate processing.

It is not so hard to call commitAsync(offsets, callback), but what 
method should be used so that the Kafka client gets a chance to call the 
callback?

I tried the following:

*1. Call commitAsync(Collections.emptyMap, callback)*

Unfortunately when you call commitAsync with an empty offsets map, it 
doesn't call the callbacks of previous commits.

There is a PR from 2020 that would fix this issue: This PR was closed without 
merging. Should this PR be reconsidered?

*2. Pause all partitions and call poll(0)*

Doesn't work; you'll get a "KafkaConsumer is not safe for multi-threaded 
access" exception.

*3. Call commitSync(Collections.emptyMap, callback)*

Behaves the same as under point 1.

*4. Repeated calls to commitAsync(offsets, callback)*

This time we keep calling commitAsync with the same offsets until these 
offsets are committed. Unfortunately, this never ends. Either because 
commitAsync doesn't call the callbacks, or because this just stacks up 
more commits to complete.

I looked at the other methods on the consumer API but I didn't find 
anything that looked suitable for this use case.

So to repeat the question:

What method should I invoke (from the onPartitionsRevoked callback), to 
make the Kafka client invoke the callback of earlier async commits?

Some context: I am working on zio-kafka; a completely async library that 
provides a concurrent streaming layer on top of the Java client.
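
The bookkeeping part of the idea above, waiting until every async commit of the current epoch has had its callback invoked, can be sketched independently of Kafka. The Java below is a minimal illustration and not an answer to the question: PendingCommits, track and drain are hypothetical names, Consumer<Exception> stands in for the client's OffsetCommitCallback, and pump stands for whichever client call would deliver the callbacks, which is exactly what this message asks about.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Counts async commits whose callback has not been invoked yet (hypothetical helper). */
final class PendingCommits {
    private final AtomicInteger pending = new AtomicInteger();

    /** Call before each async commit and hand the returned callback to that commit. */
    Consumer<Exception> track() {
        pending.incrementAndGet();
        // The callback lowers the counter when invoked, for success (null) and failure alike.
        return error -> pending.decrementAndGet();
    }

    int pending() {
        return pending.get();
    }

    /**
     * Runs pump repeatedly until no callback is outstanding or the timeout expires.
     * Assumption: pump is some call that lets the client invoke completed callbacks.
     * Returns true when nothing is pending any more.
     */
    boolean drain(Runnable pump, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (pending.get() > 0 && System.nanoTime() - deadline < 0) {
            pump.run();
        }
        return pending.get() == 0;
    }
}
```

With such a helper, onPartitionsRevoked would call drain and only return once it reports true or gives up. The timeout matters because a pump that never delivers callbacks would otherwise spin forever, as in attempt 4.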


Erik van Oosten

Re: MBeans, dashes, underscores, and KAFKA-1481

2014-11-02 Thread Erik van Oosten

Hi Jun,

The quotes are because of a regression in Metrics 2.2.0. IMHO Metrics 
2.2.0 should not be used because of this. Just downgrade to Metrics 
2.1.5 and you are good.

Of course, upgrading to Metrics 3 would do the trick also.

Kind regards,

Jun Rao wrote on 17-10-14 at 20:54:

Hi, everyone,

We are fixing the mbean names in kafka-1482, by adding separate explicit
tags in the name for things like clientId and topic. Another thing that
some people have complained before is that we use quotes in the jmx name.
Should we also just get rid of the quotes as part of kafka-1482? So,
instead of
we will have



Erik van Oosten

Re: How to produce and consume events in 2 DCs?

2014-10-22 Thread Erik van Oosten

Hi Steven,

That doesn't work. In your proposal mirrormaker in one DC would copy 
messages from topic A to the other DC in topic A. However, in the other 
DC there is a mirrormaker which does the same, creating a loop. Messages 
will be duplicated, triplicated, etc. in a never-ending loop.

Mirroring to another topic would work (mirrormaker doesn't support 
that), and so would mirroring to another cluster. Neha's proposal would 
work also but I assume it's a lot more work for the Kafka internals and 
therefore IMHO wouldn't meet the KISS principle.

Kind regards,

Steven Wu wrote on 22-10-14 at 01:48:

I think it doesn't have to be two more clusters. can be just two more
topics. MirrorMaker can copy from source topics in both regions into one
aggregate topic.

On Tue, Oct 21, 2014 at 1:54 AM, Erik van oosten wrote:

Thanks Neha,

Unfortunately, the maintenance overhead of 2 more clusters is not
acceptable to us.

Would you accept a pull request on mirror maker that would rename topics
on the fly?

For example by accepting the parameter rename:
--rename src1/dest1,src2/dest2
or, extended with RE support:
--rename old_(.*)/new_\1

Kind regards,

On 20 Oct 2014, at 16:43, Neha Narkhede wrote:

Another way to set up this kind of mirroring is by deploying 2 clusters in
each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror
maker copies data from both the DC's local clusters into the aggregate
clusters. So if you want access to a topic with data from both DC's, you
subscribe to the aggregate cluster.


On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten wrote:


We have 2 data centers that produce events. Each DC has to process events
from both DCs.

I had the following in mind:

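       DC 1          |          DC 2
      events         |         events
     +  +  +         |        +  +  +
     |  |  |         |        |  |  |
     v  v  v         |        v  v  v
+----------------+   |   +----------------+
| Receiver topic |   |   | Receiver topic |
+----------------+       +----------------+
   |      |      mirroring      |      |
   |      |     +---------------+      |
   |      |     |                      |
   |      +-----|---------------+      |
   v            v               v      v
+----------------+   |   +----------------+
| Consumer topic |   |   | Consumer topic |
+----------------+   |   +----------------+
     +  +  +         |        +  +  +
     |  |  |         |        |  |  |
     v  v  v         |        v  v  v
    consumers        |       consumers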

As each DC has a single Kafka cluster, on each DC the receiver topic and
consumer topic need to be on the same cluster.
Unfortunately, mirror maker does not seem to support mirroring to a topic
with another name.

Is there another tool we could use?
Or, is there another approach for producing and consuming from 2 DCs?

Kind regards,

Erik van Oosten

Erik van Oosten

Re: How to produce and consume events in 2 DCs?

2014-10-21 Thread Erik van oosten
Thanks Neha,

Unfortunately, the maintenance overhead of 2 more clusters is not acceptable to 
us.

Would you accept a pull request on mirror maker that would rename topics on the 
fly?

For example by accepting the parameter rename:
   --rename src1/dest1,src2/dest2
or, extended with RE support:
   --rename old_(.*)/new_\1
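
The proposed rename rules could behave roughly as in the Java sketch below. This is a hypothetical illustration only: mirror maker has no such option, and the class name TopicRenamer as well as the chosen behaviour (the first matching rule wins, a rule must match the whole topic name, unmatched topics keep their name) are assumptions. Rules are split on commas, so a regular expression that itself contains a comma is not supported here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical renamer for specs such as "src1/dest1,old_(.*)/new_\1". */
final class TopicRenamer {
    // Insertion order is kept so that the first matching rule wins.
    private final Map<Pattern, String> rules = new LinkedHashMap<>();

    TopicRenamer(String spec) {
        for (String rule : spec.split(",")) {
            String[] parts = rule.split("/", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected src/dest but got: " + rule);
            }
            // Anchor the source so that it has to match the complete topic name.
            Pattern source = Pattern.compile("\\A(?:" + parts[0] + ")\\z");
            // The proposal writes back-references as \1, java.util.regex expects $1.
            String replacement = parts[1].replaceAll("\\\\(\\d)", "\\$$1");
            rules.put(source, replacement);
        }
    }

    /** Returns the destination topic name, or the input when no rule matches. */
    String rename(String topic) {
        for (Map.Entry<Pattern, String> rule : rules.entrySet()) {
            Matcher matcher = rule.getKey().matcher(topic);
            if (matcher.matches()) {
                return matcher.replaceFirst(rule.getValue());
            }
        }
        return topic;
    }
}
```

For example, new TopicRenamer("src1/dest1,old_(.*)/new_\\1") maps src1 to dest1 and old_events to new_events, and leaves any other topic name unchanged.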

Kind regards,

On 20 Oct 2014, at 16:43, Neha Narkhede wrote:

 Another way to set up this kind of mirroring is by deploying 2 clusters in
 each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror
 maker copies data from both the DC's local clusters into the aggregate
 clusters. So if you want access to a topic with data from both DC's, you
 subscribe to the aggregate cluster.

 On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten wrote:

 We have 2 data centers that produce events. Each DC has to process events
 from both DCs.

 I had the following in mind:

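        DC 1          |          DC 2
       events         |         events
      +  +  +         |        +  +  +
      |  |  |         |        |  |  |
      v  v  v         |        v  v  v
 +----------------+   |   +----------------+
 | Receiver topic |   |   | Receiver topic |
 +----------------+       +----------------+
    |      |      mirroring      |      |
    |      |     +---------------+      |
    |      |     |                      |
    |      +-----|---------------+      |
    v            v               v      v
 +----------------+   |   +----------------+
 | Consumer topic |   |   | Consumer topic |
 +----------------+   |   +----------------+
      +  +  +         |        +  +  +
      |  |  |         |        |  |  |
      v  v  v         |        v  v  v
     consumers        |       consumers
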
 As each DC has a single Kafka cluster, on each DC the receiver topic and
 consumer topic need to be on the same cluster.
 Unfortunately, mirror maker does not seem to support mirroring to a topic
 with another name.

 Is there another tool we could use?
 Or, is there another approach for producing and consuming from 2 DCs?

 Kind regards,

 Erik van Oosten

Re: refactoring ZK so it is plugable, would this make sense?

2014-10-20 Thread Erik van oosten
You can run with a single node zookeeper cluster also.


On 9 Oct 2014, at 22:52, S Ahmed wrote:

 I want kafka features (w/o the redundancy) but don't want to have to run 3
 zookeeper instances to save $$.

 On Thu, Oct 9, 2014 at 2:59 PM, Jun Rao wrote:

 This may not be easy since you have to implement things like watcher
 callbacks. What's your main concern with the ZK dependency?

 On Thu, Oct 9, 2014 at 8:20 AM, S Ahmed wrote:

 I was wondering if the zookeeper library (zkutils.scala etc) was designed
 in a more modular way, would it make it possible to run a more lean
 version of kafka?

 The idea is I want to run kafka but with a less emphasis on it being
 durable with failover and more on it being a replacement for a standard
 queue like kestrel.

 This way you could take advantage of the other aspects of Kafka
 (permanent log, etc etc.)

 I was just thinking if the zookeeper access was wrapped in something like:

 class DiscoverService
   def electLeader ..
   def getFollower ...

 (I'm just making those methods up, but you get the point they are simply
 the same calls zkutils etc. will be making to connect to zookeeper)

 Now the idea is, if you don't want to dedicate 3 servers to run zookeeper,
 you could create your own implementation that e.g. returns data based on a
 configuration file that is static and not a discover service like
 zookeeper.

 Would wrapping the zookeeper calls into a pluggable/swappable service make
 sense and allow you to still use Kafka at a smaller scale or would this
 not work for other reasons that I am overlooking?
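
The wrapper described in the quoted question could be pictured roughly as below. This is only a sketch in Java under loud assumptions: Kafka has no such interface, the names DiscoveryService and StaticDiscoveryService and both method signatures are made up (following the made-up methods in the question), and the watcher callbacks that Jun mentions are left out entirely.

```java
import java.util.List;

/** Hypothetical abstraction over the coordination calls, not part of Kafka. */
interface DiscoveryService {
    /** Returns the broker that leads the given partition. */
    String electLeader(String topic, int partition);

    /** Returns the brokers that follow the given partition. */
    List<String> getFollowers(String topic, int partition);
}

/** Static variant: one configured broker leads every partition and has no followers. */
final class StaticDiscoveryService implements DiscoveryService {
    private final String broker;

    StaticDiscoveryService(String broker) {
        this.broker = broker;
    }

    @Override
    public String electLeader(String topic, int partition) {
        return broker;
    }

    @Override
    public List<String> getFollowers(String topic, int partition) {
        return List.of();
    }
}
```

A ZooKeeper-backed implementation of the same interface would be the default, while the static one trades failover for a smaller footprint.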

How to produce and consume events in 2 DCs?

2014-10-20 Thread Erik van oosten

We have 2 data centers that produce events. Each DC has to process events from 
both DCs.

I had the following in mind:

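       DC 1          |          DC 2
      events         |         events
     +  +  +         |        +  +  +
     |  |  |         |        |  |  |
     v  v  v         |        v  v  v
+----------------+   |   +----------------+
| Receiver topic |   |   | Receiver topic |
+----------------+       +----------------+
   |      |      mirroring      |      |
   |      |     +---------------+      |
   |      |     |                      |
   |      +-----|---------------+      |
   v            v               v      v
+----------------+   |   +----------------+
| Consumer topic |   |   | Consumer topic |
+----------------+   |   +----------------+
     +  +  +         |        +  +  +
     |  |  |         |        |  |  |
     v  v  v         |        v  v  v
    consumers        |       consumers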

As each DC has a single Kafka cluster, on each DC the receiver topic and 
consumer topic need to be on the same cluster.
Unfortunately, mirror maker does not seem to support mirroring to a topic with 
another name.

Is there another tool we could use?
Or, is there another approach for producing and consuming from 2 DCs?

Kind regards,

Erik van Oosten

Re: running on scala 2.11

2014-07-05 Thread Erik van Oosten

Hi Joe,

I am afraid you misread the article you are referring to. Scala 2.11 
/compiles/ code that compiles with 2.10. Binary compatibility is only 
guaranteed between micro versions.

Kind regards,

Joe Stein wrote on 01-07-14 at 21:01:

Looping back around here (for posterity) I didn't update the ticket but
Scala 2.11 works with 2.10 binaries

If there are issue folks can open a new JIRA please with specific issue as
it may not be related, thanks!

  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  Twitter: @allthingshadoop

On Fri, May 30, 2014 at 3:13 AM, Laszlo Fogas wrote:


thanks, Joe.

On Fri, May 30, 2014 at 6:54 AM, Joe Stein wrote:

It is possible that when I tried this ticket initially there was something
environmental in my build process causing it to appear to work.

I reopened the ticket.

I will go back through it again on a few different environments and in
detail and post results either way.

  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  Twitter: @allthingshadoop

On Thu, May 29, 2014 at 7:20 AM, Laszlo Fogas wrote:

just simply tried artifact kafka_2.10: as suggested in but got the error



now I've cloned the repo and compiled with

./gradlew jar = success
./gradlew -PscalaVersion=2.10.0 jar = success
./gradlew -PscalaVersion=2.11.0 jar =  FAILURE

both on trunk and tag

the compile error is the same as here

i've tried disabling zinc = got classpath errors at an other point
and upgrading zinc = had other version incompatibilities

Seems like there are issues with scala 2.11

On Wed, May 28, 2014 at 6:41 PM, Guozhang Wang wrote:


Hello Laszlo,

Have you built Kafka with scala 2.11? You may read the README file to


compiling Kafka with different scala versions.


On Wed, May 28, 2014 at 5:45 AM, Laszlo Fogas wrote:


Hello folks,

anybody running kafka with scala 2.11.0?

KAFKA-1454 says it's possible.. i'm having problems though when



basic producer example from the wiki

The message is *NoClassDefFoundError:



-- Guozhang

Erik van Oosten

Re: trouble building 0.8

2013-08-12 Thread Erik van Oosten
That's a classic: just delete your ~/.ivy2 as well and be prepared to 
download the internet again :(


Op 06-08-13 05:36, Rob Withers schreef:

I deleted all the sbt project and target stuff, for both ~/.sbt and 
kafka/project/build.  I had previously had php stuff in my global sbt stuff.  
This resolved this issue, but now I am having another…

I get the following:

[warn] Multiple resolvers having different access mechanism configured with 
same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate project 
resolvers (`resolvers`) or rename publishing resolver (`publishTo`).

I also get this failure to resolve dependency:

[info] Resolving org.scalatest#scalatest_2.10;1.8 ...
[warn]  module not found: org.scalatest#scalatest_2.10;1.8
[warn]  local: tried
[warn]  SonaType ScalaTest repo: tried
[warn]  public: tried

I looked at:

and there is no 1.8, but a 1.9…so, the issue is in core/build.sbt, where scalatest should 
be 1.9.1.  I change it and it works.  This was changed today, so perhaps they 
lost 1.8?


On Aug 5, 2013, at 8:19 PM, Rob Withers wrote:

Well, I changed something, as it was working yesterday.   Here's my attempt at 

Robs-MacBook-Pro:kafka reefedjib$ sbt ++2.10.2 update
[info] Loading global plugins from /Users/reefedjib/.sbt/plugins
[info] Loading project definition from 
[warn] Multiple resolvers having different access mechanism configured with 
same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate project 
resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
[error] AttributeKey ID collisions detected for: 'pgp-signer' 
sbt.Task[com.typesafe.sbt.pgp.PgpSigner]), 'pgp-verifier' 
sbt.Task[com.typesafe.sbt.pgp.PgpVerifier]), 'check-pgp-signatures' 
sbt.Task[com.jsuereth.pgp.sbtplugin.SignatureCheckReport]), 'signatures-module' 
[error] Use 'last' for the full log.

So here's my /Users/reefedjib/.sbt/plugins:

resolvers += Classpaths.typesafeResolver
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")

and I just clone 0.8, no changes.

Is there anywhere else I need to look for sbt plugin configs, outside akka?


Erik van Oosten