[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup
[ https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15662738#comment-15662738 ]

Ewen Cheslack-Postava commented on KAFKA-3959:
----------------------------------------------

[~granthenke] [~onurkaraman] [~toddpalino] Any more thoughts on this? I think the main use case for handling < 3 brokers by default is starting up a "cluster" locally for test purposes. Any real use case that wanted a lower replication factor could set it explicitly. This is pretty important, and we don't really want to make users jump through hoops to do so; that said, a dramatic warning wouldn't be the end of the world. Maybe even some combination of a low setting plus a setting that gives unsafe warnings but allows unsafely low replication factors for this topic?

> __consumer_offsets wrong number of replicas at startup
> ------------------------------------------------------
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
> Issue Type: Bug
> Components: consumer, offset manager, replication
> Affects Versions: 0.9.0.1, 0.10.0.0
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise Linux Server release 7.2 (Maipo)
> Reporter: Alban Hurtaud
>
> When creating a stack of 3 kafka brokers, the consumer starts faster than the kafka nodes, and when it tries to read a topic, only one kafka node is available.
> So __consumer_offsets is created with a replication factor of 1 (instead of the configured 3):
>
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
>
> Then the other kafka nodes come up, and exceptions are thrown because the replica count for __consumer_offsets is 1 while min.insync.replicas is 2.
> What I missed is: why is __consumer_offsets created with replication factor 1 (when 1 broker is running) when server.properties sets it to 3?
> To reproduce:
> - Prepare 3 kafka nodes with the 3 lines above added to server.properties.
> - Run one kafka broker,
> - Run one consumer (__consumer_offsets is created with replicas=1),
> - Run 2 more kafka nodes.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (KAFKA-4404) Add knowledge of sign to numeric schema types
Andy Bryant created KAFKA-4404:
-----------------------------------

Summary: Add knowledge of sign to numeric schema types
Key: KAFKA-4404
URL: https://issues.apache.org/jira/browse/KAFKA-4404
Project: Kafka
Issue Type: Improvement
Components: KafkaConnect
Affects Versions: 0.10.0.1
Reporter: Andy Bryant
Assignee: Ewen Cheslack-Postava
Priority: Minor

For KafkaConnect schemas there is currently no concept of whether a numeric field is signed or unsigned. Either add an additional `signed` attribute (like `optional`) or make it explicit that numeric types must be signed. You could encode this as a parameter on the schema, but that would not be standard across all connectors.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Resolved] (KAFKA-3967) Excessive Network IO between Kafka brokers
[ https://issues.apache.org/jira/browse/KAFKA-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ewen Cheslack-Postava resolved KAFKA-3967.
------------------------------------------
Resolution: Invalid

[~Krishna82] Closing this for now since we haven't heard back. If there are details missing from the initial report that show this is actually unexpectedly high throughput for replication, please reopen and add more details.

> Excessive Network IO between Kafka brokers
> ------------------------------------------
>
> Key: KAFKA-3967
> URL: https://issues.apache.org/jira/browse/KAFKA-3967
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.2.2
> Reporter: Krishna
>
> Excessive network IO between Kafka brokers running on AWS in different AZs, compared to the actual message volume.
> We are producing 2-5 MB/sec of message volume, yet Kafka seems to be moving 20 GB/hr on the network. Each node has around 12 GB of message log. Is this natural behavior? I believe only new messages should get replicated to non-leader nodes, but here it seems the entire log is re-synced.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-4021) system tests need to enable trace level logging for controller and state-change log
[ https://issues.apache.org/jira/browse/KAFKA-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15662381#comment-15662381 ]

Ewen Cheslack-Postava commented on KAFKA-4021:
----------------------------------------------

[~junrao] Is this actually reasonable to do across the board for the Kafka service? Normally trace-level logs incur enough overhead that they can affect normal behavior and, e.g., could drastically affect anything trying to get performance stats.

> system tests need to enable trace level logging for controller and state-change log
> -----------------------------------------------------------------------------------
>
> Key: KAFKA-4021
> URL: https://issues.apache.org/jira/browse/KAFKA-4021
> Project: Kafka
> Issue Type: Improvement
> Components: system tests
> Affects Versions: 0.10.0.0
> Reporter: Jun Rao
> Assignee: Geoff Anderson
>
> We store detailed information about leader changes at trace level in the controller and the state-change log. Currently, our system tests only collect debug level logs. It would be useful to collect trace level logging for these two logs and archive them if there are test failures, at least for replication related tests.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Re: Upgrading from kafka-0.8.1.1 to kafka-0.9.0.1
The errors you're seeing sound like an issue where you updated the artifact but didn't recompile against the newer Scala version. Did you recompile, or just replace the Kafka jar with a newer one?

-Ewen

On Wed, Nov 9, 2016 at 4:31 PM, Divyajothi Baskaran wrote:
> Hi,
> For the past 6 months I have been the dev for our solution written on top of kafka-0.8.1.1. It has been stable for us. We thought we would upgrade to kafka-0.9.0.1. With the server upgrade, we did not face any issues.
>
> We have our own solution built to extract the messages and write to different destinations, and messages are also read by Storm. For our unit tests we were using the following Maven artifact:
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.9.2</artifactId>
>   <version>0.8.1.1</version>
> </dependency>
>
> I could not find a 0.9.0.1 version of kafka_2.9.2, hence I moved to kafka_2.11 first. This is the artifact used:
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.11</artifactId>
>   <version>0.9.0.1</version>
> </dependency>
>
> I was running into the following issues:
>
> * scala.ScalaObject not found
> * java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
> * KafkaConfig NoSuchMethodError (Ljava/util/Map;)Ljava/util/Map
>
> Also, most of the time I would run into a KafkaServerStartable hang (both in kafka_2.10-0.9.0.1 and kafka_2.11-0.9.0.1). With the same unit tests, I never hit the server hang issue with kafka_2.9.2.
>
> Could you please help me with my problem? Am I missing anything?
>
> Thanks,
> Divya

--
Thanks,
Ewen
[jira] [Commented] (KAFKA-4402) Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
[ https://issues.apache.org/jira/browse/KAFKA-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15662197#comment-15662197 ]

Ewen Cheslack-Postava commented on KAFKA-4402:
----------------------------------------------

[~Jun Yao] Sorry, I missed the update with the example code while I was commenting on the PR. I understand how you can encounter the imbalance when using multiple topics; see the PR for questions around the solution and whether we want to change the default vs. provide an alternative for when this is a problem.

> Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
> ---------------------------------------------------------------------------------------------
>
> Key: KAFKA-4402
> URL: https://issues.apache.org/jira/browse/KAFKA-4402
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jun Yao
> Priority: Minor
>
> From the code comments, it is said that the Kafka client producer's DefaultPartitioner will do round robin if "no partition or key is present":
> https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L34
> The code does try to do round robin: it maintains a counter, increments it on each send, and uses it to decide which partition to go to.
> However, the counter is a global counter shared by all topics, so partitioning is actually not round robin per topic and sometimes causes unbalanced routing among partitions.
> Although we can pass a custom implementation of the interface "org.apache.kafka.clients.producer.Partitioner", it would still be good to make the default implementation true round robin as the comment says.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-4402) Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
[ https://issues.apache.org/jira/browse/KAFKA-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15662168#comment-15662168 ]

Jun Yao commented on KAFKA-4402:
--------------------------------

Hi Ewen, I updated the description; I am looking at the same producer code as you are. The issue is more that the counter is not per topic. I added a unit test in my PR to validate this; without the fix the result will not be balanced:
https://github.com/apache/kafka/pull/2128/files#diff-f30df3b3b79e9be0de6c94dcce90a56e

Meanwhile, I also ran a local test from the producer side to validate this:

bin/kafka-topics.sh --create --topic mtest0 --zookeeper localhost:2181 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --partitions 1 --replication-factor 1

public class KafkaProducerPartitionTest {
    private Producer<String, String> producer;

    public static void main(String[] args) {
        KafkaProducerPartitionTest kafkaProducerPartitionTest = new KafkaProducerPartitionTest();
        try {
            kafkaProducerPartitionTest.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run() throws InterruptedException {
        initProducer();
        Map<Integer, Integer> partitionCount = new HashMap<>();
        String loopTopic = "mtest0";
        // 300 sends to mtest0 plus one send to "test" every 5th iteration = 360 total
        CountDownLatch latch = new CountDownLatch(360);
        Callback c = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (loopTopic.equalsIgnoreCase(metadata.topic())) {
                    partitionCount.put(metadata.partition(),
                        partitionCount.getOrDefault(metadata.partition(), 0) + 1);
                }
                latch.countDown();
            }
        };
        for (int i = 0; i < 300; ++i) {
            producer.send(new ProducerRecord<>(loopTopic, "" + i), c);
            if (i % 5 == 0) {
                producer.send(new ProducerRecord<>("test", "a"), c);
            }
        }
        latch.await();
        System.out.println("partitionCount=" + partitionCount);
    }

    public void initProducer() {
        try {
            Properties props = new Properties();
            props.load(getClass().getClassLoader().getResourceAsStream("kafka-config.properties"));
            producer = new KafkaProducer<>(props);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Without the fix, it prints partitionCount={0=60, 1=120, 2=120}; after the fix, it prints partitionCount={0=100, 1=100, 2=100}.

> Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
> ---------------------------------------------------------------------------------------------
>
> Key: KAFKA-4402
> URL: https://issues.apache.org/jira/browse/KAFKA-4402
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jun Yao
> Priority: Minor
>
> From the code comments, it is said that the Kafka client producer's DefaultPartitioner will do round robin if "no partition or key is present":
> https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L34
> The code does try to do round robin: it maintains a counter, increments it on each send, and uses it to decide which partition to go to.
> However, the counter is a global counter shared by all topics, so partitioning is actually not round robin per topic and sometimes causes unbalanced routing among partitions.
> Although we can pass a custom implementation of the interface "org.apache.kafka.clients.producer.Partitioner", it would still be good to make the default implementation true round robin as the comment says.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
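The fix being proposed boils down to keeping one round-robin counter per topic instead of a single counter shared across all topics. A minimal self-contained sketch of that idea (illustrative only, not the actual DefaultPartitioner code; the class and method names here are made up):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: per-topic round-robin partition selection. With a single
// shared counter, sends to other topics also advance the counter and
// skew the distribution; one counter per topic keeps each topic balanced.
public class PerTopicRoundRobin {
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    public int partition(String topic, int numPartitions) {
        AtomicInteger counter =
            counters.computeIfAbsent(topic, t -> new AtomicInteger(0));
        // Mask the sign bit so the result stays valid after int overflow.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```

Replaying the test above against this sketch (300 sends to a 3-partition topic, with an extra send to a 1-partition topic every 5th iteration) yields 100 sends per partition, matching the fixed output.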
[jira] [Created] (KAFKA-4403) Update KafkaBasedLog to use new endOffsets consumer API
Ewen Cheslack-Postava created KAFKA-4403:
--------------------------------------------

Summary: Update KafkaBasedLog to use new endOffsets consumer API
Key: KAFKA-4403
URL: https://issues.apache.org/jira/browse/KAFKA-4403
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 0.10.1.0
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
Priority: Minor

As of 0.10.1.0 and KIP-79 (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090), KafkaConsumer can now fetch offset information about topic partitions. Previously KafkaBasedLog had to use a seekToEnd + position approach to determine end offsets. With the new APIs we can simplify this code. This isn't critical since the current code works fine, but it would be a nice cleanup and simplification.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
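For context, the simplification the ticket describes looks roughly like this (a sketch against the 0.10.1.0 consumer API; the surrounding KafkaBasedLog bookkeeping is elided and the method names here are illustrative, this is not the actual patch):

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class EndOffsetsSketch {
    // Old approach: seek to the end of each partition, then read back
    // position(), which disturbs the consumer's current read position.
    static Map<TopicPartition, Long> viaSeekToEnd(Consumer<?, ?> consumer,
                                                  Collection<TopicPartition> partitions) {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        consumer.seekToEnd(partitions);
        for (TopicPartition tp : partitions)
            offsets.put(tp, consumer.position(tp));
        return offsets;
    }

    // New approach (KIP-79, 0.10.1.0+): one metadata call, no seeking.
    static Map<TopicPartition, Long> viaEndOffsets(Consumer<?, ?> consumer,
                                                   Collection<TopicPartition> partitions) {
        return consumer.endOffsets(partitions);
    }
}
```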
[jira] [Commented] (KAFKA-4402) Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
[ https://issues.apache.org/jira/browse/KAFKA-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15662157#comment-15662157 ]

ASF GitHub Bot commented on KAFKA-4402:
---------------------------------------

GitHub user yaojuncn opened a pull request:

https://github.com/apache/kafka/pull/2128

KAFKA-4402: make the KafkaProducer true round robin per topic

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yaojuncn/kafka KAFKA-4402-client-producer-round-robin-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2128.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2128

commit d74b0bd4d712223da38b3f2e666e4611bf65b455
Author: yaojuncn
Date: 2016-11-13T21:46:44Z

KAFKA-4402: make the KafkaProducer true round robin per topic

> Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
> ---------------------------------------------------------------------------------------------
>
> Key: KAFKA-4402
> URL: https://issues.apache.org/jira/browse/KAFKA-4402
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jun Yao
> Priority: Minor
>
> From the code comments, it is said that the Kafka client producer's DefaultPartitioner will do round robin if "no partition or key is present":
> https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L34
> The code does try to do round robin: it maintains a counter, increments it on each send, and uses it to decide which partition to go to.
> However, the counter is a global counter shared by all topics, so partitioning is actually not round robin per topic and sometimes causes unbalanced routing among partitions.
> Although we can pass a custom implementation of the interface "org.apache.kafka.clients.producer.Partitioner", it would still be good to make the default implementation true round robin as the comment says.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[GitHub] kafka pull request #2128: KAFKA-4402: make the KafkaProducer true round robi...
GitHub user yaojuncn opened a pull request:

https://github.com/apache/kafka/pull/2128

KAFKA-4402: make the KafkaProducer true round robin per topic

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yaojuncn/kafka KAFKA-4402-client-producer-round-robin-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2128.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2128

commit d74b0bd4d712223da38b3f2e666e4611bf65b455
Author: yaojuncn
Date: 2016-11-13T21:46:44Z

KAFKA-4402: make the KafkaProducer true round robin per topic

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Updated] (KAFKA-4402) Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
[ https://issues.apache.org/jira/browse/KAFKA-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Yao updated KAFKA-4402:
---------------------------
Description:

From the code comments, it is said that the Kafka client producer's DefaultPartitioner will do round robin if "no partition or key is present":
https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L34
The code does try to do round robin: it maintains a counter, increments it on each send, and uses it to decide which partition to go to.
However, the counter is a global counter shared by all topics, so partitioning is actually not round robin per topic and sometimes causes unbalanced routing among partitions.
Although we can pass a custom implementation of the interface "org.apache.kafka.clients.producer.Partitioner", it would still be good to make the default implementation true round robin as the comment says.

> Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
> ---------------------------------------------------------------------------------------------
>
> Key: KAFKA-4402
> URL: https://issues.apache.org/jira/browse/KAFKA-4402
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jun Yao
> Priority: Minor
>
> From the code comments, it is said that the Kafka client producer's DefaultPartitioner will do round robin if "no partition or key is present":
> https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L34
> The code does try to do round robin: it maintains a counter, increments it on each send, and uses it to decide which partition to go to.
> However, the counter is a global counter shared by all topics, so partitioning is actually not round robin per topic and sometimes causes unbalanced routing among partitions.
> Although we can pass a custom implementation of the interface "org.apache.kafka.clients.producer.Partitioner", it would still be good to make the default implementation true round robin as the comment says.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-4402) Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
[ https://issues.apache.org/jira/browse/KAFKA-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15662153#comment-15662153 ]

Ewen Cheslack-Postava commented on KAFKA-4402:
----------------------------------------------

[~Jun Yao] Which code are you looking at for this? https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L55-L64 seems to have the right behavior, since it uses a counter and modulo to select a partition when the key bytes are null. It isn't perfect when the number of partitions changes, since it might have outdated metadata, but that is an edge case. Perhaps you were looking at the old DefaultPartitioner.scala for the old producer?

> Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
> ---------------------------------------------------------------------------------------------
>
> Key: KAFKA-4402
> URL: https://issues.apache.org/jira/browse/KAFKA-4402
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jun Yao
> Priority: Minor

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Re: [VOTE] KIP-89: Allow sink connectors to decouple flush and offset commit
+1 (binding)

On Nov 9, 2016 2:17 PM, "Shikhar Bhushan" wrote:
> Hi,
>
> I would like to initiate a vote on KIP-89:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-89%3A+Allow+sink+connectors+to+decouple+flush+and+offset+commit
>
> Best,
>
> Shikhar
[jira] [Commented] (KAFKA-4401) Change the KafkaServerTestHarness and IntegrationTestHarness from trait to abstract class.
[ https://issues.apache.org/jira/browse/KAFKA-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15662147#comment-15662147 ]

Ewen Cheslack-Postava commented on KAFKA-4401:
----------------------------------------------

[~becket_qin] We've already done much of this work for some of our Java-based projects, e.g. see https://github.com/confluentinc/schema-registry/blob/master/core/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java, with variants for security, e.g. https://github.com/confluentinc/schema-registry/blob/master/core/src/test/java/io/confluent/kafka/schemaregistry/SSLClusterTestHarness.java. These have actually caused a bit of pain because they rely on internals, so they can break unexpectedly due to changes in Kafka. Given that, it would be handy if they were just part of Kafka itself. We could probably lift most of these implementations directly (they include schema registry startup as well, but that should be trivial to strip out).

That said, we've actually moved away from including integration tests like this in most of our projects, in favor of putting tests like these into system tests. They remain in our schema registry and REST proxy mainly for historical reasons, i.e. the cost of refactoring them hasn't become worth it in these cases, since the tests can still run relatively quickly (compared to Kafka's tests, which now have so many integration tests that they dominate the 15-20 minute test runtime on a developer laptop).

I'm a bit torn as to whether this would be a good addition: on the one hand, people are doing this, so standardizing it and avoiding 83 different implementations seems good; on the other hand, I think it leads to people dumping too many tests that are actually system tests into tests that they call integration tests and run via unit tests...

> Change the KafkaServerTestHarness and IntegrationTestHarness from trait to abstract class.
> ------------------------------------------------------------------------------------------
>
> Key: KAFKA-4401
> URL: https://issues.apache.org/jira/browse/KAFKA-4401
> Project: Kafka
> Issue Type: Task
> Components: unit tests
> Affects Versions: 0.10.1.0
> Reporter: Jiangjie Qin
> Assignee: Jiangjie Qin
> Fix For: 0.10.1.1
>
> The IntegrationTestHarness and KafkaServerTestHarness are useful not only in Kafka unit tests, but also for the unit tests in other products that depend on Kafka.
> Currently there are two issues making those two test harness classes hard to use by other Java users:
> 1. The two classes are Scala traits. This makes it difficult for people to write Java unit test code.
> 2. Some of the interfaces are Scala only.
> It would be good to expose those two classes for more general usage and make them Java friendly.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (KAFKA-4402) Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
Jun Yao created KAFKA-4402:
------------------------------

Summary: Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
Key: KAFKA-4402
URL: https://issues.apache.org/jira/browse/KAFKA-4402
Project: Kafka
Issue Type: Improvement
Reporter: Jun Yao
Priority: Minor

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Becket,

If using the trim.on.offset.commit parameter, it will help to quickly trim the log, but consumers in other consumer groups may find the messages trimmed. We still need to coordinate many consumer groups to trim the log, which seems difficult for a single consumer to do. So it still comes back to the question: whether to implement this on the broker side or in the admin client side.

Even if implemented on the broker side, we can still use the trim API to finish the log deletion for leader or replica segments. And we can offer an option to safely delete the log (disabled by default), so this is the motivation for this KIP.

Thanks,
David

------------------ Original message ------------------
From: "Becket Qin"
Date: 2016-11-06 (Sun) 11:39
To: "dev"
Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

Hi David,

I am thinking that depending on the use case, we may not need a separate tool to have committed-message-based retention using the trim() method. One way to do this is to have a configuration like trim.on.offset.commit in the consumer, so after committing the offset, the consumer will also send a trim request to the broker.

In some cases, the application may want to trim the log in a more flexible way, e.g. not trim on commit but every hour. In that case, it is true that users will need to trim the log with a separate admin client. However, that logic could be a long-running stand-alone service independent of Kafka or the application. It may have its own configurations as we discussed in this KIP, so the applications in that case would just talk to that service to trim the log instead of talking to Kafka.

Thanks,

Jiangjie (Becket) Qin

On Sun, Nov 6, 2016 at 6:10 AM, <254479...@qq.com> wrote:
> Hi Becket,
> The most important benefit of method (2) is that we can safely delete the log segments, because all the deleted log segments have been consumed. If the messages are very important, we need to safely delete the log segments instead of forcibly deleting them after the retention time.
> Kafka itself can ensure all the deleted logs are consumed, to improve end-to-end reliability. And this feature is disabled by default, so things stay simple for people who don't use it.
> Actually, users can build a tool using the TrimRequest to do this work (method 1), but users must run this tool alongside Kafka all the time, which may not always hold.
>
> Thanks,
> David
>
> ------------------ Original message ------------------
> From: "Becket Qin"
> Date: 2016-11-01 (Tue) 3:57
> To: "dev"
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
> Hi David,
>
> I think the trim() API is generally useful for consume-based retention as well as other use cases. So we probably should have (1).
>
> For (2), it is more of an optimization by doing a favor for the users. This could be implemented on top of (1) if we want to. So maybe we can implement (1) first and let the applications do the trim() by themselves at this point. This will put more burden on the application side, but is not that bad if there is only one downstream consumer group. In the future, if we find more use cases where multiple downstream consumer groups need to coordinate among themselves and broker-side help would make things simpler, we can add (2) then.
>
> Regarding the relation between this KIP and KIP-47: at a high level they are very similar, i.e. trim() by timestamp vs. trim() by offsets, so it would be worth thinking about them together. After KIP-79, we can search messages by timestamp; this essentially translates a timestamp to offsets. So KIP-47 can also be built on top of the trim()-by-offsets interface after translating the timestamp to offsets. Jun has suggested an implementation in the KIP-47 discussion thread which introduces a new TrimRequest. Would you take a look and see if that could be used for KIP-68 as well?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sun, Oct 30, 2016 at 2:24 AM, <254479...@qq.com> wrote:
> > Hi All,
> >
> > As per our discussion, there are two ways to clean the consumed log:
> >
> > 1) Use an admin tool to find the min committed offset for some topics across the specified set of consumer groups, then send the trim API to all the replicas of the brokers; the brokers will then start to trim the log segments of these topics.
> > The benefit of this method is to keep the broker simple and more flexible for the users, but it is more complicated for the users to clean all the messages which are consumed.
> >
> > 2) The broker will periodically do the consumed log retention as the KIP mentioned. This method is simple for the users and can automatically clean the consumed log, but it will add
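The admin-tool side of method (1) above reduces to computing, per partition, the minimum committed offset across every consumer group that reads the topic: anything below that offset has been consumed by all groups, so trimming there is safe. A self-contained sketch of just that computation (the class name and input shape are made up for illustration; fetching real committed offsets would go through the Kafka APIs):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: the "safe to trim" offset for a partition is the minimum
// committed offset across all consumer groups that read the topic.
public class SafeTrimOffsets {
    // Input: one map per consumer group, partition -> committed offset.
    public static Map<Integer, Long> compute(List<Map<Integer, Long>> groupCommits) {
        Map<Integer, Long> safe = new HashMap<>();
        for (Map<Integer, Long> commits : groupCommits) {
            for (Map.Entry<Integer, Long> e : commits.entrySet())
                safe.merge(e.getKey(), e.getValue(), Math::min);
        }
        return safe;
    }
}
```

The trim requests sent to the brokers would then use these per-partition offsets as the trim points.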
[jira] [Resolved] (KAFKA-4398) offsetsForTimes returns false starting offset when timestamp of messages are not monotonically increasing
[ https://issues.apache.org/jira/browse/KAFKA-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

huxi resolved KAFKA-4398.
-------------------------
Resolution: Not A Bug

> offsetsForTimes returns false starting offset when timestamp of messages are not monotonically increasing
> ---------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-4398
> URL: https://issues.apache.org/jira/browse/KAFKA-4398
> Project: Kafka
> Issue Type: Bug
> Components: consumer, core
> Affects Versions: 0.10.1.0
> Reporter: huxi
> Assignee: huxi
>
> After a code walk-through for KIP-33 (add a time-based log index), I found a use case where the method 'offsetsForTimes' fails to return the correct offset if a series of messages is created without monotonically increasing timestamps (CreateTime is used).
> Say T0 is the hour when the first message is created, and Tn means the (T+n)th hour. I then created another two messages at T1 and T3 respectively. At this moment, the .timeindex should contain two items (whether it contains T0 does not matter to this problem):
> T1 -> 1
> T3 -> 2
> Later, I want to insert a third message in between T1 and T3, say at T2.5, but the time index file is not changed because of the requirement that timestamps be monotonically increasing within each segment.
> After generating the message with T2.5, I invoke KafkaConsumer.offsetsForTimes("tp" -> T2.5), hoping to get the first offset with timestamp greater than or equal to T2.5, which should be the third message in this case, but the consumer returns the second message with T3.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
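The "Not A Bug" resolution follows from how the time index behaves: a segment's .timeindex only appends entries with increasing timestamps, and the lookup answers with the first indexed entry whose timestamp is >= the target. A toy model of that behavior (this is not the real log index code; the timestamps and offsets below just mirror the T1/T3 scenario from the report):

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of a segment's time index: timestamp -> offset, where only
// entries with strictly increasing timestamps are ever recorded.
public class TimeIndexModel {
    private final TreeMap<Long, Long> index = new TreeMap<>();

    public void maybeAppend(long timestamp, long offset) {
        // Non-increasing timestamps are skipped, which is why a late
        // message with timestamp T2.5 never appears in the index.
        if (index.isEmpty() || timestamp > index.lastKey())
            index.put(timestamp, offset);
    }

    // Lookup semantics: first indexed entry with timestamp >= target,
    // or null if the target is beyond every indexed timestamp.
    public Long lookup(long target) {
        Map.Entry<Long, Long> e = index.ceilingEntry(target);
        return e == null ? null : e.getValue();
    }
}
```

With T1 and T3 indexed and T2.5 skipped, a lookup for T2.5 lands on the T3 entry, i.e. the second message, exactly as observed in the report.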