How to get async commit callbacks in the rebalance listener?

2023-04-26 Thread Erik van Oosten

Hi,

I am trying to do async commits from the rebalance listener, to be 
precise, from the onPartitionsRevoked method. The idea is to wait until 
all commits for the current epoch have completed before the rebalance 
barrier, and by doing so prevent duplicate processing.


It is not so hard to call commitAsync(offsets, callback), but what 
method should be used so that the Kafka client gets a chance to call the 
callback?
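
To make this concrete, here is a minimal sketch of the intent in Scala 
(pendingOffsets is a hypothetical helper that returns the offsets 
processed so far in the current epoch):

import java.util
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition

class CommitOnRevoke(
  consumer: KafkaConsumer[String, String],
  pendingOffsets: () => util.Map[TopicPartition, OffsetAndMetadata] // hypothetical
) extends ConsumerRebalanceListener {

  override def onPartitionsAssigned(ps: util.Collection[TopicPartition]): Unit = ()

  override def onPartitionsRevoked(ps: util.Collection[TopicPartition]): Unit = {
    val done = new AtomicBoolean(false)
    val doneCallback = new OffsetCommitCallback {
      override def onComplete(
        offsets: util.Map[TopicPartition, OffsetAndMetadata],
        e: Exception
      ): Unit = done.set(true)
    }
    consumer.commitAsync(pendingOffsets(), doneCallback)
    // The open question: what can we call here, on the consumer thread,
    // that drives the network layer until `done` is true, without
    // re-entering poll() and crossing the rebalance barrier?
  }
}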


I tried the following:

1. Call commitAsync(Collections.emptyMap, callback)

Unfortunately when you call commitAsync with an empty offsets map, it 
doesn't call the callbacks of previous commits.


There is a PR from 2020 that would fix this issue: 
https://github.com/apache/kafka/pull/9111. This PR was closed without 
merging. Should this PR be reconsidered?


2. Pause all partitions and call poll(0)

Doesn't work; you'll get a "KafkaConsumer is not safe for multi-threaded 
access" exception.


3. Call commitSync(Collections.emptyMap)

Behaves the same as under point 1.

4. Repeated calls to commitAsync(offsets, callback)

This time we keep calling commitAsync with the same offsets until those 
offsets are committed. Unfortunately, this never ends: either commitAsync 
never invokes the callbacks, or each call just stacks up more commits to 
complete.
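
In code, attempt 4 looks roughly like this (a sketch, reusing `done`, 
`doneCallback` and `pendingOffsets` from the sketch above):

// Never terminates: nothing in this loop lets the client complete the
// in-flight commits, so `done` never becomes true.
while (!done.get()) {
  consumer.commitAsync(pendingOffsets(), doneCallback) // queues yet another commit
}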


I looked at the other methods on the consumer API but I didn't find 
anything that looked suitable for this use case.



So to repeat the question:

What method should I invoke (from the onPartitionsRevoked callback) to 
make the Kafka client invoke the callbacks of earlier async commits?



Some context: I am working on zio-kafka, a completely asynchronous library 
that provides a concurrent streaming layer on top of the Java client.


Thanks,
    Erik.


--
Erik van Oosten
e.vanoos...@grons.nl
https://day-to-day-stuff.blogspot.com


Re: MBeans, dashes, underscores, and KAFKA-1481

2014-11-02 Thread Erik van Oosten

Hi Jun,

The quotes are because of a regression in Metrics 2.2.0. IMHO Metrics 
2.2.0 should not be used because of this. Just downgrade to Metrics 
2.1.5 and you are good.


Of course, upgrading to Metrics 3 would do the trick also.
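
For illustration, the problem with the quotes: both of the following 
parse, but they are different JMX names, so anything subscribed to the 
unquoted form breaks (a sketch):

import javax.management.ObjectName

// Unquoted, as emitted before the regression:
val plain = new ObjectName(
  "kafka.server:type=BrokerTopicMetrics,name=topic-1-BytesInPerSec")
// Quoted, as emitted by Metrics 2.2.0:
val quoted = new ObjectName(
  "kafka.server:type=BrokerTopicMetrics,name=\"topic-1-BytesInPerSec\"")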

Kind regards,
Erik.


Jun Rao wrote on 17-10-14 at 20:54:

Hi, everyone,

We are fixing the mbean names in kafka-1482, by adding separate explicit
tags in the name for things like clientId and topic. Another thing that
some people have complained before is that we use quotes in the jmx name.
Should we also just get rid of the quotes as part of kafka-1482? So,
instead of
kafka.server:type=BrokerTopicMetrics,name=topic-1-BytesInPerSec
we will have
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=topic-1

Thanks,

Jun




--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.com/



Re: How to produce and consume events in 2 DCs?

2014-10-22 Thread Erik van Oosten

Hi Steven,

That doesn't work. In your proposal, mirrormaker in one DC would copy 
messages from topic A to topic A in the other DC. However, the other DC 
runs a mirrormaker which does the same, creating a loop. Messages will be 
duplicated, triplicated, and so on, in a never-ending loop.


Mirroring to another topic would work (mirrormaker doesn't support that), 
and so would mirroring to another cluster. Neha's proposal would also 
work, but I assume it is a lot more work for the Kafka internals and 
therefore, IMHO, wouldn't meet the KISS principle.


Kind regards,
Erik.


Steven Wu wrote on 22-10-14 at 01:48:

I think it doesn't have to be two more clusters. It can be just two more
topics. MirrorMaker can copy from source topics in both regions into one
aggregate topic.

On Tue, Oct 21, 2014 at 1:54 AM, Erik van oosten 
e.vanoos...@grons.nl.invalid wrote:


Thanks Neha,

Unfortunately, the maintenance overhead of 2 more clusters is not
acceptable to us.

Would you accept a pull request on mirror maker that would rename topics
on the fly?

For example by accepting the parameter rename:
--rename src1/dest1,src2/dest2
or, extended with RE support:
--rename old_(.*)/new_\1

Kind regards,
 Erik.


On 20 Oct 2014, at 16:43, Neha Narkhede neha.narkh...@gmail.com wrote:


Another way to set up this kind of mirroring is by deploying 2 clusters in
each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror
maker copies data from both DCs' local clusters into the aggregate
clusters. So if you want access to a topic with data from both DCs, you
subscribe to the aggregate cluster.

Thanks,
Neha

On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten 
e.vanoos...@grons.nl.invalid wrote:


Hi,

We have 2 data centers that produce events. Each DC has to process events
from both DCs.

I had the following in mind:

   DC 1             |             DC 2
  events            |            events
  +  +  +           |           +  +  +
  |  |  |           |           |  |  |
  v  v  v           |           v  v  v
+----------------+  |  +----------------+
| Receiver topic |  |  | Receiver topic |
+----------------+  |  +----------------+
   |    |       mirroring      |    |
   |    +----------------------+    |
   |    +----------------------+    |
   |    |                      |    |
   v    v                      v    v
+----------------+  |  +----------------+
| Consumer topic |  |  | Consumer topic |
+----------------+  |  +----------------+
  +  +  +           |           +  +  +
  |  |  |           |           |  |  |
  v  v  v           |           v  v  v
 consumers          |          consumers


As each DC has a single Kafka cluster, on each DC the receiver topic and
consumer topic need to be on the same cluster.
Unfortunately, mirror maker does not seem to support mirroring to a topic
with another name.

Is there another tool we could use?
Or, is there another approach for producing and consuming from 2 DCs?

Kind regards,
Erik.

--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.nl/







--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.com/



Re: How to produce and consume events in 2 DCs?

2014-10-21 Thread Erik van oosten
Thanks Neha,

Unfortunately, the maintenance overhead of 2 more clusters is not acceptable to 
us.

Would you accept a pull request on mirror maker that would rename topics on the 
fly?

For example by accepting the parameter rename:
   --rename src1/dest1,src2/dest2
or, extended with RE support:
   --rename old_(.*)/new_\1
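
To sketch the intended semantics (hypothetical code, nothing like this 
exists in mirror maker today; note that Java regexes write the replacement 
group as $1 rather than \1):

// One rename rule of the form "from/to"; non-matching topics pass through.
def renameTopic(topic: String, rule: String): String = {
  val Array(from, to) = rule.split('/')
  topic.replaceAll("^(?:" + from + ")$", to)
}

renameTopic("src1", "src1/dest1")            // => "dest1"
renameTopic("old_events", "old_(.*)/new_$1") // => "new_events"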

Kind regards,
Erik.


On 20 Oct 2014, at 16:43, Neha Narkhede neha.narkh...@gmail.com wrote:

 Another way to set up this kind of mirroring is by deploying 2 clusters in
 each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror
 maker copies data from both DCs' local clusters into the aggregate
 clusters. So if you want access to a topic with data from both DCs, you
 subscribe to the aggregate cluster.
 
 Thanks,
 Neha
 
 On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten 
 e.vanoos...@grons.nl.invalid wrote:
 
 Hi,
 
 We have 2 data centers that produce events. Each DC has to process events
 from both DCs.
 
 I had the following in mind:
 
    DC 1             |             DC 2
   events            |            events
   +  +  +           |           +  +  +
   |  |  |           |           |  |  |
   v  v  v           |           v  v  v
 +----------------+  |  +----------------+
 | Receiver topic |  |  | Receiver topic |
 +----------------+  |  +----------------+
    |    |       mirroring      |    |
    |    +----------------------+    |
    |    +----------------------+    |
    |    |                      |    |
    v    v                      v    v
 +----------------+  |  +----------------+
 | Consumer topic |  |  | Consumer topic |
 +----------------+  |  +----------------+
   +  +  +           |           +  +  +
   |  |  |           |           |  |  |
   v  v  v           |           v  v  v
  consumers          |          consumers
 
 
 As each DC has a single Kafka cluster, on each DC the receiver topic and
 consumer topic need to be on the same cluster.
 Unfortunately, mirror maker does not seem to support mirroring to a topic
 with another name.
 
 Is there another tool we could use?
 Or, is there another approach for producing and consuming from 2 DCs?
 
 Kind regards,
Erik.
 
 --
 Erik van Oosten
 http://www.day-to-day-stuff.blogspot.nl/
 
 



Re: refactoring ZK so it is plugable, would this make sense?

2014-10-20 Thread Erik van oosten
You can also run with a single-node zookeeper cluster.
See 
http://zookeeper.apache.org/doc/r3.3.4/zookeeperStarted.html#sc_InstallingSingleMode
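
For reference, the standalone mode described there needs little more than 
a conf/zoo.cfg along these lines (values from that guide):

tickTime=2000
dataDir=/tmp/zookeeper
clientPort=2181

and then: bin/zkServer.sh start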

Cheers,
Erik.

On 9 Oct 2014, at 22:52, S Ahmed sahmed1...@gmail.com wrote:

 I want kafka features (w/o the redundancy) but don't want to have to run 3
 zookeeper instances to save $$.
 
 On Thu, Oct 9, 2014 at 2:59 PM, Jun Rao jun...@gmail.com wrote:
 
 This may not be easy since you have to implement things like watcher
 callbacks. What's your main concern with the ZK dependency?
 
 Thanks,
 
 Jun
 
 On Thu, Oct 9, 2014 at 8:20 AM, S Ahmed sahmed1...@gmail.com wrote:
 
 Hi,
 
 I was wondering: if the zookeeper library (zkutils.scala etc.) were
 designed in a more modular way, would it make it possible to run a leaner
 version of kafka?
 
 The idea is that I want to run kafka with less emphasis on being durable
 with failover, and more on being a replacement for a standard queue like
 kestrel.

 This way you could still take advantage of the other aspects of Kafka
 (permanent log, etc.).
 
 I was just thinking, if the zookeeper access was wrapped in something
 like:

 trait DiscoverService {
   def electLeader(): Unit
   def getFollower(): String  // e.g. a follower's host:port
 }

 (I'm just making those methods up, but you get the point: they are simply
 the same calls zkutils etc. will be making to connect to zookeeper)
 
 Now the idea is: if you don't want to dedicate 3 servers to running
 zookeeper, you could create your own implementation that e.g. returns
 data based on a static configuration file rather than a discovery service
 like zookeeper.
 
 Would wrapping the zookeeper calls into a pluggable/swappable service make
 sense and allow you to still use Kafka at a smaller scale, or would this
 not work for other reasons that I am overlooking?
 
 



How to produce and consume events in 2 DCs?

2014-10-20 Thread Erik van oosten
Hi,

We have 2 data centers that produce events. Each DC has to process events from 
both DCs.

I had the following in mind:

   DC 1             |             DC 2
  events            |            events
  +  +  +           |           +  +  +
  |  |  |           |           |  |  |
  v  v  v           |           v  v  v
+----------------+  |  +----------------+
| Receiver topic |  |  | Receiver topic |
+----------------+  |  +----------------+
   |    |       mirroring      |    |
   |    +----------------------+    |
   |    +----------------------+    |
   |    |                      |    |
   v    v                      v    v
+----------------+  |  +----------------+
| Consumer topic |  |  | Consumer topic |
+----------------+  |  +----------------+
  +  +  +           |           +  +  +
  |  |  |           |           |  |  |
  v  v  v           |           v  v  v
 consumers          |          consumers


As each DC has a single Kafka cluster, on each DC the receiver topic and 
consumer topic need to be on the same cluster.
Unfortunately, mirror maker does not seem to support mirroring to a topic with 
another name. 

Is there another tool we could use?
Or, is there another approach for producing and consuming from 2 DCs?

Kind regards,
Erik.

--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.nl/



Re: running on scala 2.11

2014-07-05 Thread Erik van Oosten

Hi Joe,

I am afraid you misread the article you are referring to. Scala 2.11 
/compiles/ code that compiles with 2.10. Binary compatibility is only 
guaranteed between micro versions.
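
In sbt terms (a sketch): the %% operator appends the Scala binary version 
to the artifact name, which is exactly why a library must be published 
separately for each binary version:

// On Scala 2.10.x this resolves kafka_2.10; on 2.11.x it would look for
// a kafka_2.11 artifact, which has to be published separately.
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"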


Kind regards,
Erik.


Joe Stein wrote on 01-07-14 at 21:01:

Looping back around here (for posterity): I didn't update the ticket, but
Scala 2.11 works with 2.10 binaries http://www.scala-lang.org/news/2.11.0

If there are issues, folks can please open a new JIRA with the specific
issue, as it may not be related. Thanks!

/*******************************************
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*******************************************/


On Fri, May 30, 2014 at 3:13 AM, Laszlo Fogas las...@falconsocial.com wrote:


cool.

thanks, Joe.


On Fri, May 30, 2014 at 6:54 AM, Joe Stein joe.st...@stealth.ly wrote:


It is possible that when I tried this ticket initially there was something
environmental in my build process causing it to appear to work.

I reopened the ticket.

I will go back through it again on a few different environments and in more
detail and post results either way.

/*******************************************
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*******************************************/


On Thu, May 29, 2014 at 7:20 AM, Laszlo Fogas las...@falconsocial.com wrote:


just simply tried artifact kafka_2.10:0.8.1.1 as suggested in
https://issues.apache.org/jira/browse/KAFKA-1454 but got the error written
above


now I've cloned the repo and compiled with

./gradlew jar => success
./gradlew -PscalaVersion=2.10.0 jar => success
./gradlew -PscalaVersion=2.11.0 jar => FAILURE

both on trunk and 0.8.1.1 tag


the compile error is the same as here:
https://github.com/davidB/scala-maven-plugin/issues/145#issuecomment-39189898

I've tried disabling zinc => got classpath errors at another point,
and upgrading zinc => had other version incompatibilities.


Seems like there are issues with scala 2.11









On Wed, May 28, 2014 at 6:41 PM, Guozhang Wang wangg...@gmail.com wrote:

Hello Laszlo,

Have you built Kafka with scala 2.11? You may read the README file to check
how to compile Kafka with different scala versions.

Guozhang


On Wed, May 28, 2014 at 5:45 AM, Laszlo Fogas las...@falconsocial.com wrote:


Hello folks,

anybody running kafka with scala 2.11.0?

KAFKA-1454 says it's possible.. I'm having problems though when running the
basic producer example from the wiki.

The message is *NoClassDefFoundError:
scala/collection/GenTraversableOnce$class*


Thanks

Laszlo




--
-- Guozhang




--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.com/



Re: trouble building 0.8

2013-08-12 Thread Erik van Oosten
That's a classic: just delete your ~/.ivy2 as well and be prepared to 
download the internet again :(


Erik.

On 06-08-13 05:36, Rob Withers wrote:

I deleted all the sbt project and target stuff, for both ~/.sbt and 
kafka/project/build.  I had previously had php stuff in my global sbt stuff.  
This resolved this issue, but now I am having another…

I get the following:

[warn] Multiple resolvers having different access mechanism configured with 
same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate project 
resolvers (`resolvers`) or rename publishing resolver (`publishTo`).

I also get this failure to resolve dependency:

[info] Resolving org.scalatest#scalatest_2.10;1.8 ...
[warn]  module not found: org.scalatest#scalatest_2.10;1.8
[warn]  local: tried
[warn]   
/Users/reefedjib/.ivy2/local/org.scalatest/scalatest_2.10/1.8/ivys/ivy.xml
[warn]  SonaType ScalaTest repo: tried
[warn]   
https://oss.sonatype.org/content/groups/public/org/scalatest/org/scalatest/scalatest_2.10/1.8/scalatest_2.10-1.8.pom
[warn]  public: tried
[warn]   
http://repo1.maven.org/maven2/org/scalatest/scalatest_2.10/1.8/scalatest_2.10-1.8.pom

I looked at:

http://repo1.maven.org/maven2/org/scalatest/scalatest_2.10

and there is no 1.8, but a 1.9… So the issue is in core/build.sbt, where 
scalatest should be 1.9.1. I changed it and it works. This was changed today, 
so perhaps they lost 1.8?
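
The corresponding line in core/build.sbt would then read something like this 
(a sketch; the actual file may declare the dependency differently):

libraryDependencies += "org.scalatest" %% "scalatest" % "1.9.1" % "test"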

thanks,
rob

On Aug 5, 2013, at 8:19 PM, Rob Withers reefed...@gmail.com wrote:


Well, I changed something, as it was working yesterday.   Here's my attempt at 
updating…

Robs-MacBook-Pro:kafka reefedjib$ sbt ++2.10.2 update
[info] Loading global plugins from /Users/reefedjib/.sbt/plugins
[info] Loading project definition from 
/Users/reefedjib/Desktop/rob/comp/workspace-frameworks/kafka/project
[warn] Multiple resolvers having different access mechanism configured with 
same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate project 
resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
[error] AttributeKey ID collisions detected for: 'pgp-signer' 
(sbt.Task[com.jsuereth.pgp.sbtplugin.PgpSigner], 
sbt.Task[com.typesafe.sbt.pgp.PgpSigner]), 'pgp-verifier' 
(sbt.Task[com.jsuereth.pgp.sbtplugin.PgpVerifier], 
sbt.Task[com.typesafe.sbt.pgp.PgpVerifier]), 'check-pgp-signatures' 
(sbt.Task[com.typesafe.sbt.pgp.SignatureCheckReport], 
sbt.Task[com.jsuereth.pgp.sbtplugin.SignatureCheckReport]), 'signatures-module' 
(sbt.Task[com.typesafe.sbt.pgp.GetSignaturesModule], 
sbt.Task[com.jsuereth.pgp.sbtplugin.GetSignaturesModule])
[error] Use 'last' for the full log.

So here's my /Users/reefedjib/.sbt/plugins:

resolvers += Classpaths.typesafeResolver
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")

and I just cloned 0.8, no changes.

Is there anywhere else I need to look for sbt plugin configs, outside kafka?

thanks,
rob





--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.com/