[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-09 Thread Robert Christ (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15233630#comment-15233630
 ] 

Robert Christ edited comment on KAFKA-3042 at 4/9/16 6:53 PM:
--

Hi Flavio,

The logs can be found at:

https://s3-us-west-2.amazonaws.com/tivo-confluent/confluent.tar

Let me know if you have trouble accessing it.  It does include the zookeeper
logs.  We had just enabled the GC logs for zookeeper but it does cover the
period of the reproduction.

As for the zkCli.sh issue, I don't have a lot of information. I was watching the kafka
logs and flipped back to my zkCli.sh session and it had disconnected and needed to
reconnect. I do believe the session timed out and is probably the same root cause
that causes our kafka broker sessions to time out.





was (Author: delbaeth):
Hi Flavio,

The logs can be found at:

https://s3-us-west-2.amazonaws.com/tivo-confluent/confluent.tar

Let me know if you have trouble accessing it.  It does include the zookeeper
logs.  We had just enabled the GC logs for zookeeper but it does cover the
period of the reproduction.

As for he zkCli.sh issue, I don't have a lot of information.  I was watching 
the kafka
logs and flipped back to my zkCli.sh session and it had disconnected and needed 
to
reconnect.  I do believe the session timed out and is probably the same root 
cause 
theat causes our kafka broker sessions to timeout.




> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's
> a follower.
> So after several failed tries, it needs to find out who is the leader.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



Re: KStream Close Processor

2016-04-09 Thread Guozhang Wang
Mike,

It's not clear what you mean by "buffering up the contents". The producer itself
already does some buffering and batching when sending to Kafka. Did you
actually "merge" multiple small messages into one large message in the app code
before giving it to the producer? In either case, I am not sure how it will help
with the downstream consumer memory pressure issue.

About bounding the consumer memory usage, we already have some thoughts
on that issue and plan to add a memory bounding feature like the producer's in
the near future (https://issues.apache.org/jira/browse/KAFKA-2045), so it won't
be a problem for long. And for the "max.poll.messages" config and 0.10.0, just
FYI we are shooting to have it released by the end of this month.

Guozhang
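
A minimal sketch of bounding what a consumer pulls per poll, as discussed above. It
assumes the 0.9/0.10 Java consumer; max.partition.fetch.bytes already exists, and
max.poll.records is the property name the "max.poll.messages" work is expected to
ship under in 0.10.0. Topic, group, and server names are made up.

```java
// Illustrative only: bounding per-poll memory on the 0.9/0.10 consumer.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BoundedPollExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "bounded-poll-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Caps bytes fetched per partition per request (available in 0.9).
        props.put("max.partition.fetch.bytes", "1048576");
        // Caps records returned by a single poll() (the 0.10 addition discussed above).
        props.put("max.poll.records", "500");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}
```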


On Sat, Apr 9, 2016 at 5:59 AM, Michael D. Coon 
wrote:

> Guozhang,
>In my processor, I'm buffering up contents of the final messages in
> order to make them larger. This is to optimize throughput and avoid tiny
> messages from being injected downstream. So nothing is being pushed to the
> producer until my configured thresholds are met in the buffering mechanism.
> So as it stands, these messages are left dangling after the producer closes
> and, even worse, if periodic commits are happening behind the scenes, the
> data is lost on restart.
>What we need is a way to notify the processors that everything is
> "about" to close so that I can properly flush what I have in memory out to
> the producer. Otherwise, I'm stuck with always sending tiny messages into
> kafka--which I know for certain causes problems on downstream consumers
> (where they set a high fetch memory size and it causes hundreds of
> thousands of messages to be retrieved at a time…and thus bogs down the
> consumer). I think the "max.poll.messages" setting we discussed before
> would help here but if it's not available until 0.10, I'm kind of stuck.
> Another option might be to disable periodic commits and only commit
> when the processor requests it. This would mitigate some data loss and is
> better than nothing. There is still a chance that data in RecordQueue not
> yet sent to my processor would be committed but never processed in this
> case.
> Another thought I had was to reduce the max fetch size; however, some
> messages can be very large (i.e. data spikes periodically). In this case,
> the message size would exceed my lower max fetch size, causing the consumer
> to simply stop consuming. So I'm stuck. So either we need to roll in the
> max.poll.messages sooner than 0.10 or maybe a callback mechanism letting me
> know that the producer is about to close so I can clear my buffers.
> Ideas?
> Mike
>
> On Friday, April 8, 2016 8:24 PM, Guozhang Wang 
> wrote:
>
>
>  Hi Michael,
>
> When you call KafkaStreams.close(), it will first trigger a commitAll()
> function, which will 1) flush local state store if necessary; 2) flush
> messages buffered in producer; 3) commit offsets on consumer. Then it will
> close the producer / consumer clients and shutdown the tasks. So when you
> see processor's "close" function triggered, any buffered messages in the
> producer should already been flushed.
>
> Did you see a different behavior than the above described?
>
> Guozhang
>
>
> On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon  >
> wrote:
>
> > All,
> >I'm seeing my processor's "close" method being called AFTER my
> > downstream producer has been closed. I had assumed that on close I would
> be
> > able to flush whatever I had been buffering up to send to kafka topic. In
> > other words, we've seen significant performance differences in building
> > flows with small messages and large messages in/out of kafka. So my
> > processor buffers up messages to a threshold and flushes those as a
> > composite message bundle to improve downstream processing. But if this
> > close method is called AFTER the producer has already been closed, I
> would
> > have no way to actually flush the final composite bundles to my topic on
> > shutdown. Is there some way to get a call BEFORE producer shutdown
> occurs?
> > Mike
> >
> >
>
>
> --
> -- Guozhang
>
>
>
>



-- 
-- Guozhang
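
A minimal sketch of driving the shutdown sequence described in the quoted reply above
(close() triggers commitAll(): flush state stores, flush the producer, commit consumer
offsets, then close the clients and shut down tasks). It assumes the 0.10-era Streams
API; the application id and topic names are made up.

```java
// Sketch only: start a trivial topology and close it cleanly on shutdown.
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class CleanShutdownExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "clean-shutdown-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic"); // simple pass-through topology

        KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
        streams.start();

        // close() commits and flushes before tearing the clients down, so a
        // shutdown hook is usually enough to avoid losing buffered output.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```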


Re: [VOTE] KIP-4 Metadata Schema

2016-04-09 Thread Guozhang Wang
Sounds good.

On Fri, Apr 8, 2016 at 11:37 AM, Grant Henke  wrote:

> Guozhang,
>
> I agree there is a gap. That's what I was trying to say in the last email.
> But I also don't see a great/safe way to fix it by changing what topics
> are included in the metadata.
>
> Perhaps instead, I can add a special error code to the CreateTopic request
> to tell the user the topic they are trying to create is being deleted. I do
> think changes/improvements to the delete logic are needed. What I am saying
> here is that I don't think this metadata change is the time or place to fix
> it given the current constraints. I am flexible on that, though, if people
> disagree.
>
> Thank you,
> Grant
>
> On Fri, Apr 8, 2016 at 12:51 PM, Guozhang Wang  wrote:
>
> > I feel that "a delete and then create action may fail with a topic exists
> > exception...which the user could retry until succeeded" has some flaw,
> > since we cannot distinguish the case 1) the topic not marked for deleted,
> > but deletion is not complete, from 2) the topic is being created, but
> > creation is not complete. Or do I miss something here?
> >
> > Guozhang
> >
> > On Thu, Apr 7, 2016 at 11:07 AM, Grant Henke 
> wrote:
> >
> > > Thanks for the feedback Guozhang and Gwen.
> > >
> > > Gwen, I agree with you on this. I am not sure it's something we
> can/should
> > > tackle here. Especially before the release. I can leave the delete flag
> > off
> > > of the changes.
> > >
> > > What that means for KIP-4 is that a client won't be able to
> > differentiate
> > > between a topic that is gone vs marked for deletion. This means a
> delete
> > > and then create action may fail with a topic exists exception...which
> the
> > > user could retry until succeeded. I think that is reasonable, and much
> > > safer.
> > >
> > > After that we can work on creating more tests and improving the delete
> > > behavior.
> > >
> > >
> > >
> > > On Thu, Apr 7, 2016 at 12:55 PM, Gwen Shapira 
> wrote:
> > >
> > > > Given that we are very close to the release, if we are changing the
> > > > Metadata cache + topic deletion logic, I'd like a good number of
> system
> > > > tests to appear with the patch.
> > > >
> > > > On Thu, Apr 7, 2016 at 10:53 AM, Gwen Shapira 
> > wrote:
> > > >
> > > > > This will change some logic though, right?
> > > > >
> > > > > IIRC, right now produce/fetch requests to marked-for-deletion
> topics
> > > fail
> > > > > because the topics are simple not around. You get a generic
> "doesn't
> > > > exist"
> > > > > error. If we keep these topics and add a flag, we'll need to find
> all
> > > the
> > > > > places with this implicit logic and correct for it.
> > > > >
> > > > > And since our tests for topic deletion are clearly inadequate...
> I'm
> > a
> > > > bit
> > > > > scared :)
> > > > >
> > > > > Gwen
> > > > >
> > > > > On Thu, Apr 7, 2016 at 10:34 AM, Guozhang Wang  >
> > > > wrote:
> > > > >
> > > > >> Hmm, I think since in the original protocol the metadata response does
> > > > >> not have information for "marked for deletion" topics, we wanted to remove
> > > > >> that topic from the response by cleaning the metadata cache once it is
> > > > >> marked for deletion.
> > > > >>
> > > > >> With the new format, I think it is OK to delay the metadata
> > cleaning.
> > > > >>
> > > > >> Guozhang
> > > > >>
> > > > >> On Thu, Apr 7, 2016 at 8:35 AM, Grant Henke 
> > > > wrote:
> > > > >>
> > > > >> > I am testing the marked for deletion flag in the metadata and
> ran
> > > into
> > > > >> some
> > > > >> > challenges.
> > > > >> >
> > > > >> > It turns out that as soon as a topic is marked for deletion it
> may
> > > be
> > > > >> > purged from the metadata cache. This means that Metadata
> responses
> > > > >> > can't/don't return the topic. Though the topic may still exist
> if
> > > it's
> > > > >> not
> > > > >> > ready to be completely deleted or is in the process of being
> > > deleted.
> > > > >> >
> > > > >> > This poses a challenge because a user would have no way to tell
> > if a
> > > > >> topic
> > > > >> > still exists, and is marked for deletion, other than to try and
> > > > >> recreate it
> > > > >> > and see a failure. I could change the logic to no longer purge a
> > > > topic
> > > > >> > from the cache until it's completely deleted, but I am not sure
> if
> > > that
> > > > >> > would impact the clients in some way negatively. Does anyone
> have
> > > > enough
> > > > >> > background to say?
> > > > >> >
> > > > >> > I will dig into this a bit more today, but wanted to throw this
> > out
> > > > >> there
> > > > >> > for some early feedback.
> > > > >> >
> > > > >> > Thank you,
> > > > >> > Grant
> > > > >> >
> > > > >> >
> > > > >> > On Tue, Apr 5, 2016 at 8:02 PM, Jun Rao 
> wrote:
> > > > >> >
> > > > >> > > 5. You will return no 

Re: [VOTE] KIP-4 Metadata Schema (Round 2)

2016-04-09 Thread Guozhang Wang
+1

On Fri, Apr 8, 2016 at 4:36 PM, Gwen Shapira  wrote:

> +1
>
> On Fri, Apr 8, 2016 at 2:41 PM, Grant Henke  wrote:
>
> > I would like to re-initiate the voting process for the "KIP-4 Metadata
> > Schema changes". This is not a vote for all of KIP-4, but specifically
> for
> > the metadata changes. I have included the exact changes below for
> clarity:
> > >
> > > Metadata Request (version 1)
> > >
> > >
> > >
> > > MetadataRequest => [topics]
> > >
> > > Stays the same as version 0 however behavior changes.
> > > In version 0 there was no way to request no topics, and an empty list
> > > signified all topics.
> > > In version 1 a null topics list (size -1 on the wire) will indicate
> that
> > a
> > > user wants *ALL* topic metadata. Compared to an empty list (size 0)
> which
> > > indicates metadata for *NO* topics should be returned.
> > > Metadata Response (version 1)
> > >
> > >
> > >
> > > MetadataResponse => [brokers] controllerId [topic_metadata]
> > >   brokers => node_id host port rack
> > > node_id => INT32
> > > host => STRING
> > > port => INT32
> > > rack => NULLABLE_STRING
> > >   controllerId => INT32
> > >   topic_metadata => topic_error_code topic is_internal
> > [partition_metadata]
> > > topic_error_code => INT16
> > > topic => STRING
> > > is_internal => BOOLEAN
> > > partition_metadata => partition_error_code partition_id leader
> > [replicas] [isr]
> > >   partition_error_code => INT16
> > >   partition_id => INT32
> > >   leader => INT32
> > >   replicas => INT32
> > >   isr => INT32
> > >
> > > Adds rack, controller_id, and is_internal to the version 0 response.
> > >
> >
> > The KIP is available here for reference (linked to the Metadata schema
> > section):
> > *
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-MetadataSchema
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-MetadataSchema
> > >*
> >
> > A pull request is available implementing the proposed changes here:
> > https://github.com/apache/kafka/pull/1095
> >
> > Here are some links to past discussions on the mailing list:
> > http://search-hadoop.com/m/uyzND1pd4T52H1m0u1=Re+KIP+4+Wiki+Update
> >
> >
> http://search-hadoop.com/m/uyzND1J2IXeSNXAT=Metadata+and+ACLs+wire+protocol+review+KIP+4+
> >
> > Here is the previous vote discussion (please take a look and discuss
> > there):
> >
> >
> http://search-hadoop.com/m/uyzND1xlaiU10QlYX=+VOTE+KIP+4+Metadata+Schema
> >
> > Thank you,
> > Grant
> > --
> > Grant Henke
> > Software Engineer | Cloudera
> > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> >
>



-- 
-- Guozhang
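
A small illustration of the null-vs-empty topics convention quoted in the schema above:
a null array (size -1 on the wire) requests ALL topic metadata, while an empty array
(size 0) requests none. This is only a sketch of the array-header convention, not the
actual Kafka serialization code.

```java
// Illustrative sketch of encoding the Metadata v1 topics array header.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class TopicsArrayEncoding {
    static void writeTopics(ByteBuffer buf, List<String> topics) {
        if (topics == null) {
            buf.putInt(-1);              // null array => "give me all topics"
            return;
        }
        buf.putInt(topics.size());       // 0 => "give me no topics"
        for (String topic : topics) {
            byte[] bytes = topic.getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) bytes.length); // protocol STRING: INT16 length + bytes
            buf.put(bytes);
        }
    }
}
```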


[jira] [Updated] (KAFKA-3521) Better handling NPEs in Streams DSL implementation

2016-04-09 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3521:
-
Fix Version/s: (was: 0.10.1.0)
   0.10.0.0

> Better handling NPEs in Streams DSL implementation
> --
>
> Key: KAFKA-3521
> URL: https://issues.apache.org/jira/browse/KAFKA-3521
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>  Labels: user-experience
> Fix For: 0.10.0.0
>
>
> We observed a few cases where a mal-programmed application would trigger an NPE
> thrown from some lower-level classes, where the inputs should really be validated
> and more meaningful exceptions thrown if the validation fails.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: MINOR: Update protocol doc link in Introductio...

2016-04-09 Thread SinghAsDev
GitHub user SinghAsDev opened a pull request:

https://github.com/apache/kafka/pull/1211

MINOR: Update protocol doc link in Introduction.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/SinghAsDev/kafka MinorFixDocLink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1211.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1211


commit eee578a929a2d7841aa1f434ece885b8e7cf5a5f
Author: Ashish Singh 
Date:   2016-04-09T17:39:54Z

MINOR: Update protocol doc link in Introduction.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-09 Thread Robert Christ (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15233630#comment-15233630
 ] 

Robert Christ commented on KAFKA-3042:
--

Hi Flavio,

The logs can be found at:

https://s3-us-west-2.amazonaws.com/tivo-confluent/confluent.tar

Let me know if you have trouble accessing it.  It does include the zookeeper
logs.  We had just enabled the GC logs for zookeeper but it does cover the
period of the reproduction.

As for the zkCli.sh issue, I don't have a lot of information. I was watching the kafka
logs and flipped back to my zkCli.sh session and it had disconnected and needed to
reconnect. I do believe the session timed out and is probably the same root cause
that causes our kafka broker sessions to time out.




> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's
> a follower.
> So after several failed tries, it needs to find out who is the leader.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-09 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15233564#comment-15233564
 ] 

Flavio Junqueira commented on KAFKA-3042:
-

[~delbaeth] thanks for all the information.

bq.  Broker 1 rolled correctly and rejoined and leadership rebalancing 
occurred. After broker 2 rolled and came back up it now has an inconsistent 
view of the metadata. It thinks there are only 300 topics and all the other 
brokers believe there are 700. Should we file this as a separate issue?

I'm not aware of this issue, so it does sound better to report it in a 
different jira and describe the problem in as much detail as possible. If you 
have logs for this problem, then please share.

bq. We have managed to reproduce the problem and have a snapshot of the logs. 
The tarball is about a gigabyte. What should I do with it?

If you have a web server that can host it, then perhaps you can upload it 
there. A dropbox/box/onedrive public folder that we can read from would also do 
it.

bq. my zkCli.sh session which I was using to watch the controller exited here 
so I was disconnected for a minute

This is odd. Could you describe in more detail what happened with the zkCli 
session? You said that it disconnected for a minute, but has the session 
expired? It must have expired, unless your session timeout was at least one 
minute, which according to your description, it wasn't. If you have them, 
please include the zk logs in the bundle.



  

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's
> a follower.
> So after several failed tries, it needs to find out who is the leader.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-09 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232535#comment-15232535
 ] 

Flavio Junqueira edited comment on KAFKA-3042 at 4/9/16 1:09 PM:
-

Here is what I've been able to find out so far based on the logs that 
[~wushujames] posted: 

# The "Cached zkVersion..." messages are printed in {{Partition.updateIsr}}. 
The {{updatedIsr}} in the logs seem to be called from 
{{Partition.maybeShrinkIsr}}.
# {{Partition.maybeShrinkIsr}} is called from 
{{ReplicaManager.maybeShrinkIsr}}, which is called periodically according to 
schedule call in {{ReplicaManager.startup}}. This is the main reason we see 
those messages periodically coming up.
# In {{Partition.maybeShrinkIsr}}, the ISR is only updated if the leader 
replica is the broker itself, which is determined by the variable 
{{leaderReplicaIdOpt}}.

It looks like {{leaderReplicaIdOpt}} isn't being updated correctly, and it is 
possible that it is due to a race with either the controllers or the execution 
of {{LeaderAndIsr}} requests.
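
A rough Java-flavored sketch of the control flow described above; the real code is
Scala in kafka.cluster.Partition and kafka.server.ReplicaManager, and every name and
signature here is paraphrased for illustration only.

```java
// Paraphrased control flow only; not the actual broker implementation.
class PartitionSketch {
    volatile Integer leaderReplicaIdOpt; // cached leader id, may go stale
    int cachedZkVersion;
    final int localBrokerId;

    PartitionSketch(int localBrokerId) { this.localBrokerId = localBrokerId; }

    // Called periodically from a task scheduled in ReplicaManager.startup().
    void maybeShrinkIsr() {
        if (leaderReplicaIdOpt == null || leaderReplicaIdOpt != localBrokerId)
            return; // only the (believed) leader tries to shrink the ISR
        updateIsr();
    }

    void updateIsr() {
        boolean updated = conditionalZkUpdate(cachedZkVersion);
        if (!updated) {
            // Stale cache: the broker is no longer really the leader, but because
            // leaderReplicaIdOpt was never corrected, this repeats every cycle.
            System.out.println("Cached zkVersion " + cachedZkVersion
                    + " not equal to that in zookeeper, skip updating ISR");
        }
    }

    boolean conditionalZkUpdate(int expectedVersion) {
        return false; // placeholder for the conditional ZooKeeper write
    }
}
```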



was (Author: fpj):
Here is what I've been able to find out so far based on the logs that 
[~wushujames] posted: 

# The "Cached zkVersion..." messages are printed in {{Partition.updateIsr}}. 
The {{updatedIsr}} in the logs seem to be called from 
{{Partition.maybeShrinkIsr}}.
# {{Partition.maybeShrinkIsr}} is called from 
{{ReplicaManager.maybeShrinkIsr}}, which is called periodically according to 
schedule call in {{ReplicaManager.startup}}. This is the main reason we see 
those messages periodically coming up.
# In {{Partition.maybeShrinkIsr}}, the ISR is only updated if the leader 
replica is the broker itself, which is determined by the variable 
{{leaderReplicaIdOpt}.

It looks like {{leaderReplicaIdOpt}} isn't being updated correctly, and it is 
possible that it is due to a race with either the controllers or the execution 
of {{LeaderAndIsr} requests.


> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's
> a follower.
> So after several failed tries, it needs to find out who is the leader.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KStream Close Processor

2016-04-09 Thread Michael D. Coon
Guozhang,
   In my processor, I'm buffering up contents of the final messages in order to 
make them larger. This is to optimize throughput and avoid tiny messages from 
being injected downstream. So nothing is being pushed to the producer until my 
configured thresholds are met in the buffering mechanism. So as it stands, 
these messages are left dangling after the producer closes and, even worse, if 
periodic commits are happening behind the scenes, the data is lost on restart.
   What we need is a way to notify the processors that everything is "about" to 
close so that I can properly flush what I have in memory out to the producer. 
Otherwise, I'm stuck with always sending tiny messages into kafka--which I know 
for certain causes problems on downstream consumers (where they set a high 
fetch memory size and it causes hundreds of thousands of messages to be 
retrieved at a time…and thus bogs down the consumer). I think the 
"max.poll.messages" setting we discussed before would help here but if it's not 
available until 0.10, I'm kind of stuck.
    Another option might be to disable periodic commits and only commit when 
the processor requests it. This would mitigate some data loss and is better 
than nothing. There is still a chance that data in RecordQueue not yet sent to 
my processor would be committed but never processed in this case.
    Another thought I had was to reduce the max fetch size; however, some 
messages can be very large (i.e. data spikes periodically). In this case, the 
message size would exceed my lower max fetch size, causing the consumer to 
simply stop consuming. So I'm stuck. So either we need to roll in the 
max.poll.messages sooner than 0.10 or maybe a callback mechanism letting me 
know that the producer is about to close so I can clear my buffers. 
    Ideas?
Mike
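
A minimal sketch of the buffering pattern described above, assuming the 0.10-era
Processor API; the threshold, names, and bundling format are made up, and flushing
from close() only helps if the downstream producer is still open at that point,
which is exactly the concern raised here.

```java
// Sketch only: buffer small records and forward larger composite bundles.
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class BundlingProcessor implements Processor<String, String> {
    private static final int MAX_BUNDLE = 100; // flush threshold (made up)
    private final StringBuilder bundle = new StringBuilder();
    private int buffered = 0;
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        context.schedule(60_000); // ask for punctuate() roughly every minute
    }

    @Override
    public void process(String key, String value) {
        bundle.append(value).append('\n');
        if (++buffered >= MAX_BUNDLE)
            flushBundle();
    }

    private void flushBundle() {
        if (buffered == 0)
            return;
        context.forward(null, bundle.toString()); // one composite message downstream
        bundle.setLength(0);
        buffered = 0;
    }

    @Override
    public void punctuate(long timestamp) {
        flushBundle(); // also flush on a timer so bundles don't go stale
    }

    @Override
    public void close() {
        flushBundle(); // final flush on shutdown
    }
}
```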

On Friday, April 8, 2016 8:24 PM, Guozhang Wang  wrote:
 

 Hi Michael,

When you call KafkaStreams.close(), it will first trigger a commitAll()
function, which will 1) flush local state store if necessary; 2) flush
messages buffered in producer; 3) commit offsets on consumer. Then it will
close the producer / consumer clients and shutdown the tasks. So when you
see processor's "close" function triggered, any buffered messages in the
producer should already been flushed.

Did you see a different behavior than the above described?

Guozhang


On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon 
wrote:

> All,
>    I'm seeing my processor's "close" method being called AFTER my
> downstream producer has been closed. I had assumed that on close I would be
> able to flush whatever I had been buffering up to send to kafka topic. In
> other words, we've seen significant performance differences in building
> flows with small messages and large messages in/out of kafka. So my
> processor buffers up messages to a threshold and flushes those as a
> composite message bundle to improve downstream processing. But if this
> close method is called AFTER the producer has already been closed, I would
> have no way to actually flush the final composite bundles to my topic on
> shutdown. Is there some way to get a call BEFORE producer shutdown occurs?
> Mike
>
>


-- 
-- Guozhang


  

Re: kafka-connector sink task flush() interval

2016-04-09 Thread Liquan Pei
The flush interval is controlled by offset.flush.interval.ms, which is
the interval at which to try committing offsets for tasks. You can find the
documentation for this config at
http://docs.confluent.io/2.0.1/connect/userguide.html#configuring-workers

-Liquan

On Fri, Apr 8, 2016 at 8:19 PM, victory_zgh  wrote:

> Hi,
> Recently, I have been working on a Kafka connector implementation. According to the
> connector sink task documentation, the Connect platform invokes the flush method
> periodically, but the doc does not mention the flush interval.
> I don't know the flush interval's default value or where we can set this
> interval.
>
> Best Regards
> guanghui.zhu
>
> 2016-04-09
>
>
>
> Guanghui.Zhu
> Department of Computer Science and Technology
> State Key Laboratory for Novel Software Technology
> Nanjing University
> Phone: +86 13770681551
> Email: victory_...@163.com
> ResearchLab Homepage: http://pasa-bigdata.nju.edu.cn




-- 
Liquan Pei
Software Engineer, Confluent Inc
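
A minimal SinkTask sketch showing where that periodic flush call lands, assuming the
0.9/0.10 Connect API; the buffering and the hypothetical external write are
illustrative only.

```java
// Sketch of a sink task; the framework calls flush() on the cadence set by the
// worker's offset.flush.interval.ms before committing the given offsets.
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class ExampleSinkTask extends SinkTask {
    private final List<SinkRecord> buffer = new ArrayList<>();

    @Override
    public String version() { return "0.1"; }

    @Override
    public void start(Map<String, String> props) { }

    @Override
    public void put(Collection<SinkRecord> records) {
        buffer.addAll(records); // accumulate between flushes
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Write everything out before Connect commits these offsets.
        // writeToExternalSystem(buffer);  // hypothetical helper
        buffer.clear();
    }

    @Override
    public void stop() { }
}
```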


[GitHub] kafka pull request: MINOR: fix incorrect exception message

2016-04-09 Thread stepio
GitHub user stepio opened a pull request:

https://github.com/apache/kafka/pull/1210

MINOR: fix incorrect exception message

While playing with the client, I got the following exception:
```java
java.lang.IllegalArgumentException: Invalid partition given with record: 1 
is not in the range [0...1].
```
It's obviously incorrect, so I've fixed it.
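
The reported message looks wrong because 1 actually is in the range [0...1]; a
corrected bounds check would look roughly like the following (illustrative only, not
the actual patch):

```java
// Illustrative bounds check for a manually supplied partition number; for
// numPartitions partitions the valid range is [0...numPartitions-1].
public class PartitionValidation {
    static void validatePartition(int partition, int numPartitions) {
        if (partition < 0 || partition >= numPartitions) {
            throw new IllegalArgumentException("Invalid partition given with record: "
                    + partition + " is not in the range [0..." + (numPartitions - 1) + "].");
        }
    }
}
```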

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stepio/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1210.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1210


commit c9f5f91be4389c58cbafb0d1403e901afcc57dd7
Author: Igor Stepanov 
Date:   2016-04-09T06:33:54Z

MINOR: fix validation for the manually provided partition number, incorrect 
exception message

commit c935890274246bf90848e0a8eba7dc4ae1a81035
Author: Igor Stepanov 
Date:   2016-04-09T07:05:30Z

MINOR: slight improvement for the exception message generating




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---