[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15233630#comment-15233630 ] Robert Christ edited comment on KAFKA-3042 at 4/9/16 6:53 PM: -- Hi Flavio, The logs can be found at: https://s3-us-west-2.amazonaws.com/tivo-confluent/confluent.tar Let me know if you have trouble accessing it. It does include the zookeeper logs. We had just enabled the GC logs for zookeeper, but it does cover the period of the reproduction. As for the zkCli.sh issue, I don't have a lot of information. I was watching the kafka logs and flipped back to my zkCli.sh session and it had disconnected and needed to reconnect. I do believe the session timed out and is probably the same root cause that causes our kafka broker sessions to time out. > updateIsr should stop after failed several times due to zkVersion issue > --- > > Key: KAFKA-3042 > URL: https://issues.apache.org/jira/browse/KAFKA-3042 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.8.2.1 > Environment: jdk 1.7 > centos 6.4 > Reporter: Jiahongchao > Attachments: controller.log, server.log.2016-03-23-01, > state-change.log > > > Sometimes one broker may repeatedly log > "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR" > I think this is because the broker considers itself the leader when in fact it's > a follower.
> So after several failed tries, it needs to find out who is the leader -- This message was sent by Atlassian JIRA (v6.3.4#6332)
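The loop at the heart of this report is a compare-and-set style conditional update: the broker holds a cached zkVersion, and the ISR write succeeds only while that version still matches ZooKeeper's. A minimal, hypothetical Java sketch of the bounded-retry behavior the issue title asks for, using an in-memory stand-in for the znode (names and the retry policy are illustrative, not Kafka's actual code):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch (not Kafka's actual code): a conditional update that
// gives up after a bounded number of version-mismatch failures instead of
// retrying forever on a stale cached version.
public class BoundedVersionedUpdate {

    // In-memory stand-in for a znode's version counter.
    static final AtomicInteger storeVersion = new AtomicInteger(0);

    // Conditional write: succeeds only if the caller's expected version
    // matches the store's current version (like ZooKeeper's setData(version)).
    static boolean conditionalWrite(int expectedVersion) {
        return storeVersion.compareAndSet(expectedVersion, expectedVersion + 1);
    }

    // Retry at most maxRetries times, re-reading the current version after
    // each failure rather than looping on the stale cached value.
    static boolean updateWithBoundedRetries(int cachedVersion, int maxRetries) {
        int version = cachedVersion;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            if (conditionalWrite(version)) {
                return true;
            }
            version = storeVersion.get(); // refresh before the next attempt
        }
        return false; // exhausted: caller should re-discover the leader
    }

    public static void main(String[] args) {
        // Cached version 5 is stale (store is at 0); the refresh lets the
        // next attempt succeed instead of logging the mismatch forever.
        System.out.println("update succeeded: " + updateWithBoundedRetries(5, 3));
    }
}
```

With a bounded retry count plus a re-read on each failure, a broker holding a stale cached version either converges or gives up and can go find out who the real leader is, rather than logging the mismatch indefinitely.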
Re: KStream Close Processor
Mike, It's not clear what you mean by "buffering up the contents". The producer itself already does some buffering and batching when sending to Kafka. Did you actually "merge" multiple small messages into one large message before giving it to the producer in the app code? In either case, I am not sure how it will help with the downstream consumer memory pressure issue. About bounding the consumer memory usage, we already have some thoughts on that issue and plan to add a memory bounding feature like the producer's in the near future ( https://issues.apache.org/jira/browse/KAFKA-2045), so it won't be a problem for long. And for the "max.poll.messages" config and 0.10.0, just FYI we are shooting to have it released at the end of this month. Guozhang On Sat, Apr 9, 2016 at 5:59 AM, Michael D. Coon wrote: > Guozhang, > In my processor, I'm buffering up contents of the final messages in > order to make them larger. This is to optimize throughput and avoid tiny > messages from being injected downstream. So nothing is being pushed to the > producer until my configured thresholds are met in the buffering mechanism. > So as it stands, these messages are left dangling after the producer closes > and, even worse, if periodic commits are happening behind the scenes, the > data is lost on restart. > What we need is a way to notify the processors that everything is > "about" to close so that I can properly flush what I have in memory out to > the producer. Otherwise, I'm stuck with always sending tiny messages into > kafka--which I know for certain causes problems on downstream consumers > (where they set a high fetch memory size and it causes hundreds of > thousands of messages to be retrieved at a time…and thus bogs down the > consumer). I think the "max.poll.messages" setting we discussed before > would help here but if it's not available until 0.10, I'm kind of stuck. > Another option might be to disable periodic commits and only commit > when the processor requests it.
This would mitigate some data loss and is > better than nothing. There is still a chance that data in RecordQueue not > yet sent to my processor would be committed but never processed in this > case. > Another thought I had was to reduce the max fetch size; however, some > messages can be very large (i.e. data spikes periodically). In this case, > the message size would exceed my lower max fetch size, causing the consumer > to simply stop consuming. So I'm stuck. So either we need to roll in > max.poll.messages sooner than 0.10 or maybe add a callback mechanism letting me > know that the producer is about to close so I can clear my buffers. > Ideas? > Mike > > On Friday, April 8, 2016 8:24 PM, Guozhang Wang > wrote: > > > Hi Michael, > > When you call KafkaStreams.close(), it will first trigger a commitAll() > function, which will 1) flush the local state store if necessary; 2) flush > messages buffered in the producer; 3) commit offsets on the consumer. Then it will > close the producer / consumer clients and shut down the tasks. So when you > see the processor's "close" function triggered, any buffered messages in the > producer should already have been flushed. > > Did you see a different behavior than described above? > > Guozhang > > > On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon > > wrote: > > > All, > > I'm seeing my processor's "close" method being called AFTER my > > downstream producer has been closed. I had assumed that on close I would be > > able to flush whatever I had been buffering up to send to a kafka topic. In > > other words, we've seen significant performance differences in building > > flows with small messages and large messages in/out of kafka. So my > > processor buffers up messages to a threshold and flushes those as a > > composite message bundle to improve downstream processing.
But if this > > close method is called AFTER the producer has already been closed, I > would > > have no way to actually flush the final composite bundles to my topic on > > shutdown. Is there some way to get a call BEFORE producer shutdown > occurs? > > Mike > > > > > > > -- > -- Guozhang > > > > -- -- Guozhang
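The buffering pattern under discussion can be reduced to a small stand-alone sketch: records accumulate until a size threshold, and close() must flush the dangling partial bundle while the producer is still open. This is hypothetical illustration code, not the actual Kafka Streams Processor API; the forward() callback stands in for the downstream send:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-alone sketch of the buffering pattern (not the actual
// Kafka Streams Processor API): small records are merged into larger bundles,
// and close() flushes the dangling partial bundle before the producer goes away.
public class BundlingBuffer {
    private final int threshold;
    private final List<String> buffer = new ArrayList<>();
    private final Consumer<List<String>> forward; // illustrative downstream send

    public BundlingBuffer(int threshold, Consumer<List<String>> forward) {
        this.threshold = threshold;
        this.forward = forward;
    }

    // Accumulate one record; emit a bundle once the threshold is reached.
    public void process(String record) {
        buffer.add(record);
        if (buffer.size() >= threshold) {
            flush();
        }
    }

    // Send whatever is buffered downstream. This must run while the
    // producer is still open, or the partial bundle is lost on shutdown.
    public void flush() {
        if (!buffer.isEmpty()) {
            forward.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public void close() {
        flush();
    }

    public static void main(String[] args) {
        List<Integer> bundleSizes = new ArrayList<>();
        BundlingBuffer b = new BundlingBuffer(3, bundle -> bundleSizes.add(bundle.size()));
        for (int i = 0; i < 4; i++) b.process("msg" + i);
        b.close(); // without this, the 4th record would dangle
        System.out.println(bundleSizes); // [3, 1]
    }
}
```

The ordering problem in the thread is exactly about when close() runs: if the framework closes the producer before calling the processor's close(), the final flush() here has nowhere to send the partial bundle.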
Re: [VOTE] KIP-4 Metadata Schema
Sounds good. On Fri, Apr 8, 2016 at 11:37 AM, Grant Henke wrote: > Guozhang, > > I agree there is a gap. That's what I was trying to say in the last email. > But I also don't see a great/safe way to fix it by changing what topics > are included in the metadata. > > Perhaps instead, I can add a special error code to the CreateTopic request > to tell the user the topic they are trying to create is being deleted. I do > think changes/improvements to the delete logic are needed. What I am saying > here is that I don't think this metadata change is the time or place to fix > it given the current constraints. I am flexible on that though if people > disagree. > > Thank you, > Grant > > On Fri, Apr 8, 2016 at 12:51 PM, Guozhang Wang wrote: > > > I feel that "a delete and then create action may fail with a topic exists > > exception...which the user could retry until succeeded" has some flaw, > > since we cannot distinguish the case 1) the topic is marked for deletion, > > but deletion is not complete, from 2) the topic is being created, but > > creation is not complete. Or do I miss something here? > > > > Guozhang > > > > On Thu, Apr 7, 2016 at 11:07 AM, Grant Henke > wrote: > > > > > Thanks for the feedback Guozhang and Gwen. > > > > > > Gwen, I agree with you on this. I am not sure it's something we can/should > > > tackle here. Especially before the release. I can leave the delete flag > > off > > > of the changes. > > > > > > What that means for KIP-4 is that a client won't be able to > > differentiate > > > between a topic that is gone vs marked for deletion. This means a > delete > > > and then create action may fail with a topic exists exception...which > the > > > user could retry until succeeded. I think that is reasonable, and much > > > safer. > > > > > > After that we can work on creating more tests and improving the delete > > > behavior.
> > > > > > > > > > > > On Thu, Apr 7, 2016 at 12:55 PM, Gwen Shapira > wrote: > > > > > > > > Given that we are very close to the release, if we are changing the > > > > Metadata cache + topic deletion logic, I'd like a good number of system > > > > tests to appear with the patch. > > > > > > > > On Thu, Apr 7, 2016 at 10:53 AM, Gwen Shapira > > wrote: > > > > > > > > > This will change some logic though, right? > > > > > > > > > > IIRC, right now produce/fetch requests to marked-for-deletion topics > > > fail > > > > > because the topics are simply not around. You get a generic "doesn't > > > > exist" > > > > > error. If we keep these topics and add a flag, we'll need to find all > > > the > > > > > places with this implicit logic and correct for it. > > > > > > > > > > And since our tests for topic deletion are clearly inadequate... I'm > a > > > > bit > > > > > scared :) > > > > > > > > > > Gwen > > > > > > > > > > On Thu, Apr 7, 2016 at 10:34 AM, Guozhang Wang > > > > > wrote: > > > > > > > > > >> Hmm, I think since in the original protocol the metadata response does not > > > > have > > > > >> information for "marked for deletion" topics, we want to > > > remove > > > > >> that topic from the response by cleaning the metadata cache > > > > once > > > > >> it is marked for deletion. > > > > >> > > > > >> With the new format, I think it is OK to delay the metadata > > cleaning. > > > > >> > > > > >> Guozhang > > > > >> > > > > >> On Thu, Apr 7, 2016 at 8:35 AM, Grant Henke > > > > wrote: > > > > >> > > > > >> > I am testing the marked for deletion flag in the metadata and ran > > > into > > > > >> some > > > > >> > challenges. > > > > >> > > > > > >> > It turns out that as soon as a topic is marked for deletion it may > > > be > > > > >> > purged from the metadata cache. This means that Metadata responses > > > > >> > can't/don't return the topic.
Though the topic may still exist if > > > it's > > > > >> not > > > > >> > ready to be completely deleted or is in the process of being > > > deleted. > > > > >> > > > > > >> > This poses a challenge because a user would have no way to tell > > if a > > > > >> topic > > > > >> > still exists and is marked for deletion, other than to try to > > > > >> recreate it > > > > >> > and see a failure. I could change the logic to no longer purge a > > > > topic > > > > >> > from the cache until it's completely deleted, but I am not sure > if > > > that > > > > >> > would impact the clients in some way negatively. Does anyone > have > > > > enough > > > > >> > background to say? > > > > >> > > > > > >> > I will dig into this a bit more today, but wanted to throw this > > out > > > > >> there > > > > >> > for some early feedback. > > > > >> > > > > > >> > Thank you, > > > > >> > Grant > > > > >> > > > > > >> > > > > > >> > On Tue, Apr 5, 2016 at 8:02 PM, Jun Rao > wrote: > > > > >> > > > > > >> > > 5. You will return no
Re: [VOTE] KIP-4 Metadata Schema (Round 2)
+1 On Fri, Apr 8, 2016 at 4:36 PM, Gwen Shapira wrote: > +1 > > On Fri, Apr 8, 2016 at 2:41 PM, Grant Henke wrote: > > > I would like to re-initiate the voting process for the "KIP-4 Metadata > > Schema changes". This is not a vote for all of KIP-4, but specifically for > > the metadata changes. I have included the exact changes below for clarity: > > > > > > Metadata Request (version 1) > > > > > > > > > > > > MetadataRequest => [topics] > > > > > > Stays the same as version 0, however behavior changes. > > > In version 0 there was no way to request no topics, and an empty list > > > signified all topics. > > > In version 1 a null topics list (size -1 on the wire) will indicate that a > > > user wants *ALL* topic metadata. Compared to an empty list (size 0) which > > > indicates metadata for *NO* topics should be returned. > > > Metadata Response (version 1) > > > > > > > > > > > > MetadataResponse => [brokers] controllerId [topic_metadata] > > > brokers => node_id host port rack > > > node_id => INT32 > > > host => STRING > > > port => INT32 > > > rack => NULLABLE_STRING > > > controllerId => INT32 > > > topic_metadata => topic_error_code topic is_internal > > [partition_metadata] > > > topic_error_code => INT16 > > > topic => STRING > > > is_internal => BOOLEAN > > > partition_metadata => partition_error_code partition_id leader > > [replicas] [isr] > > > partition_error_code => INT16 > > > partition_id => INT32 > > > leader => INT32 > > > replicas => INT32 > > > isr => INT32 > > > > > > Adds rack, controller_id, and is_internal to the version 0 response.
> > > > > > > The KIP is available here for reference (linked to the Metadata schema > > section): > > * > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-MetadataSchema > > < > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-MetadataSchema > > >* > > > > A pull request is available implementing the proposed changes here: > > https://github.com/apache/kafka/pull/1095 > > > > Here are some links to past discussions on the mailing list: > > http://search-hadoop.com/m/uyzND1pd4T52H1m0u1=Re+KIP+4+Wiki+Update > > > > > http://search-hadoop.com/m/uyzND1J2IXeSNXAT=Metadata+and+ACLs+wire+protocol+review+KIP+4+ > > > > Here is the previous vote discussion (please take a look and discuss > > there): > > > > > http://search-hadoop.com/m/uyzND1xlaiU10QlYX=+VOTE+KIP+4+Metadata+Schema > > > > Thank you, > > Grant > > -- > > Grant Henke > > Software Engineer | Cloudera > > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke > > > -- -- Guozhang
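The null-versus-empty distinction quoted above is easy to mis-handle, since both lists "look empty" in application code. A small illustrative sketch of the wire convention (size -1 for a null list meaning ALL topics, size 0 for NO topics); this is a simplified stand-in, not Kafka's actual serialization code:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in (not Kafka's real serializer) for the version 1
// convention: a null topic list is written as size -1 and means ALL topics,
// while an empty list is written as size 0 and means NO topics.
public class TopicListEncoding {

    static ByteBuffer encodeTopicCount(List<String> topics) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(topics == null ? -1 : topics.size()); // null => -1 on the wire
        buf.flip();
        return buf;
    }

    static String decodeMeaning(ByteBuffer buf) {
        int size = buf.getInt();
        if (size == -1) return "ALL topics";
        if (size == 0) return "NO topics";
        return size + " specific topics";
    }

    public static void main(String[] args) {
        System.out.println(decodeMeaning(encodeTopicCount(null)));      // ALL topics
        System.out.println(decodeMeaning(encodeTopicCount(List.of()))); // NO topics
        System.out.println(decodeMeaning(encodeTopicCount(Arrays.asList("a", "b"))));
    }
}
```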
[jira] [Updated] (KAFKA-3521) Better handling NPEs in Streams DSL implementation
[ https://issues.apache.org/jira/browse/KAFKA-3521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-3521: - Fix Version/s: (was: 0.10.1.0) 0.10.0.0 > Better handling NPEs in Streams DSL implementation > -- > > Key: KAFKA-3521 > URL: https://issues.apache.org/jira/browse/KAFKA-3521 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: Guozhang Wang > Assignee: Guozhang Wang > Labels: user-experience > Fix For: 0.10.0.0 > > > We observed a few cases where a mal-programmed application would trigger NPEs > thrown from some lower-level classes, where the inputs should really be validated > and more meaningful exceptions thrown if the validation fails.
[GitHub] kafka pull request: MINOR: Update protocol doc link in Introductio...
GitHub user SinghAsDev opened a pull request: https://github.com/apache/kafka/pull/1211 MINOR: Update protocol doc link in Introduction. You can merge this pull request into a Git repository by running: $ git pull https://github.com/SinghAsDev/kafka MinorFixDocLink Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1211.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1211 commit eee578a929a2d7841aa1f434ece885b8e7cf5a5f Author: Ashish Singh Date: 2016-04-09T17:39:54Z MINOR: Update protocol doc link in Introduction. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15233630#comment-15233630 ] Robert Christ commented on KAFKA-3042: -- Hi Flavio, The logs can be found at: https://s3-us-west-2.amazonaws.com/tivo-confluent/confluent.tar Let me know if you have trouble accessing it. It does include the zookeeper logs. We had just enabled the GC logs for zookeeper, but it does cover the period of the reproduction. As for the zkCli.sh issue, I don't have a lot of information. I was watching the kafka logs and flipped back to my zkCli.sh session and it had disconnected and needed to reconnect. I do believe the session timed out and is probably the same root cause that causes our kafka broker sessions to time out.
[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15233564#comment-15233564 ] Flavio Junqueira commented on KAFKA-3042: - [~delbaeth] thanks for all the information. bq. Broker 1 rolled correctly and rejoined and leadership rebalancing occurred. After broker 2 rolled and came back up it now has an inconsistent view of the metadata. It thinks there are only 300 topics and all the other brokers believe there are 700. Should we file this as a separate issue? I'm not aware of this issue, so it does sound better to report it in a different jira and describe the problem in as much detail as possible. If you have logs for this problem, then please share. bq. We have managed to reproduce the problem and have a snapshot of the logs. The tarball is about a gigabyte. What should I do with it? If you have a web server that can host it, then perhaps you can upload it there. A dropbox/box/onedrive public folder that we can read from would also do it. bq. my zkCli.sh session which I was using to watch the controller exited here so I was disconnected for a minute This is odd. Could you describe in more detail what happened with the zkCli session? You said that it disconnected for a minute, but has the session expired? It must have expired, unless your session timeout was at least one minute, which according to your description, it wasn't. If you have them, please include the zk logs in the bundle. 
[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232535#comment-15232535 ] Flavio Junqueira edited comment on KAFKA-3042 at 4/9/16 1:09 PM: - Here is what I've been able to find out so far based on the logs that [~wushujames] posted: # The "Cached zkVersion..." messages are printed in {{Partition.updateIsr}}. The {{updateIsr}} calls in the logs seem to come from {{Partition.maybeShrinkIsr}}. # {{Partition.maybeShrinkIsr}} is called from {{ReplicaManager.maybeShrinkIsr}}, which is called periodically according to a schedule call in {{ReplicaManager.startup}}. This is the main reason we see those messages periodically coming up. # In {{Partition.maybeShrinkIsr}}, the ISR is only updated if the leader replica is the broker itself, which is determined by the variable {{leaderReplicaIdOpt}}. It looks like {{leaderReplicaIdOpt}} isn't being updated correctly, and it is possible that this is due to a race with either the controllers or the execution of {{LeaderAndIsr}} requests.
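The guard described in step 3 of that analysis can be pictured with a tiny stand-in class: the periodic ISR shrink proceeds only when the cached leaderReplicaIdOpt names the local broker, so a stale cached value keeps the broker attempting conditional updates it can never win. This is a hypothetical sketch, not the real Partition class:

```java
import java.util.Optional;

// Tiny stand-in (not the real Partition class) for the guard described in
// the analysis: the periodic ISR shrink proceeds only when the cached
// leaderReplicaIdOpt names the local broker. If that cached value is stale,
// the broker keeps attempting conditional ZK updates it can never win.
public class IsrShrinkGuard {
    private final int localBrokerId;
    private final Optional<Integer> leaderReplicaIdOpt; // updated by LeaderAndIsr requests

    public IsrShrinkGuard(int localBrokerId, Optional<Integer> leaderReplicaIdOpt) {
        this.localBrokerId = localBrokerId;
        this.leaderReplicaIdOpt = leaderReplicaIdOpt;
    }

    // Called periodically, as from the ReplicaManager.startup() schedule.
    public boolean shouldAttemptShrink() {
        return leaderReplicaIdOpt.map(id -> id == localBrokerId).orElse(false);
    }

    public static void main(String[] args) {
        System.out.println(new IsrShrinkGuard(1, Optional.of(1)).shouldAttemptShrink()); // true
        System.out.println(new IsrShrinkGuard(1, Optional.of(2)).shouldAttemptShrink()); // false
    }
}
```

If the cached leader id is never corrected after a leadership change, this guard keeps returning true on a broker that is actually a follower, which matches the repeated "Cached zkVersion ... skip updating ISR" symptom.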
Re: KStream Close Processor
Guozhang, In my processor, I'm buffering up contents of the final messages in order to make them larger. This is to optimize throughput and avoid tiny messages from being injected downstream. So nothing is being pushed to the producer until my configured thresholds are met in the buffering mechanism. So as it stands, these messages are left dangling after the producer closes and, even worse, if periodic commits are happening behind the scenes, the data is lost on restart. What we need is a way to notify the processors that everything is "about" to close so that I can properly flush what I have in memory out to the producer. Otherwise, I'm stuck with always sending tiny messages into kafka--which I know for certain causes problems on downstream consumers (where they set a high fetch memory size and it causes hundreds of thousands of messages to be retrieved at a time…and thus bogs down the consumer). I think the "max.poll.messages" setting we discussed before would help here but if it's not available until 0.10, I'm kind of stuck. Another option might be to disable periodic commits and only commit when the processor requests it. This would mitigate some data loss and is better than nothing. There is still a chance that data in RecordQueue not yet sent to my processor would be committed but never processed in this case. Another thought I had was to reduce the max fetch size; however, some messages can be very large (i.e. data spikes periodically). In this case, the message size would exceed my lower max fetch size, causing the consumer to simply stop consuming. So I'm stuck. So either we need to roll in max.poll.messages sooner than 0.10 or maybe add a callback mechanism letting me know that the producer is about to close so I can clear my buffers. Ideas?
Mike On Friday, April 8, 2016 8:24 PM, Guozhang Wang wrote: Hi Michael, When you call KafkaStreams.close(), it will first trigger a commitAll() function, which will 1) flush the local state store if necessary; 2) flush messages buffered in the producer; 3) commit offsets on the consumer. Then it will close the producer / consumer clients and shut down the tasks. So when you see the processor's "close" function triggered, any buffered messages in the producer should already have been flushed. Did you see a different behavior than described above? Guozhang On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon wrote: > All, > I'm seeing my processor's "close" method being called AFTER my > downstream producer has been closed. I had assumed that on close I would be > able to flush whatever I had been buffering up to send to a kafka topic. In > other words, we've seen significant performance differences in building > flows with small messages and large messages in/out of kafka. So my > processor buffers up messages to a threshold and flushes those as a > composite message bundle to improve downstream processing. But if this > close method is called AFTER the producer has already been closed, I would > have no way to actually flush the final composite bundles to my topic on > shutdown. Is there some way to get a call BEFORE producer shutdown occurs? > Mike > > -- -- Guozhang
Re: kafka-connector sink task flush() interval
The flush interval is controlled by offset.flush.interval.ms, which is the interval at which the framework tries committing offsets for tasks. You can find the documentation for this config at http://docs.confluent.io/2.0.1/connect/userguide.html#configuring-workers -Liquan On Fri, Apr 8, 2016 at 8:19 PM, victory_zgh wrote: > Hi, > Recently, I am working on a kafka connector implementation. According to the > connector sink task document, the connect platform invokes the flush method > periodically, but the doc does not mention the flush interval. > I don't know the flush interval's default value or where we can set this > interval? > > Best Regards > guanghui.zhu > > 2016-04-09 > > > > Guanghui.Zhu > Department of Computer Science and Technology > State Key Laboratory for Novel Software Technology > Nanjing University > Phone: +86 13770681551 > Email: victory_...@163.com > ResearchLab Homepage: http://pasa-bigdata.nju.edu.cn -- Liquan Pei Software Engineer, Confluent Inc
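For reference, these are worker-level settings and go in the Connect worker properties file; a minimal fragment (values shown are illustrative, see the worker configuration docs linked above for defaults and full descriptions):

```properties
# Interval at which the framework tries to commit task offsets; the sink
# task's flush() is invoked as part of this commit cycle.
offset.flush.interval.ms=60000

# Maximum time to wait for records to flush and offset data to be committed
# before cancelling the attempt.
offset.flush.timeout.ms=5000
```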
[GitHub] kafka pull request: MINOR: fix incorrect exception message
GitHub user stepio opened a pull request: https://github.com/apache/kafka/pull/1210 MINOR: fix incorrect exception message While playing with the client I got the following exception: ```java java.lang.IllegalArgumentException: Invalid partition given with record: 1 is not in the range [0...1]. ``` It's obviously incorrect, so I've fixed it. You can merge this pull request into a Git repository by running: $ git pull https://github.com/stepio/kafka trunk Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1210.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1210 commit c9f5f91be4389c58cbafb0d1403e901afcc57dd7 Author: Igor Stepanov Date: 2016-04-09T06:33:54Z MINOR: fix validation for the manually provided partition number, incorrect exception message commit c935890274246bf90848e0a8eba7dc4ae1a81035 Author: Igor Stepanov Date: 2016-04-09T07:05:30Z MINOR: slight improvement for the exception message generating
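The complaint in this PR is about the range printed in that message: valid partitions are 0 through numPartitions-1, so partition 1 with two partitions is in range and the reported inclusive upper bound should be numPartitions - 1. A simplified stand-in for the client-side check (not the actual producer code):

```java
// Simplified stand-in for the client-side check (not the actual producer
// code): valid partitions are 0..numPartitions-1, so the error message
// reports an inclusive upper bound of numPartitions - 1.
public class PartitionCheck {

    static void validate(int partition, int numPartitions) {
        if (partition < 0 || partition >= numPartitions) {
            throw new IllegalArgumentException(
                "Invalid partition given with record: " + partition
                + " is not in the range [0..." + (numPartitions - 1) + "].");
        }
    }

    public static void main(String[] args) {
        validate(1, 2); // in range for a 2-partition topic, no exception
        try {
            validate(2, 2); // out of range: 2 is not in [0...1]
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```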