RE: Per Topic Metrics
I want to say that the metrics only show up when the first message comes in, but I could be thinking of another tool. Try sending a message to the broker and see if metrics appear?

-Original Message- From: Wollert, Fabian [mailto:fabian.woll...@zalando.de] Sent: Thursday, December 10, 2015 10:14 To: users@kafka.apache.org Subject: Per Topic Metrics

Hi everyone, when I browse via jconsole through a freshly started Kafka 0.8.2.1 JVM, I can only find metrics for the whole broker (like MessagesInPerSec in "kafka.server:name=MessagesInPerSec,type=BrokerTopicMetrics") but not per topic. What am I missing, i.e. where can I find those metrics? The documentation (https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-ServerStats) says kafka:type=kafka.BrokerAllTopicStat.[topic], but this does not exist for me; I guess this is related to 0.7? Cheers

-- Fabian Wollert, Business Intelligence, Zalando SE, fabian.woll...@zalando.de
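[A quick way to test the suggestion above: produce one message, then query JMX from the command line. This is only a sketch — the topic name and JMX port are placeholders, and the per-topic MBean name depends on the exact 0.8.2 naming scheme (it may appear as a topic=<topic> tag, or as a "<topic>-MessagesInPerSec" name under BrokerTopicMetrics), so adjust --object-name to whatever jconsole shows after traffic arrives.]

  # Produce one message so the per-topic meter gets created
  echo "test" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1

  # Query the MBean (assumes the broker was started with JMX_PORT=9999)
  bin/kafka-run-class.sh kafka.tools.JmxTool \
    --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
    --object-name 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=topic1'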
New Consumer 0.9 API poll never returns
I am configuring the way it says in the API doc. The topic has 1 partition. I set the group.id, and subscribe to the topic. If I call poll(100), the function never returns; I have to stop the unit test, because it never stops. I have waited for up to 10 minutes. Any ideas? Thanks for your help.
SSL - kafka producer cannot publish to topic
I am trying to configure SSL communication between broker and producer. I followed the instructions at https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka to create the key and trust stores. My broker comes up without issue, and I can run this command - openssl s_client -debug -connect localhost:9093 -tls1_2 - and it works, so the broker is configured correctly. I get the error below when the producer tries to publish to the topic. The plaintext port works.

C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.9.0.0>bin\windows\kafka-console-producer.bat --broker-list localhost:9093 --topic topic1
adadasdasd
[2015-12-10 14:05:24,842] ERROR Error when sending message to topic topic1 with key: null, value: 0 bytes with error: Failed to update metadata after 6 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

With SSL debug enabled on the broker I see the error below. I enabled SSL debug on the producer too, but it doesn't produce any detailed log. In producer.properties I tried changing metadata.broker.list=localhost:9092 to metadata.broker.list=localhost:9093; it didn't help. (I am thinking it is something silly.)

Using SSLEngineImpl.
Allow unsafe renegotiation: false
Allow legacy hello messages: true
Is initial handshake: true
Is secure renegotiation: false
kafka-network-thread-0-SSL-3, fatal error: 80: problem unwrapping net record
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
kafka-network-thread-0-SSL-3, SEND TLSv1.2 ALERT: fatal, description = internal_error
kafka-network-thread-0-SSL-3, WRITE: TLSv1.2 Alert, length = 2
kafka-network-thread-0-SSL-3, called closeOutbound()
kafka-network-thread-0-SSL-3, closeOutboundInternal()
kafka-network-thread-0-SSL-3, called closeInbound()
kafka-network-thread-0-SSL-3, fatal: engine already closed. Rethrowing javax.net.ssl.SSLException: Inbound closed before receiving peer's close_notify: possible truncation attack?
kafka-network-thread-0-SSL-3, called closeOutbound()
kafka-network-thread-0-SSL-3, closeOutboundInternal()

My producer.properties:

metadata.broker.list=localhost:9092
producer.type=sync
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
# SSL settings #
# keystore path assumes you are starting from the kafka install folder
security.protocol = SSL
ssl.truststore.location = client.truststore.jks
ssl.truststore.password = testpass
ssl.keystore.location = client.keystore.jks
ssl.keystore.password = testpass
ssl.key.password = testpass
#ssl.provider (Optional. The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.)
#ssl.cipher.suites (Optional. "A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.")
ssl.enabled.protocols = TLSv1.2
#ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 (should list at least one of the protocols configured on the broker side)
ssl.truststore.type = JKS
ssl.keystore.type = JKS

My server.properties:

broker.id=0
listeners=PLAINTEXT://:9092,SSL://:9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Log Basics #
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
# Log Flush Policy #
# Log Retention Policy #
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=30
log.cleaner.enable=false
# Zookeeper #
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
# SSL settings #
# keystore path assumes you are starting from the kafka install folder
ssl.keystore.location = server.keystore.jks
ssl.keystore.password = testpass
ssl.key.password = testpass
ssl.truststore.location = server.truststore.jks
ssl.truststore.password = testpass
ssl.client.auth = none
#ssl.client.auth: "required" => client authentication is required; "requested" => client authentication is requested and clients without certs can still connect
ssl.enabled.protocols = TLSv1.2
#ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1 (list the SSL protocols you are going to accept from clients; note SSL is deprecated and using it in production is not recommended)
ssl.keystore.type = JKS
ssl.truststore.type = JKS
#security.inter.broker.protocol = SSL (not enabled for now)

Thanks, Shri
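[For anyone reproducing this setup: a condensed sketch of the keystore/truststore creation steps from the cwiki page referenced above, assuming a self-signed CA and the file names used in these configs; validity, passwords and aliases are placeholders.]

  # 1. Generate the broker key into a keystore
  keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
  # 2. Create your own CA
  openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
  # 3. Trust the CA in the broker and client truststores
  keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
  keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
  # 4. Export the broker certificate, sign it with the CA, and import CA + signed cert back
  keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
  openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
  keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
  keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed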
Re: kafka connect(copycat) question
Roman, Agreed, this is definitely a gap in the docs (both Kafka's and Confluent's) right now. The reason it was lower priority for documentation than other items is that we expect there will be relatively few converter implementations, especially compared to the number of connectors. Converters correspond to serialization formats (and any supporting pieces, like Confluent's Schema Registry for the AvroConverter), so there might be a few for, e.g., Avro, JSON, Protocol Buffers, Thrift, and possibly variants (e.g. if you have a different approach for managing Avro schemas than Confluent's schema registry). https://cwiki.apache.org/confluence/display/KAFKA/Copycat+Data+API has a slightly outdated image that explains how Converters fit into the data processing pipeline in Kafka Connect. The API is also quite simple: http://docs.confluent.io/2.0.0/connect/javadocs/org/apache/kafka/connect/storage/Converter.html -Ewen

On Thu, Dec 10, 2015 at 3:34 AM, Roman Shtykh wrote: > Ewen, > > I just thought it would be helpful to have more detailed information on > converters (including what you described here) on > http://docs.confluent.io/2.0.0/connect/devguide.html > > Thanks, > Roman > > > > On Wednesday, November 11, 2015 6:59 AM, Ewen Cheslack-Postava < > e...@confluent.io> wrote: > Hi Venkatesh, > > If you're using the default settings included in the sample configs, it'll > expect JSON data in a special format to support passing schemas along with > the data. This is turned on by default because it makes it possible to work > with a *lot* more connectors and data storage systems (many require > schemas!), though it does mean consuming regular JSON data won't work out > of the box. You can easily switch this off by changing these lines in the > worker config: > > key.converter.schemas.enable=true > value.converter.schemas.enable=true > > to be false instead. However, note that this will only work with connectors > that can work with "schemaless" data. This wouldn't work for, e.g., writing > Avro files in HDFS since they need schema information, but it might work > for other formats. This would allow you to consume JSON data from any topic > it already existed in. > > Note that JSON is not the only format you can use. You can also substitute > other implementations of the Converter interface. Confluent has implemented > an Avro version that works well with our schema registry ( > https://github.com/confluentinc/schema-registry/tree/master/avro-converter > ). > The JSON implementation made sense to add as the one included with Kafka > simply because it didn't introduce any other dependencies that weren't > already in Kafka. It's also possible to write implementations for other > formats (e.g. Thrift, Protocol Buffers, Cap'n Proto, MessagePack, and > more), but I'm not aware of anyone who has started to tackle those > converters yet. > > -Ewen > > On Tue, Nov 10, 2015 at 1:23 PM, Venkatesh Rudraraju < > venkatengineer...@gmail.com> wrote: > > > Hi, > > > > I am trying out the new kakfa connect service. > > > > version : kafka_2.11-0.9.0.0 > > mode: standalone > > > > I have a conceptual question on the service. > > > > Can I just start a sink connector which reads from Kafka and writes to > say > > HDFS ? > > From what I have tried, it's expecting a source-connector as well because > > the sink-connector is expecting a particular pattern of the message in > > kafka-topic. > > > > Thanks, > > Venkat > > > > > > > -- > Thanks, > Ewen > -- Thanks, Ewen
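[Since the devguide doesn't cover this yet, here is a sketch of what implementing the Converter interface linked above looks like. The class name is made up and the implementation is deliberately trivial (a schemaless pass-through of raw bytes); the three method signatures are the ones the 0.9 interface defines.]

  import java.util.Map;
  import org.apache.kafka.connect.data.Schema;
  import org.apache.kafka.connect.data.SchemaAndValue;
  import org.apache.kafka.connect.storage.Converter;

  // Illustrative only: treats every message as a schemaless byte array.
  public class RawBytesConverter implements Converter {
      @Override
      public void configure(Map<String, ?> configs, boolean isKey) {
          // Nothing to configure for this trivial converter.
      }

      @Override
      public byte[] fromConnectData(String topic, Schema schema, Object value) {
          // Serialize Connect data into the raw bytes written to Kafka.
          return (byte[]) value;
      }

      @Override
      public SchemaAndValue toConnectData(String topic, byte[] value) {
          // Deserialize raw Kafka bytes into a (schema, value) pair; schemaless here.
          return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
      }
  }

A converter like this would be wired in via the worker config, e.g. value.converter set to its fully qualified class name.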
Re: New Consumer 0.9 API poll never returns
In this case I have just published the message via a producer and connecting to the same broker list to consume. On Thu, Dec 10, 2015, 2:18 PM Jason Gustafson wrote: > Hi Kevin, > > At the moment, the timeout parameter in poll() really only applies when the > consumer has an active partition assignment. In particular, it will block > indefinitely to get that assignment. If there are no brokers to connect to > or if you accidentally point it to a 0.8 cluster, it will probably block > forever. This is most likely what is happening in your test case. This is a > known issue, but surprisingly difficult to handle in a reasonable way due > to the fact that the consumer is single-threaded. We are working on this > problem, but for now, the best solution is to use wakeup() to break from > the poll() from an external thread. > > -Jason > > On Thu, Dec 10, 2015 at 11:50 AM, Kevin Carr wrote: > > > I am configuring the way it says in the api doc. > > > > Topic is 1 partition > > I set the group.id, and subscribe to the topic. > > > > If I call poll(100) the function never returns. I have to stop the unit > > test, because it never stops. I have waited for up to 10 minutes. > > > > Any Ideas? > > > > Thanks for your help. > > >
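[A minimal sketch of the wakeup() workaround Jason describes: a second thread breaks the consumer out of a poll() that would otherwise block forever. Broker address, group id, topic, and the 30-second deadline are placeholders.]

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.errors.WakeupException;

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "test-group");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Collections.singletonList("my-topic"));

  // Watchdog thread: wakeup() is the one consumer method safe to call from another thread.
  Thread watchdog = new Thread(() -> {
      try {
          Thread.sleep(30000);          // give poll() 30s to return
      } catch (InterruptedException e) {
          return;                       // poll() returned in time; watchdog cancelled
      }
      consumer.wakeup();                // forces the blocked poll() to throw WakeupException
  });
  watchdog.start();

  try {
      ConsumerRecords<String, String> records = consumer.poll(100);
      watchdog.interrupt();             // poll() returned; cancel the watchdog
  } catch (WakeupException e) {
      // poll() was stuck (e.g. no reachable 0.9 broker) and was woken up
  }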
Re: New Consumer 0.9 API poll never returns
Sounds like a plan. I will see about getting that. I don't have to register a group id, do I? On Thu, Dec 10, 2015, 2:37 PM Jason Gustafson wrote: > And just to be clear, the broker is on 0.9? Perhaps you can enable debug > logging and send a snippet? > > -Jason > > On Thu, Dec 10, 2015 at 12:22 PM, Kevin Carr wrote: [snip]
Mirrormaker issue with Kafka 0.9 (confluent platform 2.0)
Hi, We upgraded to the Confluent 2.0 platform today (we were earlier using Kafka 0.8 without the Confluent platform). With the latest Kafka 0.9 that is packaged with the platform we are facing issues starting MirrorMaker with multiple consumer config files.

The original command that we were using with Kafka 0.8 was:

bin/kafka-run-class kafka.tools.MirrorMaker --consumer.config mirrorCluster1Consumer.properties --consumer.config mirrorCluster2Consumer.properties --consumer.config mirrorCluster3Consumer.properties --consumer.config mirrorCluster4Consumer.properties --producer.config mirrorProducer.properties --whitelist "topic1,topic2"

With the current Kafka 0.9 release, the tool throws the following error:

ERROR Exception when starting mirror maker. (kafka.tools.MirrorMaker$)
joptsimple.MultipleArgumentsForOptionException: Found multiple arguments for option ['consumer.config'], but you asked for only one
        at joptsimple.OptionSet.valueOf(OptionSet.java:177)
        at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:225)
        at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
Exception in thread "main" java.lang.NullPointerException
        at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:276)
        at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

So I tried setting the --num.streams=4 option in the command, and it still throws the same error; next I tried to use the --new.consumer option, but the tool fails again with the same error. Not sure how to get the MirrorMaker tool running with multiple consumer config files. Could not find much documentation regarding this except for the one in the Kafka documentation (http://kafka.apache.org/documentation.html#basic_ops_mirror_maker).

PS: I was unable to post this in the Confluent mailing list and hence am posting here.

Thanks, Meghana
Re: New Consumer 0.9 API poll never returns
You just have to set "group.id" in the configuration. Please also include the version of the broker. -Jason On Thu, Dec 10, 2015 at 12:38 PM, Kevin Carr wrote: > Sounds like a plan. I will see about getting that. > > I don't have to register a group id do I? > > On Thu, Dec 10, 2015, 2:37 PM Jason Gustafson wrote: [snip]
Re: New Consumer 0.9 API poll never returns
And just to be clear, the broker is on 0.9? Perhaps you can enable debug logging and send a snippet? -Jason On Thu, Dec 10, 2015 at 12:22 PM, Kevin Carr wrote: > In this case I have just published the message via a producer and > connecting to the same broker list to consume. > > On Thu, Dec 10, 2015, 2:18 PM Jason Gustafson wrote: [snip]
RE: SSL - kafka producer cannot publish to topic
Figured it out. I was adding the SSL properties to producer.properties. We need to add them to a separate file and provide that file as input to the producer bat/sh script: --producer.config client-ssl.properties. It seems the kafka.tools.ConsoleProducer class needs the --producer.config parameter pointing to just the SSL configuration; it does not pick it up from producer.properties.

-Original Message- From: Shrikant Patel [mailto:spa...@pdxinc.com] Sent: Thursday, December 10, 2015 2:09 PM To: users@kafka.apache.org Subject: SSL - kafka producer cannot publish to topic [snip]
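[To make the fix concrete, a sketch of the separate client config file and the invocation; the file name client-ssl.properties is arbitrary, and the property values are the ones from the original message.]

  # client-ssl.properties
  security.protocol=SSL
  ssl.truststore.location=client.truststore.jks
  ssl.truststore.password=testpass
  ssl.keystore.location=client.keystore.jks
  ssl.keystore.password=testpass
  ssl.key.password=testpass
  ssl.enabled.protocols=TLSv1.2

  # Start the console producer against the SSL port, passing the file explicitly:
  bin\windows\kafka-console-producer.bat --broker-list localhost:9093 --topic topic1 --producer.config client-ssl.properties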
Re: Mirrormaker issue with Kafka 0.9 (confluent platform 2.0)
Meghana, It looks like this functionality was removed in https://issues.apache.org/jira/browse/KAFKA-1650, although I don't see explicit discussion of the issue in the JIRA, so I'm not sure of the exact motivation. Maybe Becket or Guozhang can offer some insight about whether it is necessary (that JIRA fixes a data loss issue) or an accidental incompatibility. -Ewen

On Thu, Dec 10, 2015 at 2:32 PM, Meghana Narasimhan < mnarasim...@bandwidth.com> wrote: > Hi, > > We upgraded to the Confluent 2.0 platform today (we were earlier using the > Kafka 0.8 without the confluent platform). [snip]

-- Thanks, Ewen
commitSync with new Subscriber API
I am using the new Subscriber API and I poll. The poll returns 2005 messages. I process 2 of them, and then set the offset to the 2nd message offset that I processed. I then call commitSync with the TopicPartition and this new offset. When I list the group with the command line tool, it always shows the original offset. Am I doing something wrong?
Re: commitSync with new Subscriber API
Can you paste your list group command line here? Guozhang On Thu, Dec 10, 2015 at 2:45 PM, Kevin Carr wrote: > I am using the new Subscriber API and I poll. The poll returns 2005 > messages. I process 2 of them, and then set the offset to the 2nd message > offset that I processed. > > I then call commitSync with the TopicPartition and this new offset. When I > list the group with the command line tool, it always shows the original > offset. > > Am I doing something wrong? > -- -- Guozhang
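[Two things worth double-checking while gathering that. First, per the KafkaConsumer javadoc, the committed offset should be the position of the next message to read, i.e. last processed offset + 1. Second, the old ZooKeeper-based ConsumerOffsetChecker does not see new-consumer commits, which go to Kafka rather than ZooKeeper. A sketch of the commit, assuming a KafkaConsumer<String, String> named consumer, already subscribed as described; topic and types are placeholders.]

  import java.util.Collections;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  import org.apache.kafka.common.TopicPartition;

  ConsumerRecords<String, String> records = consumer.poll(100);
  int processed = 0;
  TopicPartition tp = null;
  long lastOffset = -1L;
  for (ConsumerRecord<String, String> record : records) {
      // ... process the record ...
      tp = new TopicPartition(record.topic(), record.partition());
      lastOffset = record.offset();
      if (++processed == 2) break;      // stop after the 2 records, as in the question
  }
  if (tp != null) {
      // Commit the NEXT position to consume, not the last processed offset itself.
      consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
  }

To inspect new-consumer commits, the 0.9 tool is: bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group <group>.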
Re: Mirrormaker issue with Kafka 0.9 (confluent platform 2.0)
Meghana, As Ewen said, in KAFKA-1650 we removed support for multiple consumer configs in MM, so you cannot get one MM instance to pull from multiple origin clusters; instead you need to use one MM instance for each origin. The motivation for dropping this support is mainly operational: with the previous setting, if one of the origin clusters falls into a bad state, we cannot stop / debug that MM since it also pulls from other clusters. Other minor issues include metrics reporting, etc. For example, at LI people use one MM instance for each origin cluster, each co-located with the destination cluster, so if you need to mirror from clusters A / B / C to D, you will have an A->D, a B->D and a C->D MM. Moving forward we are also trying to add exact mirroring features into MM, which would better be a 1-to-1 mapping. Guozhang

On Thu, Dec 10, 2015 at 3:21 PM, Ewen Cheslack-Postava wrote: > Meghana, > > It looks like this functionality was removed in > https://issues.apache.org/jira/browse/KAFKA-1650, although I don't see > explicit discussion of the issue in the JIRA so I'm not sure of the exact > motivation. [snip]

-- Guozhang
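[A sketch of the one-instance-per-origin layout Guozhang describes, reusing the file names from the original command; each line is a separate long-running process, all producing into the same destination cluster.]

  bin/kafka-run-class kafka.tools.MirrorMaker --consumer.config mirrorCluster1Consumer.properties --producer.config mirrorProducer.properties --whitelist "topic1,topic2"
  bin/kafka-run-class kafka.tools.MirrorMaker --consumer.config mirrorCluster2Consumer.properties --producer.config mirrorProducer.properties --whitelist "topic1,topic2"
  # ... likewise for mirrorCluster3Consumer.properties and mirrorCluster4Consumer.properties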
Re: Match Producer and RecordMetadata with Consumer and ConsumerRecord
Thanks Ewen! This will help us. From: Ewen Cheslack-Postava Sent: Thursday, December 10, 2015 1:50 AM To: users@kafka.apache.org Cc: Michael Pasacrita; Satish Sankaranarayanan Subject: Re: Match Producer and RecordMetadata with Consumer and ConsumerRecord

John, Your question was a bit confusing because CorrelationId has a specific meaning in the Kafka protocol, but that is an implementation detail that you, as a user of the API, should not need to worry about. CorrelationIds as defined by the protocol are not exposed to the user (and do not correspond to individual Kafka messages -- they correspond to request/response pairs in the protocol). In Kafka you can uniquely identify a message by the (topic, partition, offset) tuple. To match up the record passed to a producer and the record received by a consumer, you would use, as you suggested in your first email:

Producer: RecordMetadata.topic(), RecordMetadata.partition(), RecordMetadata.offset()
Consumer: ConsumerRecord.topic(), ConsumerRecord.partition(), ConsumerRecord.offset()

-Ewen

On Wed, Dec 9, 2015 at 11:29 AM, John Menke wrote: > Gwen, thanks for the reply: > > Just to confirm - are we talking about the combination of the IDs that I > laid out as being the "CorrelationID" ? > > I guess the alternative is to pass our own CorrelationID as part of the > data. Would you agree with this? > > I do see something at this address that talks about a "user defined" > CorrelationID > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceResponse > > CorrelationId: > > This is a user-supplied integer. It will be passed back in the response by > the server, unmodified. It is useful for matching request and response > between the client and server. > > > > -Original Message- > From: Gwen Shapira [mailto:g...@confluent.io] > Sent: Wednesday, December 09, 2015 12:50 PM > To: users@kafka.apache.org > Subject: Re: Match Producer and RecordMetadata with Consumer and > ConsumerRecord > > Correlation ID is for a request (i.e. separate ID for produce request and > a fetch request), not a record. So it can't be used in the way you are > trying to. > > On Wed, Dec 9, 2015 at 9:30 AM, John Menke wrote: > > > Can a correlationID be created from a ConsumerRecord that will allow > > for identification of the corresponding RecordMetaData instance that > > was returned from the Producer.send() method? > > > > I am looking at the JavaDocs and the Producer returns RecordMetadata > > which has the following signature: > > > > RecordMetadata(TopicPartition topicPartition, long baseOffset, long > > relativeOffset) > > > > I am not sure if this can match to > > > > ConsumerRecord(java.lang.String topic, int partition, long offset, K > > key, V value) > > > > Can we match producer requests to Consumer Records by matching these > > values? > > > > RecordMetaData.TopicPartion.topic = ConsumerRecord.topic > > RecordMetaData.TopicPartion.partition = ConsumerRecord.partition > > RecordMetaData(baseOffset + relativeOffset) = ConsumerRecord.offset > > > > In other words, can a CorrelationID be created from these values that > > will allow Consumers to link back to the Producer send() event. (In > > the client calling code) > > -- Thanks, Ewen
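[A sketch of the (topic, partition, offset) correlation Ewen describes; broker address, serializers and topic are placeholders, and the snippet is assumed to run inside a method that declares throws Exception.]

  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.clients.producer.RecordMetadata;

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  KafkaProducer<String, String> producer = new KafkaProducer<>(props);

  // Producer side: send() returns a Future<RecordMetadata>; get() blocks for the broker's ack.
  RecordMetadata md = producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
  String produced = md.topic() + "/" + md.partition() + "/" + md.offset();

  // Consumer side (inside the poll loop): the same tuple uniquely identifies the message.
  // for (ConsumerRecord<String, String> rec : consumer.poll(100)) {
  //     String consumed = rec.topic() + "/" + rec.partition() + "/" + rec.offset();
  //     if (consumed.equals(produced)) { /* this is the record that send() produced */ }
  // }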
Creating partition with many partitions
Hi, We are considering an architecture with a Kafka cluster of 3 nodes and a high number of consumers. We see that with a low number of partitions, e.g. 3, and a higher number of consumers, e.g. 16, there will be only 3 consumers actually consuming data, because only the owners of partitions can consume messages. To see the owners we do the following:

$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group consumer_group
Group           Topic       Pid  Offset  logSize  Lag  Owner
consumer_group  statistics  0    5335    5373     38   consumer_group_balthasar-1449651803301-63a1d620-0
consumer_group  statistics  1    5335    5374     39   consumer_group_balthasar-1449651803820-35a84426-0
consumer_group  statistics  2    5335    5374     39   consumer_group_balthasar-1449651803934-2b3cc1bd-0

One solution to being able to have many consumers is to increase the number of partitions to a high number, e.g. 1024. This would put more load on the machines running Kafka, but would this load be crazy? The machines that'll be running Kafka have 64GB RAM and a Xeon E5-2620 CPU (6 cores clocked at 2GHz, 24 hardware threads in total). Are there any other reasons not to use such a high number of partitions?

Kind regards,
Balthasar Schopman
Software Developer, LeaseWeb Technologies B.V.
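[For anyone experimenting with this trade-off: partitions can be added to an existing topic but never removed, and adding partitions changes the key-to-partition mapping for keyed messages, so it is worth testing counts before committing. A sketch with the stock topic tool; the topic name is taken from the output above and the counts are illustrative.]

  # Create a topic with more partitions up front:
  bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic statistics --partitions 16 --replication-factor 3

  # Or grow an existing topic:
  bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic statistics --partitions 16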