Re: Strange behavior when turning the system clock back

2016-08-10 Thread Gwen Shapira
I know it sounds silly, but did you check that your test setup works
when you don't change the clock?

This pattern can happen when two consumers somehow block each other
(for example, one thread running two consumers) - so one waits for the
other to join, but the other is blocked, so the first times out, and
then the second is unblocked and manages to join, but now the first is
blocked, and so on...
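
A hypothetical sketch of that anti-pattern (not Gabriel's actual code) - two
consumers in the same group serviced by a single thread, so neither can finish
joining while the other blocks in poll():

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Anti-pattern sketch: two consumers in the same group driven by one thread.
    // While the thread is blocked polling consumer "a", consumer "b" cannot
    // heartbeat or rejoin, so the rebalance times out and the group flip-flops.
    public class TwoConsumersOneThread {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder
            props.put("group.id", "GROUP_NAME");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> a = new KafkaConsumer<>(props);
            KafkaConsumer<String, String> b = new KafkaConsumer<>(props);
            a.subscribe(Collections.singletonList("TOPIC_NAME"));
            b.subscribe(Collections.singletonList("TOPIC_NAME"));

            while (true) {
                a.poll(Long.MAX_VALUE);   // blocks here during a rebalance...
                b.poll(Long.MAX_VALUE);   // ...so "b" never gets a chance to join in time
            }
        }
    }

Running each consumer on its own thread (or in its own process) avoids the
mutual blocking.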

Gwen

On Wed, Aug 10, 2016 at 10:29 AM, Gabriel Ibarra
 wrote:
> Hello guys, I am dealing with an issue when turning the system clock back
> (either due to NTP or administrator action). I'm using kafka_2.11-0.10.0.0.
>
> I followed these steps:
> - Start a consumer for TOPIC_NAME with group id GROUP_NAME. It will be the
> owner of all the partitions.
> - Turn the system clock back, for instance by 1 hour.
> - Start a new consumer for TOPIC_NAME using the same group id; it will
> force a rebalance.
>
> After these actions the kafka server constantly logs the messages below, and
> after a while both consumers stop receiving messages. I saw that this
> condition lasts at least as long as the clock was turned back (1 hour in this
> example), and finally after this time kafka comes back to work.
>
> [2016-08-08 11:30:23,023] INFO [GroupCoordinator 0]: Preparing to
> restabilize group GROUP_NAME with old generation 2 (kafka.coordinator.
> GroupCoordinator)
> [2016-08-08 11:30:23,025] INFO [GroupCoordinator 0]: Stabilized group
> GROUP_NAME generation 3 (kafka.coordinator.GroupCoordinator)
> [2016-08-08 11:30:23,027] INFO [GroupCoordinator 0]: Preparing to
> restabilize group GROUP_NAME with old generation 3 (kafka.coordinator.
> GroupCoordinator)
> [2016-08-08 11:30:23,029] INFO [GroupCoordinator 0]: Group GROUP_NAME
> generation 3 is dead and removed (kafka.coordinator.GroupCoordinator)
> [2016-08-08 11:30:23,032] INFO [GroupCoordinator 0]: Preparing to
> restabilize group GROUP_NAME with old generation 0 (kafka.coordinator.
> GroupCoordinator)
> [2016-08-08 11:30:23,032] INFO [GroupCoordinator 0]: Stabilized group
> GROUP_NAME generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-08-08 11:30:23,033] INFO [GroupCoordinator 0]: Preparing to
> restabilize group GROUP_NAME with old generation 1 (kafka.coordinator.
> GroupCoordinator)
> [2016-08-08 11:30:23,034] INFO [GroupCoordinator 0]: Group GROUP generation
> 1 is dead and removed (kafka.coordinator.GroupCoordinator)
> [2016-08-08 11:30:23,043] INFO [GroupCoordinator 0]: Preparing to
> restabilize group GROUP_NAME with old generation 0 (kafka.coordinator.
> GroupCoordinator)
> [2016-08-08 11:30:23,044] INFO [GroupCoordinator 0]: Stabilized group
> GROUP_NAME generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-08-08 11:30:23,044] INFO [GroupCoordinator 0]: Preparing to
> restabilize group GROUP_NAME with old generation 1 (kafka.coordinator.
> GroupCoordinator)
> [2016-08-08 11:30:23,045] INFO [GroupCoordinator 0]: Group GROUP_NAME
> generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)
>
> IMHO, Kafka's consumers should keep working after any change of the system
> clock, but maybe this behavior has fundamentals that I don't know.
>
> I'm sorry if this was discussed previously; I searched but didn't find a
> similar issue.
>
> Thanks,
>
> --
>
>
>
> Gabriel Alejandro Ibarra
>
> Software Engineer
>
> San Lorenzo 47, 3rd Floor, Office 5
>
> Córdoba, Argentina
>
> Phone: +54 351 4217888



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: Review request for KAFKA-3600

2016-08-10 Thread Gwen Shapira
I hate doing this, because Ashish has really been good about following
up on the PR, but I'm questioning the usefulness of this patch.

It adds non-trivial complexity to the client... with not much return
on the investment, as far as I can see.
When I first suggested it, it was before KIP-35 was merged and
released, and the intent was to validate KIP-35 (since I have a low
opinion of protocols that aren't used). Since then KIP-35 has been
released, and the follow-up turned out more complex than we expected, I think.
And I'm wondering if it is worth it.

The work and follow-up from Ashish is still super appreciated, but I
think we need more than appreciation - adding complexity to already
complex clients needs to have functional justification...

Anyway, I was out of the loop for ages, so feel free to yell at me for
missing the obvious.

Gwen

On Tue, Aug 9, 2016 at 8:47 AM, Ashish Singh  wrote:
> Provided wrong link to PR, here is the PR
>  for KAFKA-3600.
>
> On Tue, Aug 9, 2016 at 9:45 AM, Ashish Singh  wrote:
>
>> Hey Guys,
>>
>> KAFKA-3600  was part of
>> KIP-35's proposal. KAFKA-3307
>> ,
>> adding ApiVersionsRequest/Response, was committed to 0.10.0.0, but
>> KAFKA-3600, enhancing java clients, is still under review. Here is the PR
>> 
>>
>> I have addressed all review comments and have been waiting for further
>> reviews / for this to go in for quite some time. I would really appreciate it
>> if a committer could help with making progress on this.
>>
>> --
>>
>> Regards,
>> Ashish
>>
>
>
>
> --
>
> Regards,
> Ashish



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: [DISCUSS] KIP-73: Replication Quotas

2016-08-10 Thread Mayuresh Gharat
This might have been answered before.
I was wondering: when the leader quota is reached, the leader sends an empty
response ("If the inclusion of a partition, listed in the leader's
throttled-replicas list, causes the LeaderQuotaRate to be exceeded, that
partition is omitted from the response (aka returns 0 bytes)."). At this
point the follower quota is NOT reached and the follower is still going to
ask for that partition in the next fetch request. Would it be fair to
add some logic there so that the follower backs off (for some configurable
time) from including those partitions in the next fetch request?

Thanks,

Mayuresh

On Wed, Aug 10, 2016 at 8:06 AM, Ben Stopford  wrote:

> Thanks again for the responses everyone. I’ve removed the extra
> fetcher threads from the proposal, switching to the inclusion-based
> approach. The relevant section is:
>
> The follower makes a request, using the fixed size of
> replica.fetch.response.max.bytes as per KIP-74
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes>.
> The order of the partitions in the fetch request is randomised to ensure
> fairness.
> When the leader receives the fetch request it processes the partitions in
> the defined order, up to the response's size limit. If the inclusion of a
> partition, listed in the leader's throttled-replicas list, causes the
> LeaderQuotaRate to be exceeded, that partition is omitted from the response
> (aka returns 0 bytes). Logically, this is of the form:
> var bytesAllowedForThrottledPartition = quota.recordAndMaybeAdjust(bytesRequestedForPartition)
> When the follower receives the fetch response, if it includes partitions
> in its throttled-partitions list, it increments the FollowerQuotaRate:
> var includeThrottledPartitionsInNextRequest: Boolean = quota.recordAndEvaluate(previousResponseThrottledBytes)
> If the quota is exceeded, no throttled partitions will be included in the
> next fetch request emitted by this replica fetcher thread.
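
To make the leader-side inclusion logic above concrete, here is a minimal,
hypothetical Java sketch; ReplicationQuota, isThrottled() and
recordAndMaybeAdjust() are illustrative stand-ins, not the actual KIP-73
classes:

    // Hypothetical sketch of the leader-side decision described in the proposal.
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    interface ReplicationQuota {
        boolean isThrottled(String topicPartition);       // listed in throttled-replicas?
        long recordAndMaybeAdjust(long requestedBytes);   // bytes allowed without exceeding LeaderQuotaRate
    }

    class FetchResponseFiller {
        // Fill partitions in request order up to the response size limit,
        // returning 0 bytes for throttled partitions that would exceed the quota.
        static Map<String, Long> fill(List<String> partitionsInRequestOrder,
                                      Map<String, Long> availableBytes,
                                      long responseSizeLimit,
                                      ReplicationQuota quota) {
            Map<String, Long> result = new LinkedHashMap<>();
            long remaining = responseSizeLimit;
            for (String tp : partitionsInRequestOrder) {
                long bytes = Math.min(availableBytes.getOrDefault(tp, 0L), remaining);
                if (quota.isThrottled(tp)) {
                    // May come back as 0 if including this partition would exceed the quota.
                    bytes = quota.recordAndMaybeAdjust(bytes);
                }
                result.put(tp, bytes);
                remaining -= bytes;
                if (remaining <= 0) break;
            }
            return result;
        }
    }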
>
> B
>
> > On 9 Aug 2016, at 23:34, Jun Rao  wrote:
> >
> > When there are several unthrottled replicas, we could also just do what's
> > suggested in KIP-74. The client is responsible for reordering the
> > partitions and the leader fills in the bytes to those partitions in
> order,
> > up to the quota limit.
> >
> > We could also do what you suggested. If quota is exceeded, include empty
> > data in the response for throttled replicas. Keep doing that until enough
> > time has passed so that the quota is no longer exceeded. This potentially
> > allows better batching per partition. Not sure if the two makes a big
> > difference in practice though.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Aug 9, 2016 at 2:31 PM, Joel Koshy  wrote:
> >
> >>>
> >>>
> >>>
> >>> On the leader side, one challenge is related to the fairness issue that
> >>> Ben brought up. The question is what if the fetch response limit is filled
> >>> up by the throttled replicas? If this happens constantly, we will delay the
> >>> progress of those un-throttled replicas. However, I think we can address
> >>> this issue by trying to fill up the unthrottled replicas in the response
> >>> first. So, the algorithm would be. Fill up unthrottled replicas up to the
> >>> fetch response limit. If there is space left, fill up throttled replicas.
> >>> If quota is exceeded for the throttled replicas, reduce the bytes in the
> >>> throttled replicas in the response accordingly.
> >>>
> >>
> >> Right - that's what I was trying to convey by truncation (vs empty). So we
> >> would attempt to fill the response for throttled partitions as much as we
> >> can before hitting the quota limit. There is one more detail to handle in
> >> this: if there are several throttled partitions and not enough remaining
> >> allowance in the fetch response to include all the throttled replicas then
> >> we would need to decide which of those partitions get a share; which is why
> >> I'm wondering if it is easier to return empty for those partitions entirely
> >> in the fetch response - they will make progress in the subsequent fetch. If
> >> they don't make fast enough progress then that would be a case for raising
> >> the threshold or letting it complete at an off-peak time.
> >>
> >>
> >>>
> >>> With this approach, we need some new logic to handle throttling on the
> >>> leader, but we can leave the replica threading model unchanged. So,
> >>> overall, this still seems to be a simpler approach.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Tue, Aug 9, 2016 at 11:57 AM, Mayuresh Gharat <
> >>> gharatmayures...@gmail.com> wrote:
> >>>
> >>>> Nice write up Ben.
> >>>>
> >>>> I agree with Joel for keeping this simple by excluding the partitions
> >>>> from the fetch request/response when the quota is violated at the follower
> >>>> or leader instead of having a separate 

[jira] [Created] (KAFKA-4032) Uncaught exceptions when autocreating topics

2016-08-10 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-4032:
--

 Summary: Uncaught exceptions when autocreating topics
 Key: KAFKA-4032
 URL: https://issues.apache.org/jira/browse/KAFKA-4032
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jason Gustafson


With the addition of the CreateTopics API in KIP-4, we have some new exceptions 
which can be raised from {{AdminUtils.createTopic}}. For example, it is 
possible to raise InvalidReplicationFactorException. Since we have not yet 
removed the ability to create topics automatically, we need to make sure these 
exceptions are caught and handled in both the TopicMetadata and 
GroupCoordinator request handlers. Currently these exceptions are propagated 
all the way to the processor.
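
As a rough illustration (not the actual broker code, which is Scala), the
handlers can wrap the auto-create call and map any ApiException to an error
code instead of letting it propagate to the processor; the AutoTopicCreator
helper and CreateTopicCall callback below are hypothetical:

{code}
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.Errors;

// Hypothetical helper: the real handlers live in the broker's request handling (Scala).
class AutoTopicCreator {
    interface CreateTopicCall {
        void create() throws ApiException;   // e.g. AdminUtils.createTopic(...) may throw
    }

    // Attempt auto-creation and translate failures into an error code
    // rather than letting the exception reach the processor.
    static Errors tryAutoCreate(CreateTopicCall call) {
        try {
            call.create();
            return Errors.NONE;
        } catch (ApiException e) {
            // e.g. InvalidReplicationFactorException when brokers < replication factor
            return Errors.forException(e);
        }
    }
}
{code}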



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [ANNOUNCE] Apache Kafka 0.10.0.1 Released

2016-08-10 Thread Gwen Shapira
Woohoo!

Thank you, Ismael! You make a great release manager :)

On Wed, Aug 10, 2016 at 5:01 PM, Ismael Juma  wrote:
> The Apache Kafka community is pleased to announce the release for Apache
> Kafka 0.10.0.1.
> This is a bug fix release that fixes 53 issues in 0.10.0.0.
>
> All of the changes in this release can be found in the release notes:
> https://archive.apache.org/dist/kafka/0.10.0.1/RELEASE_NOTES.html
>
> Apache Kafka is a high-throughput, publish-subscribe messaging system
> rethought as a distributed commit log.
>
> ** Fast => A single Kafka broker can handle hundreds of megabytes of reads
> and writes per second from thousands of clients.
>
> ** Scalable => Kafka is designed to allow a single cluster to serve as the
> central data backbone for a large organization. It can be elastically and
> transparently expanded without downtime. Data streams are partitioned
> and spread over a cluster of machines to allow data streams larger than
> the capability of any single machine and to allow clusters of co-ordinated
> consumers.
>
> ** Durable => Messages are persisted on disk and replicated within the
> cluster to prevent data loss. Each broker can handle terabytes of messages
> without performance impact.
>
> ** Distributed by Design => Kafka has a modern cluster-centric design that
> offers strong durability and fault-tolerance guarantees.
>
> You can download the source release from
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.1/kafka-0.10.0.1-src.tgz
>
> and binary releases from
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.1/kafka_2.10-0.10.0.1.tgz
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
>
> A big thank you to the following people who have contributed to the
> 0.10.0.1 release.
>
> Alex Glikson, Alex Loddengaard, Alexey Romanchuk, Ashish Singh, Avi Flax,
> Damian Guy, Dustin Cote, Edoardo Comar, Eno Thereska, Ewen
> Cheslack-Postava, Flavio Junqueira, Florian Hussonnois, Geoff Anderson,
> Grant Henke, Greg Fodor, Guozhang Wang, Gwen Shapira, Henry Cai, Ismael
> Juma, Jason Gustafson, Jeff Klukas, Jendrik Poloczek, Jeyhun Karimov,
> Liquan Pei, Manikumar Reddy O, Mathieu Fenniak, Matthias J. Sax, Maysam
> Yabandeh, Mayuresh Gharat, Mickael Maison, Moritz Siuts, Onur Karaman,
> Philippe Derome, Rajini Sivaram, Rollulus, Ryan Pridgeon, Samuel Taylor,
> Sebastien Launay, Sriharsha Chintalapani, Tao Xiao, Todd Palino, Tom
> Crayford, Tom Rybak, Vahid Hashemian, Wan Wenli, Yuto Kawamura.
>
> We welcome your help and feedback. For more information on how to
> report problems, and to get involved, visit the project website at
> http://kafka.apache.org/
>
> Thanks,
> Ismael



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: [DISCUSS] Time-based releases for Apache Kafka

2016-08-10 Thread Gwen Shapira
Yeah, I agree that maintaining 6 release branches is not really sustainable...

Maybe 3 (current and 2 older) makes sense?

On Wed, Aug 10, 2016 at 7:35 PM, Joel Koshy  wrote:
> On Wed, Aug 10, 2016 at 5:44 PM, Joel Koshy  wrote:
>
>>
>>
>> On Tue, Aug 9, 2016 at 4:49 PM, Gwen Shapira  wrote:
>>
>>>
>>> 4. Frequent releases mean we need to do bugfix releases for older
>>> branches. Right now we only do bugfix releases to latest release.
>>>
>>
>> I'm a bit unclear on how the above is a side-effect of time-based
>> releases. IIUC this just changes how frequently we release off the current
>> release branch right? Or put another way, are you also proposing any
>> fundamental change to our versioning/branching scheme?
>>
>
> Actually nm - so what you're saying is we cut more frequently off trunk,
> which means having to maintain multiple release branches. However, the
> more frequently we release, the less difficult it should be to upgrade from
> one release to another, which means it should be reasonable to expect that
> we EOL release branches sooner rather than later.
>
> However, if we are expected to maintain release branches for up to two
> years, then that means potential bugfix releases for up to eight release
> branches at any given time? I.e., it seems that a short inter-release
> interval would require us to EOL release branches sooner than that to make
> things manageable.



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: [DISCUSS] Time-based releases for Apache Kafka

2016-08-10 Thread Gwen Shapira
Good question, let me clarify my thinking:

We were used to doing a release every year (or even less frequently). So the
expectation was that users would just upgrade once a year and we
wouldn't worry about backporting bugfixes to bugs that were over a
year old. It seemed pretty reasonable.

But if we are trying for 3 releases a year... well, almost no one
(except Todd Palino) upgrades 3 times a year. It's like running 50
miles: doable for some, but definitely not for everyone :)

Basically, the same reasoning is behind the desire to support upgrades
for two years: we think it isn't reasonable to ask people to upgrade
every 4 months, so we need to make sure that staying on a year-old
version is still feasible, and this includes fixing critical bugs that
are found in the older release (which really isn't all that old) and
publishing bugfix releases every once in a while.

This doesn't really change the way we do versioning or branching
(although our versioning could be slightly broken too; that's a different
story). It just means that I propose backporting more patches for
critical bugs and doing some releases off older branches (which we
didn't do in the past).

Does that make sense?

Gwen

On Wed, Aug 10, 2016 at 5:44 PM, Joel Koshy  wrote:
> On Tue, Aug 9, 2016 at 4:49 PM, Gwen Shapira  wrote:
>
>>
>> 4. Frequent releases mean we need to do bugfix releases for older
>> branches. Right now we only do bugfix releases to latest release.
>>
>
> I'm a bit unclear on how the above is a side-effect of time-based releases.
> IIUC this just changes how frequently we release off the current release
> branch right? Or put another way, are you also proposing any fundamental
> change to our versioning/branching scheme?



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: [DISCUSS] Time-based releases for Apache Kafka

2016-08-10 Thread Joel Koshy
On Wed, Aug 10, 2016 at 5:44 PM, Joel Koshy  wrote:

>
>
> On Tue, Aug 9, 2016 at 4:49 PM, Gwen Shapira  wrote:
>
>>
>> 4. Frequent releases mean we need to do bugfix releases for older
>> branches. Right now we only do bugfix releases to latest release.
>>
>
> I'm a bit unclear on how the above is a side-effect of time-based
> releases. IIUC this just changes how frequently we release off the current
> release branch right? Or put another way, are you also proposing any
> fundamental change to our versioning/branching scheme?
>

Actually nm - so what you're saying is we cut more frequently off trunk,
which means having to maintain multiple release branches. However, the
more frequently we release, the less difficult it should be to upgrade from
one release to another, which means it should be reasonable to expect that
we EOL release branches sooner rather than later.

However, if we are expected to maintain release branches for up to two
years, then that means potential bugfix releases for up to eight release
branches at any given time? I.e., it seems that a short inter-release
interval would require us to EOL release branches sooner than that to make
things manageable.


[jira] [Commented] (KAFKA-3752) Provide a way for KStreams to recover from unclean shutdown

2016-08-10 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15416351#comment-15416351
 ] 

Greg Fodor commented on KAFKA-3752:
---

Oh apologies for mis-reading the ticket, but in our case it's a recoverable 
condition.

> Provide a way for KStreams to recover from unclean shutdown
> ---
>
> Key: KAFKA-3752
> URL: https://issues.apache.org/jira/browse/KAFKA-3752
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Roger Hoover
>  Labels: architecture
>
> If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM 
> Killer), it may leave behind lock files and fail to recover.
> It would be useful to have an options (say --force) to tell KStreams to 
> proceed even if it finds old LOCK files.
> {noformat}
> [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in 
> thread [StreamThread-1]:  
> (org.apache.kafka.streams.processor.internals.StreamThread:583)
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating 
> the state manager
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>   at 
> 

Re: [DISCUSS] Time-based releases for Apache Kafka

2016-08-10 Thread Joel Koshy
On Tue, Aug 9, 2016 at 4:49 PM, Gwen Shapira  wrote:

>
> 4. Frequent releases mean we need to do bugfix releases for older
> branches. Right now we only do bugfix releases to latest release.
>

I'm a bit unclear on how the above is a side-effect of time-based releases.
IIUC this just changes how frequently we release off the current release
branch right? Or put another way, are you also proposing any fundamental
change to our versioning/branching scheme?


Re: [DISCUSS] KIP-71 Enable log compaction and deletion to co-exist

2016-08-10 Thread Jason Gustafson
Hi Damian,

Thanks for the KIP. We have a number of use cases in which we maintain a
materialized cache of a compacted topic. The consumer coordinator, for
example, has a cache of consumer offsets which is populated from the
__consumer_offsets topic. Kafka Connect also uses this pattern for its own
offset and config storage. The key distinction in the latter case is that
the cache is maintained on the client. So a couple questions about the
potential impact of this KIP on these use cases:

1. Would it make sense to use this KIP in the consumer coordinator to
expire offsets based on the topic's retention time? Currently, we have a
periodic task which scans the full cache to check which offsets can be
expired, but we might be able to get rid of this if we had a callback to
update the cache when a segment was deleted. Technically offsets can be
given their own expiration time, but it seems questionable whether we need
this going forward (the new consumer doesn't even expose it at the moment).
2. This KIP could also be useful for expiration in the case of a cache
maintained on the client, but I don't see an obvious way that we'd be able
to leverage it since there's no indication to the client when a segment has
been deleted (unless they reload the cache from the beginning of the log).
One approach I can think of would be to write corresponding tombstones as
necessary when a segment is removed, but that seems pretty heavy. Have you
considered this problem?

It may not be necessary to address this problem in this KIP, but since the
need for expiration seems very common for this use case, it could save a
lot of duplicated effort if the broker provided a builtin mechanism for it.
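
For reference, a minimal sketch of the client-side materialized-cache pattern
described above, using the plain Java consumer. The topic name, types and
serializers are placeholders, and tombstones (null values) are the only
deletion signal the client currently sees - segments deleted under KIP-71
would not be visible this way, which is exactly the gap raised in point 2:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    // Sketch: materialize a compacted topic into an in-memory map, honouring tombstones.
    public class CompactedTopicCache {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            Map<String, String> cache = new HashMap<>();
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                List<TopicPartition> partitions = new ArrayList<>();
                for (PartitionInfo p : consumer.partitionsFor("config-topic"))   // placeholder topic
                    partitions.add(new TopicPartition(p.topic(), p.partition()));
                consumer.assign(partitions);
                consumer.seekToBeginning(partitions);   // replay the whole (compacted) log
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> r : records) {
                        if (r.value() == null) cache.remove(r.key());   // tombstone: key deleted
                        else cache.put(r.key(), r.value());
                    }
                    // A real application would track catch-up and keep polling for updates;
                    // nothing here tells the client that a whole segment has been deleted.
                }
            }
        }
    }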

Thanks,
Jason


On Mon, Aug 8, 2016 at 12:41 AM, Damian Guy  wrote:

> Hi,
>
> We have created KIP 71: Enable log compaction and deletion to co-exist`
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 71%3A+Enable+log+compaction+and+deletion+to+co-exist
>
> Please take a look. Feedback is appreciated.
>
> Thank you
>


[ANNOUNCE] Apache Kafka 0.10.0.1 Released

2016-08-10 Thread Ismael Juma
The Apache Kafka community is pleased to announce the release for Apache
Kafka 0.10.0.1.
This is a bug fix release that fixes 53 issues in 0.10.0.0.

All of the changes in this release can be found in the release notes:
https://archive.apache.org/dist/kafka/0.10.0.1/RELEASE_NOTES.html

Apache Kafka is a high-throughput, publish-subscribe messaging system
rethought as a distributed commit log.

** Fast => A single Kafka broker can handle hundreds of megabytes of reads
and writes per second from thousands of clients.

** Scalable => Kafka is designed to allow a single cluster to serve as the
central data backbone for a large organization. It can be elastically and
transparently expanded without downtime. Data streams are partitioned
and spread over a cluster of machines to allow data streams larger than
the capability of any single machine and to allow clusters of co-ordinated
consumers.

** Durable => Messages are persisted on disk and replicated within the
cluster to prevent data loss. Each broker can handle terabytes of messages
without performance impact.

** Distributed by Design => Kafka has a modern cluster-centric design that
offers strong durability and fault-tolerance guarantees.

You can download the source release from
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.1/kafka-0.10.0.1-src.tgz

and binary releases from
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.1/kafka_2.10-0.10.0.1.tgz
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz

A big thank you to the following people who have contributed to the
0.10.0.1 release.

Alex Glikson, Alex Loddengaard, Alexey Romanchuk, Ashish Singh, Avi Flax,
Damian Guy, Dustin Cote, Edoardo Comar, Eno Thereska, Ewen
Cheslack-Postava, Flavio Junqueira, Florian Hussonnois, Geoff Anderson,
Grant Henke, Greg Fodor, Guozhang Wang, Gwen Shapira, Henry Cai, Ismael
Juma, Jason Gustafson, Jeff Klukas, Jendrik Poloczek, Jeyhun Karimov,
Liquan Pei, Manikumar Reddy O, Mathieu Fenniak, Matthias J. Sax, Maysam
Yabandeh, Mayuresh Gharat, Mickael Maison, Moritz Siuts, Onur Karaman,
Philippe Derome, Rajini Sivaram, Rollulus, Ryan Pridgeon, Samuel Taylor,
Sebastien Launay, Sriharsha Chintalapani, Tao Xiao, Todd Palino, Tom
Crayford, Tom Rybak, Vahid Hashemian, Wan Wenli, Yuto Kawamura.

We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at
http://kafka.apache.org/

Thanks,
Ismael


[GitHub] kafka pull request #1702: Kafka 3940: Log should check the return value of d...

2016-08-10 Thread imandhan
Github user imandhan closed the pull request at:

https://github.com/apache/kafka/pull/1702


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3408) consumer rebalance fail

2016-08-10 Thread Dru P (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15416065#comment-15416065
 ] 

Dru P commented on KAFKA-3408:
--

[~ewencp] I'd like to be a contributor for this.

[~ajak6] and [~beez] would you be interested in collaborating on the solution 
for this issue?

> consumer rebalance fail
> ---
>
> Key: KAFKA-3408
> URL: https://issues.apache.org/jira/browse/KAFKA-3408
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
> Environment: centos linux
>Reporter: zhongkai liu
>  Labels: newbie
>
> I use "/bin/kafka-console-consumer" command to start two consumers of group 
> "page_group",then the first conumer console report rebalance failure like 
> this:
> ERROR [page_view_group1_slave2-1458095694092-80c33086], error during 
> syncedRebalance (kafka.consumer.ZookeeperConsumerConnector)
> kafka.common.ConsumerRebalanceFailedException: 
> page_view_group1_slave2-1458095694092-80c33086 can't rebalance after 10 
> retries
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:660)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:579)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3914) Global discovery of state stores

2016-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15416050#comment-15416050
 ] 

ASF GitHub Bot commented on KAFKA-3914:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1576


> Global discovery of state stores
> 
>
> Key: KAFKA-3914
> URL: https://issues.apache.org/jira/browse/KAFKA-3914
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>
> - Update group membership data to include discovery endpoints
> - Enable discovery
> We need to attach some host and port information to 
> {{org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo}}
> This can then be used in 
> {{org.apache.kafka.streams.processor.internals.StreamPartitionAssignor}} to 
> build a {{Map}} that should be added to
> {{org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo}} 
> during the {{assign}} call.
> When {{StreamPartitionAssignor.onAssignment}} is called we should hold on to 
> a copy of the {{Map}} for use by the 
> discovery methods
> To enable discovery, implement the below methods on {{KafkaStreams}}
> {code}
> /**
>  * @return metadata about all tasks
>  */
> Map getAllTasks();
> /**
>  * @param storeName requested store name
>  * @return metadata about all tasks that include
>  * storeName in this KStreams instance
>  */
> Map getAllTasksWithStore(String storeName);
> /**
>  * @param key requested key
>  * @param storeName requested store name
>  * @return metadata about all tasks that include
>  * storeName and key in this KStreams instance
>  */
>  Map getAllTasksWithKey(String storeName, K 
> key);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1576: KAFKA-3914: Global discovery of state stores

2016-08-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1576


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] Time-based releases for Apache Kafka

2016-08-10 Thread Ismael Juma
Hi Gwen,

Comments inline.

On Wed, Aug 10, 2016 at 6:21 PM, Gwen Shapira  wrote:

> I hear what you are saying (enterprises upgrade every 2-years
> more-or-less). It seems reasonable - this basically means maintaining
> 10 compatibility tests at any point in time.


Indeed. Although it's up to 6 tests in each branch because we only have
upgrade tests to the latest release in the given branch (i.e. if the
release branch is 0.10.0, we only have upgrade tests with 0.10.0.x as the
target in that branch). This provides full coverage without having
unnecessary tests in each branch.

> We will need to be
> disciplined about maintaining those tests though - or it will get
> painful.
>

Definitely.

> Another thing, hurrying up with implementing full forward-and-back
> compatibility for clients (i.e. full KIP-35 support in clients) would
> go a long way toward making upgrades less painful.
>

Yes, I think this would be awesome. It would also reduce the need to
backport client fixes to older branches.

Ismael


[jira] [Resolved] (KAFKA-3914) Global discovery of state stores

2016-08-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3914.
--
Resolution: Fixed

Issue resolved by pull request 1576
[https://github.com/apache/kafka/pull/1576]

> Global discovery of state stores
> 
>
> Key: KAFKA-3914
> URL: https://issues.apache.org/jira/browse/KAFKA-3914
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>
> - Update group membership data to include discovery endpoints
> - Enable discovery
> We need to attach some host and port information to 
> {{org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo}}
> This can then be used in 
> {{org.apache.kafka.streams.processor.internals.StreamPartitionAssignor}} to 
> build a {{Map}} that should be added to
> {{org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo}} 
> during the {{assign}} call.
> When {{StreamPartitionAssignor.onAssignment}} is called we should hold on to 
> a copy of the {{Map}} for use by the 
> discovery methods
> To enable discovery, implement the below methods on {{KafkaStreams}}
> {code}
> /**
>  * @return metadata about all tasks
>  */
> Map getAllTasks();
> /**
>  * @param storeName requested store name
>  * @return metadata about all tasks that include
>  * storeName in this KStreams instance
>  */
> Map getAllTasksWithStore(String storeName);
> /**
>  * @param key requested key
>  * @param storeName requested store name
>  * @return metadata about all tasks that include
>  * storeName and key in this KStreams instance
>  */
>  Map getAllTasksWithKey(String storeName, K 
> key);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka-site issue #17: Update quickstart guides with a note to refer to the c...

2016-08-10 Thread guozhangwang
Github user guozhangwang commented on the issue:

https://github.com/apache/kafka-site/pull/17
  
LGTM. cc @gwenshap 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4031) Check DirectBuffer's cleaner to be not null before using

2016-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415958#comment-15415958
 ] 

ASF GitHub Bot commented on KAFKA-4031:
---

GitHub user soumyajit-sahu opened a pull request:

https://github.com/apache/kafka/pull/1718

KAFKA-4031: Check if buffer cleaner is null before using it

A small fix to check null before using the reference

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Microsoft/kafka 
nullCheckForDirectBufferCleaner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1718.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1718


commit d82627e40315c6e655811aa49a31b2e4c7ce2629
Author: Som Sahu 
Date:   2016-08-10T19:13:52Z

Check if buffer cleaner is null before using it




> Check DirectBuffer's cleaner to be not null before using
> 
>
> Key: KAFKA-4031
> URL: https://issues.apache.org/jira/browse/KAFKA-4031
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.1
>Reporter: Soumyajit Sahu
>Assignee: Soumyajit Sahu
> Fix For: 0.10.0.2
>
>
> Found the following exception stack in our broker logs.
> The fix should be straight forward with a null check.
> [2016-08-09 17:10:24,451] WARN Error when freeing index buffer 
> (kafka.log.OffsetIndex)
> java.lang.NullPointerException
>   at 
> kafka.log.OffsetIndex.kafka$log$OffsetIndex$$forceUnmap(OffsetIndex.scala:312)
>   at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:294)
>   at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:287)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
>   at kafka.log.OffsetIndex.resize(OffsetIndex.scala:287)
>   at kafka.log.Log.loadSegments(Log.scala:245)
>   at kafka.log.Log.(Log.scala:101)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1718: KAFKA-4031: Check if buffer cleaner is null before...

2016-08-10 Thread soumyajit-sahu
GitHub user soumyajit-sahu opened a pull request:

https://github.com/apache/kafka/pull/1718

KAFKA-4031: Check if buffer cleaner is null before using it

A small fix to check null before using the reference

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Microsoft/kafka 
nullCheckForDirectBufferCleaner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1718.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1718


commit d82627e40315c6e655811aa49a31b2e4c7ce2629
Author: Som Sahu 
Date:   2016-08-10T19:13:52Z

Check if buffer cleaner is null before using it




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


RE: Kafka consumer getting duplicate message

2016-08-10 Thread Ghosh, Achintya (Contractor)
Can anyone please check this one?

Thanks
Achintya

-Original Message-
From: Ghosh, Achintya (Contractor) 
Sent: Monday, August 08, 2016 9:44 AM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: RE: Kafka consumer getting duplicate message

Thank you, Ewen, for your response.
Actually we are using the Spring Kafka 1.0.0.M2 release, which uses the Kafka 0.9 release.
Yes, we see a lot of duplicates, and here are our producer and consumer settings 
in the application. We don't see any duplication at the producer end; I mean, if we 
send 1000 messages to a particular topic we receive exactly (sometimes fewer than) 
1000 messages.

But when we consume the messages at the consumer level we see a lot of messages with 
the same offset value and same partition, so please let us know what tweaking is 
needed to avoid the duplication.

We have three types of topics, and each topic has a replication factor of 3 and 10 
partitions.

Producer Configuration:

bootstrap.producer.servers=provisioningservices-aq-dev.g.comcast.net:80
acks=1
retries=3
batch.size=16384
linger.ms=5
buffer.memory=33554432
request.timeout.ms=6
timeout.ms=6
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.comcast.provisioning.provisioning_.kafka.CustomMessageSer

Consumer Configuration:

bootstrap.consumer.servers=provisioningservices-aqr-dev.g.comcast.net:80
group.id=ps-consumer-group
enable.auto.commit=false
auto.commit.interval.ms=100
session.timeout.ms=15000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.comcast.provisioning.provisioning_.kafka.CustomMessageDeSer

factory.getContainerProperties().setSyncCommits(true);
factory.setConcurrency(5);

Thanks
Achintya


-Original Message-
From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
Sent: Saturday, August 06, 2016 1:45 AM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumer getting duplicate message

Achintya,

1.0.0.M2 is not an official release, so this version number is not particularly 
meaningful to people on this list. What platform/distribution are you using and 
how does this map to actual Apache Kafka releases?

In general, it is not possible for any system to guarantee exactly once 
semantics because those semantics rely on the source and destination systems 
coordinating -- the source provides some sort of retry semantics, and the 
destination system needs to do some sort of deduplication or similar to only 
"deliver" the data one time.

That said, duplicates should usually only be generated in the face of failures. 
If you're seeing a lot of duplicates, that probably means shutdown/failover is 
not being handled correctly. If you can provide more info about your setup, we 
might be able to suggest tweaks that will avoid these situations.
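
As one concrete tweak, here is a minimal sketch using the plain Java consumer
(rather than the Spring Kafka container; broker address and topic are
placeholders): disable auto-commit and commit synchronously only after a batch
has been processed, so duplicates are limited to the records in flight when a
failure or rebalance happens, and keep the processing itself idempotent:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Sketch: commit offsets synchronously only after a batch is fully processed.
    public class AtLeastOnceConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder
            props.put("group.id", "ps-consumer-group");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("provisioning-topic"));  // placeholder topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        process(record);        // should be idempotent to tolerate redelivery
                    }
                    consumer.commitSync();      // commit only after successful processing
                }
            }
        }

        private static void process(ConsumerRecord<String, String> record) {
            System.out.printf("partition=%d offset=%d key=%s%n",
                    record.partition(), record.offset(), record.key());
        }
    }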

-Ewen

On Fri, Aug 5, 2016 at 8:15 AM, Ghosh, Achintya (Contractor) < 
achintya_gh...@comcast.com> wrote:

> Hi there,
>
> We are using Kafka 1.0.0.M2 with Spring and we see a lot of duplicate 
> messages being received by the Listener onMessage() method.
> We configured :
>
> enable.auto.commit=false
> session.timeout.ms=15000
> factory.getContainerProperties().setSyncCommits(true);
> factory.setConcurrency(5);
>
> So what could be the reason to get the duplicate messages?
>
> Thanks
> Achintya
>



--
Thanks,
Ewen


[jira] [Assigned] (KAFKA-4031) Check DirectBuffer's cleaner to be not null before using

2016-08-10 Thread Soumyajit Sahu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Soumyajit Sahu reassigned KAFKA-4031:
-

Assignee: Soumyajit Sahu

> Check DirectBuffer's cleaner to be not null before using
> 
>
> Key: KAFKA-4031
> URL: https://issues.apache.org/jira/browse/KAFKA-4031
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.1
>Reporter: Soumyajit Sahu
>Assignee: Soumyajit Sahu
> Fix For: 0.10.0.2
>
>
> Found the following exception stack in our broker logs.
> The fix should be straight forward with a null check.
> [2016-08-09 17:10:24,451] WARN Error when freeing index buffer 
> (kafka.log.OffsetIndex)
> java.lang.NullPointerException
>   at 
> kafka.log.OffsetIndex.kafka$log$OffsetIndex$$forceUnmap(OffsetIndex.scala:312)
>   at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:294)
>   at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:287)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
>   at kafka.log.OffsetIndex.resize(OffsetIndex.scala:287)
>   at kafka.log.Log.loadSegments(Log.scala:245)
>   at kafka.log.Log.(Log.scala:101)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4031) Check DirectBuffer's cleaner to be not null before using

2016-08-10 Thread Soumyajit Sahu (JIRA)
Soumyajit Sahu created KAFKA-4031:
-

 Summary: Check DirectBuffer's cleaner to be not null before using
 Key: KAFKA-4031
 URL: https://issues.apache.org/jira/browse/KAFKA-4031
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.0.1
Reporter: Soumyajit Sahu
 Fix For: 0.10.0.2


Found the following exception stack in our broker logs.
The fix should be straight forward with a null check.

[2016-08-09 17:10:24,451] WARN Error when freeing index buffer 
(kafka.log.OffsetIndex)
java.lang.NullPointerException
at 
kafka.log.OffsetIndex.kafka$log$OffsetIndex$$forceUnmap(OffsetIndex.scala:312)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:294)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:287)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at kafka.log.OffsetIndex.resize(OffsetIndex.scala:287)
at kafka.log.Log.loadSegments(Log.scala:245)
at kafka.log.Log.(Log.scala:101)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
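
The general shape of the fix - guarding the cleaner with a null check before
invoking it - shown as a standalone Java sketch (the real change belongs in
OffsetIndex.scala; this only illustrates the pattern):

{code}
import java.nio.MappedByteBuffer;

// Illustration of the null-check pattern: a DirectBuffer's cleaner() can be null
// (e.g. for slices/duplicates), so it must be checked before calling clean().
final class MappedBufferUnmapper {
    static void forceUnmap(MappedByteBuffer buffer) {
        if (buffer instanceof sun.nio.ch.DirectBuffer) {
            sun.misc.Cleaner cleaner = ((sun.nio.ch.DirectBuffer) buffer).cleaner();
            if (cleaner != null) {          // the missing guard that caused the NPE
                cleaner.clean();
            }
        }
    }
}
{code}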



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3971) Consumers drop from coordinator and cannot reconnect

2016-08-10 Thread Lei Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415865#comment-15415865
 ] 

Lei Wang commented on KAFKA-3971:
-

We don't see too many threads - fewer than 1k threads at 800 consumers, for 
example. Also, we don't restrict the resources a process can use; ulimit is 
set to unlimited, for example. It also has a lot of free heap space and CPU, 
and other processes running as the same user run fine. I'm not sure it's a 
resource issue.

> Consumers drop from coordinator and cannot reconnect
> 
>
> Key: KAFKA-3971
> URL: https://issues.apache.org/jira/browse/KAFKA-3971
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
> Environment: version 0.9.0.1
>Reporter: Lei Wang
> Attachments: KAFKA-3971.txt
>
>
> From time to time, we're creating new topics, and all consumers will pick up 
> those new topics. When starting to consume from these new topics, we often 
> see that some random consumers cannot connect to the coordinator. The log will 
> be flooded with the following message tens of thousands of times every second:
> {noformat}
> 16/07/18 18:18:36.003 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.005 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> {noformat}
> The servers seem to be working fine, and other consumers are also happy.
> From the log, it looks like it's retrying multiple times every millisecond but 
> always failing.
> The same process is consuming from many topics; some of them are still 
> working well, but those random topics will fail.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3971) Consumers drop from coordinator and cannot reconnect

2016-08-10 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415838#comment-15415838
 ] 

Andrew Olson commented on KAFKA-3971:
-

Even at 1 partition, if a single process is consuming from that many topics its 
thread count would be very high - a separate fetcher thread is started for each 
partition being consumed. So you may be at risk of running into a "max user 
processes" sort of ulimit or running out of memory, especially if there is a 
substantial amount of data in each topic when the consumer is started, as each 
thread can read up to fetch.message.max.bytes worth of data (with the default of 
roughly 1 MB, 800 such threads could buffer on the order of 800 MB).

> Consumers drop from coordinator and cannot reconnect
> 
>
> Key: KAFKA-3971
> URL: https://issues.apache.org/jira/browse/KAFKA-3971
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
> Environment: version 0.9.0.1
>Reporter: Lei Wang
> Attachments: KAFKA-3971.txt
>
>
> From time to time, we're creating new topics, and all consumers will pick up 
> those new topics. When starting to consume from these new topics, we often 
> see that some random consumers cannot connect to the coordinator. The log will 
> be flooded with the following message tens of thousands of times every second:
> {noformat}
> 16/07/18 18:18:36.003 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.005 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> {noformat}
> The servers seem to be working fine, and other consumers are also happy.
> From the log, it looks like it's retrying multiple times every millisecond but 
> always failing.
> The same process is consuming from many topics; some of them are still 
> working well, but those random topics will fail.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3993) Console producer drops data

2016-08-10 Thread Roger Hoover (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415811#comment-15415811
 ] 

Roger Hoover commented on KAFKA-3993:
-

Thanks, [~vahid].  Yeah, it looks the same.

> Console producer drops data
> ---
>
> Key: KAFKA-3993
> URL: https://issues.apache.org/jira/browse/KAFKA-3993
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Roger Hoover
>
> The console producer drops data if the process exits too quickly.  I 
> suspect that the shutdown hook does not call close() or something goes wrong 
> during that close().
> Here's a simple example to illustrate the issue:
> {noformat}
> export BOOTSTRAP_SERVERS=localhost:9092
> export TOPIC=bar
> export MESSAGES=1
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 1 
> --replication-factor 1 --topic "$TOPIC" \
> && echo "acks=all" > /tmp/producer.config \
> && echo "linger.ms=0" >> /tmp/producer.config \
> && seq "$MESSAGES" | ./bin/kafka-console-producer.sh --broker-list 
> "$BOOTSTRAP_SERVERS" --topic "$TOPIC" --producer-config /tmp/producer.config \
> && ./bin/kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP_SERVERS" 
> --new-consumer --from-beginning --max-messages "${MESSAGES}" --topic "$TOPIC"
> {noformat}
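
For comparison, a minimal standalone sketch of what the reporter suspects is not 
happening before exit: an explicit flush()/close() of the producer so buffered 
records are not dropped. The topic name, message count, and configs are 
illustrative; this is not the console producer's actual shutdown code.

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FlushBeforeExit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 1; i <= 10000; i++) {
                producer.send(new ProducerRecord<String, String>("bar", Integer.toString(i)));
            }
        } finally {
            // Records accepted by send() may still sit in the client buffer;
            // without flush()/close() before the JVM exits they can be silently lost.
            producer.flush();
            producer.close();
        }
    }
}
{code}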



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3971) Consumers drop from coordinator and cannot reconnect

2016-08-10 Thread Lei Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415810#comment-15415810
 ] 

Lei Wang commented on KAFKA-3971:
-

one partition each

> Consumers drop from coordinator and cannot reconnect
> 
>
> Key: KAFKA-3971
> URL: https://issues.apache.org/jira/browse/KAFKA-3971
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
> Environment: version 0.9.0.1
>Reporter: Lei Wang
> Attachments: KAFKA-3971.txt
>
>
> From time to time, we're creating new topics, and all consumers will pick up 
> those new topics. When starting to consume from these new topics, we often 
> see that some random consumers cannot connect to the coordinator. The log is 
> flooded with the following message tens of thousands of times every second:
> {noformat}
> 16/07/18 18:18:36.003 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.005 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> {noformat}
> The servers seem to be working fine, and other consumers are also happy.
> From the log, it looks like it's retrying multiple times every millisecond but 
> all of the attempts are failing.
> The same process is consuming from many topics; some of them are still 
> working well, but those random topics will fail.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3971) Consumers drop from coordinator and cannot reconnect

2016-08-10 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415806#comment-15415806
 ] 

Andrew Olson commented on KAFKA-3971:
-

How many partitions per topic do you have?

> Consumers drop from coordinator and cannot reconnect
> 
>
> Key: KAFKA-3971
> URL: https://issues.apache.org/jira/browse/KAFKA-3971
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
> Environment: version 0.9.0.1
>Reporter: Lei Wang
> Attachments: KAFKA-3971.txt
>
>
> From time to time, we're creating new topics, and all consumers will pick up 
> those new topics. When starting to consume from these new topics, we often 
> see that some random consumers cannot connect to the coordinator. The log is 
> flooded with the following message tens of thousands of times every second:
> {noformat}
> 16/07/18 18:18:36.003 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.005 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> {noformat}
> The servers seem to be working fine, and other consumers are also happy.
> From the log, it looks like it's retrying multiple times every millisecond but 
> all of the attempts are failing.
> The same process is consuming from many topics; some of them are still 
> working well, but those random topics will fail.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3993) Console producer drops data

2016-08-10 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415793#comment-15415793
 ] 

Vahid Hashemian commented on KAFKA-3993:


[~theduderog] I wonder if this is the same issue as the one reported in [this 
JIRA|https://issues.apache.org/jira/browse/KAFKA-3129].

> Console producer drops data
> ---
>
> Key: KAFKA-3993
> URL: https://issues.apache.org/jira/browse/KAFKA-3993
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Roger Hoover
>
> The console producer drops data if the process exits too quickly.  I 
> suspect that the shutdown hook does not call close() or something goes wrong 
> during that close().
> Here's a simple example to illustrate the issue:
> {noformat}
> export BOOTSTRAP_SERVERS=localhost:9092
> export TOPIC=bar
> export MESSAGES=1
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 1 
> --replication-factor 1 --topic "$TOPIC" \
> && echo "acks=all" > /tmp/producer.config \
> && echo "linger.ms=0" >> /tmp/producer.config \
> && seq "$MESSAGES" | ./bin/kafka-console-producer.sh --broker-list 
> "$BOOTSTRAP_SERVERS" --topic "$TOPIC" --producer-config /tmp/producer.config \
> && ./bin/kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP_SERVERS" 
> --new-consumer --from-beginning --max-messages "${MESSAGES}" --topic "$TOPIC"
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3971) Consumers drop from coordinator and cannot reconnect

2016-08-10 Thread Lei Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415770#comment-15415770
 ] 

Lei Wang commented on KAFKA-3971:
-

Tried assigning partitions manually; it didn't help much, still seeing tons of 
"Marking the coordinator dead" messages when consuming 800+ topics.

> Consumers drop from coordinator and cannot reconnect
> 
>
> Key: KAFKA-3971
> URL: https://issues.apache.org/jira/browse/KAFKA-3971
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
> Environment: version 0.9.0.1
>Reporter: Lei Wang
> Attachments: KAFKA-3971.txt
>
>
> From time to time, we're creating new topics, and all consumers will pick up 
> those new topics. When starting to consume from these new topics, we often 
> see that some random consumers cannot connect to the coordinator. The log is 
> flooded with the following message tens of thousands of times every second:
> {noformat}
> 16/07/18 18:18:36.003 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.005 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> {noformat}
> The servers seem to be working fine, and other consumers are also happy.
> From the log, it looks like it's retrying multiple times every millisecond but 
> all of the attempts are failing.
> The same process is consuming from many topics; some of them are still 
> working well, but those random topics will fail.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3993) Console producer drops data

2016-08-10 Thread Roger Hoover (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roger Hoover updated KAFKA-3993:

Description: 
The console producer drops data if the process exits too quickly.  I 
suspect that the shutdown hook does not call close() or something goes wrong 
during that close().

Here's a simple example to illustrate the issue:

{noformat}
export BOOTSTRAP_SERVERS=localhost:9092
export TOPIC=bar
export MESSAGES=1
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 1 
--replication-factor 1 --topic "$TOPIC" \
&& echo "acks=all" > /tmp/producer.config \
&& echo "linger.ms=0" >> /tmp/producer.config \
&& seq "$MESSAGES" | ./bin/kafka-console-producer.sh --broker-list 
"$BOOTSTRAP_SERVERS" --topic "$TOPIC" --producer-config /tmp/producer.config \
&& ./bin/kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP_SERVERS" 
--new-consumer --from-beginning --max-messages "${MESSAGES}" --topic "$TOPIC"
{noformat}

  was:
The console producer drops data if the process exits too quickly.  I 
suspect that the shutdown hook does not call close() or something goes wrong 
during that close().

Here's a simple example to illustrate the issue:

{noformat}
export BOOTSTRAP_SERVERS=localhost:9092
export TOPIC=bar
export MESSAGES=1
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 1 
--replication-factor 1 --topic "$TOPIC" \
&& echo "acks=all" > /tmp/producer.config \
&& echo "linger.ms=0" >> /tmp/producer.config \
&& seq "$MESSAGES" | ./bin/kafka-console-producer.sh --broker-list 
"$BOOTSTRAP_SERVERS" --topic "$TOPIC" \
&& ./bin/kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP_SERVERS" 
--new-consumer --from-beginning --max-messages "${MESSAGES}" --topic "$TOPIC"
{noformat}


> Console producer drops data
> ---
>
> Key: KAFKA-3993
> URL: https://issues.apache.org/jira/browse/KAFKA-3993
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Roger Hoover
>
> The console producer drops data if the process exits too quickly.  I 
> suspect that the shutdown hook does not call close() or something goes wrong 
> during that close().
> Here's a simple example to illustrate the issue:
> {noformat}
> export BOOTSTRAP_SERVERS=localhost:9092
> export TOPIC=bar
> export MESSAGES=1
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 1 
> --replication-factor 1 --topic "$TOPIC" \
> && echo "acks=all" > /tmp/producer.config \
> && echo "linger.ms=0" >> /tmp/producer.config \
> && seq "$MESSAGES" | ./bin/kafka-console-producer.sh --broker-list 
> "$BOOTSTRAP_SERVERS" --topic "$TOPIC" --producer-config /tmp/producer.config \
> && ./bin/kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP_SERVERS" 
> --new-consumer --from-beginning --max-messages "${MESSAGES}" --topic "$TOPIC"
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4030) Update older quickstart documents to clarify which version they relate to

2016-08-10 Thread Todd Snyder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415730#comment-15415730
 ] 

Todd Snyder commented on KAFKA-4030:


https://github.com/apache/kafka-site/pull/17

> Update older quickstart documents to clarify which version they relate to
> -
>
> Key: KAFKA-4030
> URL: https://issues.apache.org/jira/browse/KAFKA-4030
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: Todd Snyder
>  Labels: documentation, website
>
> If you search for 'kafka quickstart' it takes you to 
> kafka.apache.org/07/quickstart.html, which is for release 0.7 rather than the 
> current release, though it does not clearly say so.
> [~gwenshap] suggested a ticket and a note added to the 0.7 (and likely 0.8 
> and 0.9) quickstart guides directing people to use ~current for the latest 
> release documentation.
> I'll submit a PR shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4030) Update older quickstart documents to clarify which version they relate to

2016-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415728#comment-15415728
 ] 

ASF GitHub Bot commented on KAFKA-4030:
---

GitHub user cptcanuck opened a pull request:

https://github.com/apache/kafka-site/pull/17

Update quickstart guides with a note to refer to the current version …

https://issues.apache.org/jira/browse/KAFKA-4030

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cptcanuck/kafka-site asf-site

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka-site/pull/17.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17


commit cad6139953a2a0a67638d01fcdb38c501ea30608
Author: todd 
Date:   2016-08-10T18:22:24Z

Update quickstart guides with a note to refer to the current version of the 
documentation, per KAFKA-4030




> Update older quickstart documents to clarify which version they relate to
> -
>
> Key: KAFKA-4030
> URL: https://issues.apache.org/jira/browse/KAFKA-4030
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: Todd Snyder
>  Labels: documentation, website
>
> If you search for 'kafka quickstart' it takes you to 
> kafka.apache.org/07/quickstart.html, which is for release 0.7 rather than the 
> current release, though it does not clearly say so.
> [~gwenshap] suggested a ticket and a note added to the 0.7 (and likely 0.8 
> and 0.9) quickstart guides directing people to use ~current for the latest 
> release documentation.
> I'll submit a PR shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka-site pull request #17: Update quickstart guides with a note to refer t...

2016-08-10 Thread cptcanuck
GitHub user cptcanuck opened a pull request:

https://github.com/apache/kafka-site/pull/17

Update quickstart guides with a note to refer to the current version …

https://issues.apache.org/jira/browse/KAFKA-4030

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cptcanuck/kafka-site asf-site

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka-site/pull/17.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17


commit cad6139953a2a0a67638d01fcdb38c501ea30608
Author: todd 
Date:   2016-08-10T18:22:24Z

Update quickstart guides with a note to refer to the current version of the 
documentation, per KAFKA-4030




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4030) Update older quickstart documents to clarify which version they relate to

2016-08-10 Thread Todd Snyder (JIRA)
Todd Snyder created KAFKA-4030:
--

 Summary: Update older quickstart documents to clarify which 
version they relate to
 Key: KAFKA-4030
 URL: https://issues.apache.org/jira/browse/KAFKA-4030
 Project: Kafka
  Issue Type: Improvement
  Components: website
Reporter: Todd Snyder


If you search for 'kafka quickstart' it takes you to 
kafka.apache.org/07/quickstart.html, which is for release 0.7 rather than the 
current release, though it does not clearly say so.

[~gwenshap] suggested a ticket and a note added to the 0.7 (and likely 0.8 and 
0.9) quickstart guides directing people to use ~current for the latest release 
documentation.

I'll submit a PR shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-10 Thread Andrey L. Neporada
Hi, Jun!

Thanks for feedback!

> On 10 Aug 2016, at 17:42, Jun Rao  wrote:
> 
> Hi, Andrey,
> 
> Thanks for the reply. A couple of more comments inline below.
> 
> On Wed, Aug 10, 2016 at 3:56 AM, Andrey L. Neporada <
> anepor...@yandex-team.ru > wrote:
> 
>> 
>> Yes, such cooperative configuration for fetch request may look a bit weird.
>> But I don’t see other options if we want to remove partition limits from
>> fetch request.
>> In this case we need some server-side configuration for partition limits.
>> 
>> 
> What if we keep the current partition level limit in the fetch request and
> just add an additional response level limit? The default partition limit
> can be much smaller than the max message size and will only be used for
> fairness across partitions.
> 

Yes, we can just add a global response limit and leave the partition limits as is.
In fact, my initial implementation (https://github.com/apache/kafka/pull/1683) 
of this KIP preserves per-partition limits.
However, judging from the KAFKA-2063 discussion, some people prefer to 
deprecate the partition-level limit.
I have no strong opinion on this topic - I hope we can choose the best option here.

...
>> 
>> No, I mean that the actual response size can be bigger than limit_bytes, but
>> less than limit_bytes + message.max.bytes.
>> This behaviour is a result of algorithm proposed in KIP (and in PR).
>> 
>> 
> Got it. An alternative is to only add a partition's data to the response up
> to the remaining response limit. The only exception is that this is the
> first partition and the first message in that partition is larger than the
> response limit. Then the bound will be max(limit_bytes, message.max.bytes),
> which is tighter.
> 

Yes, this one looks better.
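
To make that alternative concrete, a minimal sketch of the fill rule under 
discussion, with hypothetical types and names rather than the actual broker 
code: each partition is granted at most the remaining response budget, except 
that the first partition may return its first message even if that message 
alone exceeds the limit, which yields the max(limit_bytes, message.max.bytes) 
bound.

{code}
import java.util.LinkedHashMap;
import java.util.Map;

public class ResponseFillSketch {

    /**
     * availableBytes / firstMessageBytes are keyed by partition, in request order.
     * Returns how many bytes each partition is granted in the response.
     */
    static Map<String, Long> fill(Map<String, Long> availableBytes,
                                  Map<String, Long> firstMessageBytes,
                                  long responseLimitBytes) {
        Map<String, Long> granted = new LinkedHashMap<>();
        long remaining = responseLimitBytes;
        boolean firstPartition = true;
        for (Map.Entry<String, Long> entry : availableBytes.entrySet()) {
            String partition = entry.getKey();
            long bytes;
            if (firstPartition && firstMessageBytes.get(partition) > responseLimitBytes) {
                // Exception: let the first partition return its oversized first message
                // so a consumer stuck behind a large message still makes progress.
                bytes = firstMessageBytes.get(partition);
            } else {
                bytes = Math.min(entry.getValue(), remaining);
            }
            granted.put(partition, bytes);
            remaining = Math.max(0L, remaining - bytes);
            firstPartition = false;
        }
        return granted;
    }

    public static void main(String[] args) {
        Map<String, Long> available = new LinkedHashMap<>();
        available.put("t-0", 5000000L);      // illustrative sizes
        available.put("t-1", 400000L);
        Map<String, Long> firstMessage = new LinkedHashMap<>();
        firstMessage.put("t-0", 300000L);
        firstMessage.put("t-1", 100000L);

        // With a 1 MB response limit, t-0 gets 1 MB and t-1 gets nothing this round.
        System.out.println(fill(available, firstMessage, 1000000L));
    }
}
{code}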


Thanks,
Andrey.

[jira] [Commented] (KAFKA-3752) Provide a way for KStreams to recover from unclean shutdown

2016-08-10 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415664#comment-15415664
 ] 

Greg Fodor commented on KAFKA-3752:
---

Hey [~guozhang], we're hitting this issue as well during a rebalance, and I 
think also when startup fails due to KAFKA-3559. The job eventually 
recovers. We are running 32 threads per instance and have 2 instances 
(though the issue happens more often when we run on a single instance). Log 
here: https://gist.github.com/gfodor/bac65bff38233193b70836b78c701e7b 

> Provide a way for KStreams to recover from unclean shutdown
> ---
>
> Key: KAFKA-3752
> URL: https://issues.apache.org/jira/browse/KAFKA-3752
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Roger Hoover
>  Labels: architecture
>
> If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM 
> Killer), it may leave behind lock files and fail to recover.
> It would be useful to have an option (say --force) to tell KStreams to 
> proceed even if it finds old LOCK files.
> {noformat}
> [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in 
> thread [StreamThread-1]:  
> (org.apache.kafka.streams.processor.internals.StreamThread:583)
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating 
> the state manager
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>   at 
> 

[jira] [Updated] (KAFKA-3971) Consumers drop from coordinator and cannot reconnect

2016-08-10 Thread Andrew Olson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-3971:

Summary: Consumers drop from coordinator and cannot reconnect  (was: 
Consumers drop from coordinator and cannot reconnet)

> Consumers drop from coordinator and cannot reconnect
> 
>
> Key: KAFKA-3971
> URL: https://issues.apache.org/jira/browse/KAFKA-3971
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
> Environment: version 0.9.0.1
>Reporter: Lei Wang
> Attachments: KAFKA-3971.txt
>
>
> From time to time, we're creating new topics, and all consumers will pick up 
> those new topics. When starting to consume from these new topics, we often 
> see that some random consumers cannot connect to the coordinator. The log is 
> flooded with the following message tens of thousands of times every second:
> {noformat}
> 16/07/18 18:18:36.003 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.005 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> {noformat}
> The servers seem to be working fine, and other consumers are also happy.
> From the log, it looks like it's retrying multiple times every millisecond but 
> all of the attempts are failing.
> The same process is consuming from many topics; some of them are still 
> working well, but those random topics will fail.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3847) Connect tasks should not share a producer

2016-08-10 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava reassigned KAFKA-3847:


Assignee: Ewen Cheslack-Postava  (was: Liquan Pei)

> Connect tasks should not share a producer
> -
>
> Key: KAFKA-3847
> URL: https://issues.apache.org/jira/browse/KAFKA-3847
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Critical
> Fix For: 0.10.1.0
>
>
> Currently the tasks share a producer. This is nice in terms of potentially 
> coalescing requests to the same broker, keeping port usage reasonable, 
> minimizing the # of connections to brokers (which is nice for brokers, not so 
> important for connect itself). But it also means we unnecessarily tie tasks 
> to each other in other ways -- e.g. when one task needs to flush, we 
> effectively block it on other connectors' data being produced and acked.
> Given that we allocate a consumer per sink, a lot of the arguments for 
> sharing a producer effectively go away. We should decouple the tasks by using 
> a separate producer for each task (or, at a minimum, for each connector's 
> tasks).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Strange behavior when turn the system clock back

2016-08-10 Thread Gabriel Ibarra
Hello guys, I am dealing with an issue when turn the system clock back
(either due to NTP or administrator action). I'm using kafka_2.11-0.10.0.0

I follow the next steps.
- Start a consumer for TOPIC_NAME with group id GROUP_NAME. It will be
owner of all the partitions.
- Turn the system clock back. For instance 1 hour.
- Start a new consumer for TOPIC_NAME  using the same group id, it will
force a rebalance.

After these actions the kafka server logs constantly the below
messages, and after
a while both consumers do not receive more packages. I saw that this
condition lasts at least the time that the clock went back, for this
example 1 hour, and finally after this time kafka come back to work.

[2016-08-08 11:30:23,023] INFO [GroupCoordinator 0]: Preparing to
restabilize group GROUP_NAME with old generation 2 (kafka.coordinator.
GroupCoordinator)
[2016-08-08 11:30:23,025] INFO [GroupCoordinator 0]: Stabilized group
GROUP_NAME generation 3 (kafka.coordinator.GroupCoordinator)
[2016-08-08 11:30:23,027] INFO [GroupCoordinator 0]: Preparing to
restabilize group GROUP_NAME with old generation 3 (kafka.coordinator.
GroupCoordinator)
[2016-08-08 11:30:23,029] INFO [GroupCoordinator 0]: Group GROUP_NAME
generation 3 is dead and removed (kafka.coordinator.GroupCoordinator)
[2016-08-08 11:30:23,032] INFO [GroupCoordinator 0]: Preparing to
restabilize group GROUP_NAME with old generation 0 (kafka.coordinator.
GroupCoordinator)
[2016-08-08 11:30:23,032] INFO [GroupCoordinator 0]: Stabilized group
GROUP_NAME generation 1 (kafka.coordinator.GroupCoordinator)
[2016-08-08 11:30:23,033] INFO [GroupCoordinator 0]: Preparing to
restabilize group GROUP_NAME with old generation 1 (kafka.coordinator.
GroupCoordinator)
[2016-08-08 11:30:23,034] INFO [GroupCoordinator 0]: Group GROUP generation
1 is dead and removed (kafka.coordinator.GroupCoordinator)
[2016-08-08 11:30:23,043] INFO [GroupCoordinator 0]: Preparing to
restabilize group GROUP_NAME with old generation 0 (kafka.coordinator.
GroupCoordinator)
[2016-08-08 11:30:23,044] INFO [GroupCoordinator 0]: Stabilized group
GROUP_NAME generation 1 (kafka.coordinator.GroupCoordinator)
[2016-08-08 11:30:23,044] INFO [GroupCoordinator 0]: Preparing to
restabilize group GROUP_NAME with old generation 1 (kafka.coordinator.
GroupCoordinator)
[2016-08-08 11:30:23,045] INFO [GroupCoordinator 0]: Group GROUP_NAME
generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)

IMHO, I think that kafka's consumers have to work fine after any change of
system clock, but maybe this behavior has fundamentals that I don't know.

I'm sorry if it was discussed previously, I was researching but I didn't
found a similar issue.

Thanks,

-- 



Gabriel Alejandro Ibarra

Software Engineer

San Lorenzo 47, 3rd Floor, Office 5

Córdoba, Argentina

Phone: +54 351 4217888


Re: [DISCUSS] Time-based releases for Apache Kafka

2016-08-10 Thread Gwen Shapira
I hear what you are saying (enterprises upgrade every 2 years,
more or less). It seems reasonable - this basically means maintaining
10 compatibility tests at any point in time. We will need to be
disciplined about maintaining those tests though - or it will get
painful.

Another thing, hurrying up with implementing full forward-and-back
compatibility for clients (i.e. full KIP-35 support in clients) would
go a long way toward making upgrades less painful.

Gwen

On Wed, Aug 10, 2016 at 6:34 AM, Ismael Juma  wrote:
> Hi Gwen,
>
> The proposal sounds good to me. With regards to the cadence, 3 releases a
> year (every 4 months as you said) sounds reasonable. One thing that I think
> is very important if we release more often is that users should be able to
> upgrade directly to the latest release for a reasonable period. For
> example, we could say that we support direct upgrades for 2 years (6
> releases).
>
> Ismael
>
> On Wed, Aug 10, 2016 at 12:49 AM, Gwen Shapira  wrote:
>
>> Dear Kafka Developers and Users,
>>
>> In the past, our releases have been quite unpredictable. We'll notice
>> that a large number of nice features made it in (or are close),
>> someone would suggest a release and we'd do it. This is fun, but makes
>> planning really hard - we saw it during the last release which we
>> decided to delay by a few weeks to allow more features to "land".
>>
>> Many other communities have adopted time-based releases successfully
>> (Cassandra, GCC, LLVM, Fedora, Gnome, Ubuntu, etc.). And I thought it
>> will make sense for the Apache Kafka community to try doing the same.
>>
>> The benefits of this approach are:
>>
>> 1. A quicker feedback cycle, so users can benefit from features
>> sooner (assuming a reasonably short time between releases - I was
>> thinking 4 months)
>>
>> 2. Predictability for contributors and users:
>> * Developers and reviewers can decide in advance what release they are
>> aiming for with specific features.
>> * If a feature misses a release we have a good idea of when it will show
>> up.
>> * Users know when to expect their features
>>
>> 3. Transparency - There will be a published cut-off date (AKA feature
>> freeze) for the release and people will know about it in advance.
>> Hopefully this will remove the contention around which features make
>> it.
>>
>> 4. Quality - we've seen issues pop up in release candidates due to
>> last-minute features that didn't have proper time to bake in. More
>> time between feature freeze and release will let us test more,
>> document more and resolve more issues.
>>
>> Since nothing is ever perfect, there will be some downsides:
>>
>> 1. Most notably, features that miss the feature-freeze date for a
>> release will have to wait a few months for the next release. Features
>> will reach users faster overall as per benefit #1, but individual
>> features that just miss the cut will lose out
>>
>> 2. More releases a year mean that being a committer is more work -
>> release management is still some headache and we'll have more of
>> those. Hopefully we'll get better at it. Also, the committer list is
>> growing and hopefully it will be less than once-a-year effort for each
>> committer.
>>
>> 3. For users, figuring out which release to use and having frequent
>> new releases to upgrade to may be a bit confusing.
>>
>> 4. Frequent releases mean we need to do bugfix releases for older
>> branches. Right now we only do bugfix releases to latest release.
>>
>> I think the benefits outweigh the drawbacks. Or at least suggest that
>> it's worth trying - we can have another discussion in a few releases to
>> see if we want to keep it that way or try something else.
>>
>> My suggestion for the process:
>>
>> 1. We decide on a reasonable release cadence
>> 2. We decide on release dates (even rough estimate such as "end of
>> February" or something) and work back feature freeze dates.
>> 3. Committers volunteer to be "release managers" for specific
>> releases. We can coordinate on the list or on a wiki. If no committer
>> volunteers, we assume the community doesn't need a release and skip
>> it.
>> 4. At the "feature freeze" date, the release manager announces the
>> contents of the release (which KIPs made it in on time), creates the
>> release branch and starts the release process as usual. From this
>> point onwards, only bug fixes should be double-committed to the
>> release branch while trunk can start collecting features for the
>> subsequent release.
>>
>> Comments and improvements are appreciated.
>>
>> Gwen Shapira
>> Former-release-manager
>>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


[jira] [Created] (KAFKA-4029) SSL support for Connect REST API

2016-08-10 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-4029:


 Summary: SSL support for Connect REST API
 Key: KAFKA-4029
 URL: https://issues.apache.org/jira/browse/KAFKA-4029
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava


Currently the Connect REST API only supports http. We should also add SSL 
support so access to the REST API can be secured.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4028) Add Connect cluster ID and expose it in REST API

2016-08-10 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-4028:


 Summary: Add Connect cluster ID and expose it in REST API
 Key: KAFKA-4028
 URL: https://issues.apache.org/jira/browse/KAFKA-4028
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava


We have some basic info about the server available via GET / (currently version 
information). It'd be nice to have some additional cluster metadata available 
via the REST API (perhaps under a /cluster endpoint). A cluster ID would be a 
good starting point, although we'll need to decide whether we really want this 
as a global view of the cluster or a set of APIs that give you info about the 
individual worker (where some values should simply be consistent across the 
cluster).

There are a couple of ways we could implement cluster IDs:
* An entirely new config
* If we could get some unique ID for the Kafka cluster, leverage the name of 
the config topic. This doesn't require a new worker config, but the name 
probably isn't ideal -- it might include a reasonable prefix, but will also 
often include the suffix "-config" which will look odd.
* If we could get some unique ID for the Kafka cluster and implement 
KAFKA-3254, we could automatically generate one as (Kafka cluster ID, topic 
prefix)

Note that some of these are assuming distributed mode. We'd have to figure out 
a scheme that can also be applied to standalone clusters. Backwards 
compatibility is also a concern since we'd rather not introduce any new 
required configs if possible.

As this is new public API, it'll need a KIP before implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-73: Replication Quotas

2016-08-10 Thread Ben Stopford
Thanks again for the responses everyone. I’ve removed the extra fetcher 
threads from the proposal, switching to the inclusion-based approach. The 
relevant section is:

The follower makes a request, using the fixed size of 
replica.fetch.response.max.bytes as per KIP-74. The order of the partitions in 
the fetch request is randomised to ensure fairness.
When the leader receives the fetch request it processes the partitions in the 
defined order, up to the response's size limit. If the inclusion of a 
partition, listed in the leader's throttled-replicas list, causes the 
LeaderQuotaRate to be exceeded, that partition is omitted from the response 
(aka returns 0 bytes). Logically, this is of the form:
var bytesAllowedForThrottledPartition = 
quota.recordAndMaybeAdjust(bytesRequestedForPartition) 
When the follower receives the fetch response, if it includes partitions in its 
throttled-partitions list, it increments the FollowerQuotaRate:
var includeThrottledPartitionsInNextRequest: Boolean = 
quota.recordAndEvaluate(previousResponseThrottledBytes) 
If the quota is exceeded, no throttled partitions will be included in the next 
fetch request emitted by this replica fetcher thread. 
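
A minimal sketch of the follower-side bookkeeping described above; the quota 
class and names are placeholders for illustration, not the proposed 
implementation (window reset is omitted for brevity):

{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class FollowerThrottleSketch {

    /** Placeholder for the follower's byte-rate quota; not the real quota classes. */
    static class FollowerQuota {
        private final long bytesPerWindow;
        private long observedInWindow;

        FollowerQuota(long bytesPerWindow) {
            this.bytesPerWindow = bytesPerWindow;
        }

        /** Record throttled bytes from the previous response; true if still under quota. */
        boolean recordAndEvaluate(long bytes) {
            observedInWindow += bytes;
            return observedInWindow <= bytesPerWindow;
        }
    }

    static List<String> nextFetchPartitions(List<String> unthrottled,
                                            List<String> throttled,
                                            FollowerQuota quota,
                                            long previousResponseThrottledBytes) {
        List<String> next = new ArrayList<>(unthrottled);
        // Only re-request throttled replicas while the follower quota is not exceeded.
        if (quota.recordAndEvaluate(previousResponseThrottledBytes)) {
            next.addAll(throttled);
        }
        // Randomise the order so the response size limit does not starve any partition.
        Collections.shuffle(next);
        return next;
    }

    public static void main(String[] args) {
        FollowerQuota quota = new FollowerQuota(10000000L); // illustrative 10 MB window
        List<String> request = nextFetchPartitions(
                Arrays.asList("topicA-0", "topicB-0"),
                Arrays.asList("reassigning-0"),
                quota,
                4000000L);
        System.out.println(request);
    }
}
{code}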

B

> On 9 Aug 2016, at 23:34, Jun Rao  wrote:
> 
> When there are several unthrottled replicas, we could also just do what's
> suggested in KIP-74. The client is responsible for reordering the
> partitions and the leader fills in the bytes to those partitions in order,
> up to the quota limit.
> 
> We could also do what you suggested. If quota is exceeded, include empty
> data in the response for throttled replicas. Keep doing that until enough
> time has passed so that the quota is no longer exceeded. This potentially
> allows better batching per partition. Not sure if the two makes a big
> difference in practice though.
> 
> Thanks,
> 
> Jun
> 
> 
> On Tue, Aug 9, 2016 at 2:31 PM, Joel Koshy  wrote:
> 
>>> 
>>> 
>>> 
>>> On the leader side, one challenge is related to the fairness issue that
>> Ben
>>> brought up. The question is what if the fetch response limit is filled up
>>> by the throttled replicas? If this happens constantly, we will delay the
>>> progress of those un-throttled replicas. However, I think we can address
>>> this issue by trying to fill up the unthrottled replicas in the response
>>> first. So, the algorithm would be. Fill up unthrottled replicas up to the
>>> fetch response limit. If there is space left, fill up throttled replicas.
>>> If quota is exceeded for the throttled replicas, reduce the bytes in the
>>> throttled replicas in the response accordingly.
>>> 
>> 
>> Right - that's what I was trying to convey by truncation (vs empty). So we
>> would attempt to fill the response for throttled partitions as much as we
>> can before hitting the quota limit. There is one more detail to handle in
>> this: if there are several throttled partitions and not enough remaining
>> allowance in the fetch response to include all the throttled replicas then
>> we would need to decide which of those partitions get a share; which is why
>> I'm wondering if it is easier to return empty for those partitions entirely
>> in the fetch response - they will make progress in the subsequent fetch. If
>> they don't make fast enough progress then that would be a case for raising
>> the threshold or letting it complete at an off-peak time.
>> 
>> 
>>> 
>>> With this approach, we need some new logic to handle throttling on the
>>> leader, but we can leave the replica threading model unchanged. So,
>>> overall, this still seems to be a simpler approach.
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> On Tue, Aug 9, 2016 at 11:57 AM, Mayuresh Gharat <
>>> gharatmayures...@gmail.com
 wrote:
>>> 
 Nice write up Ben.
 
 I agree with Joel for keeping this simple by excluding the partitions
>>> from
 the fetch request/response when the quota is violated at the follower
>> or
 leader instead of having a separate set of threads for handling the
>> quota
 and non quota cases. Even though its different from the current quota
 implementation it should be OK since its internal to brokers and can be
 handled by tuning the quota configs for it appropriately by the admins.
 
 Also can you elaborate with an example how this would be handled :
 *guaranteeing
 ordering of updates when replicas shift threads*
 
 Thanks,
 
 Mayuresh
 
 
 On Tue, Aug 9, 2016 at 10:49 AM, Joel Koshy 
>> wrote:
 
> On the need for both leader/follower throttling: that makes sense -
 thanks
> for clarifying. For completeness, can we add this detail to the doc -
 say,
> after the quote that I pasted earlier?
> 
> From an implementation perspective though: I’m still interested in
>> the
> simplicity 

Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-10 Thread Jun Rao
Hi, Andrey,

Thanks for the reply. A couple of more comments inline below.

On Wed, Aug 10, 2016 at 3:56 AM, Andrey L. Neporada <
anepor...@yandex-team.ru> wrote:

> Hi!
>
> > On 09 Aug 2016, at 20:46, Jun Rao  wrote:
> >
> > Hi, Andrey,
> >
> > Thanks for the proposal. It looks good overall. Some minor comments.
> >
> > 1. It seems that it's bit weird that fetch.partition.max.bytes is a
> broker
> > level configuration while fetch.limit.bytes is a client side
> configuration.
> > Intuitively, it seems both should be set by the client? If we do that,
> one
> > benefit is that we can validate that fetch.limit.bytes >=
> > fetch.partition.max.bytes on the client side.
> >
>
> Yes, such cooperative configuration for fetch request may look a bit weird.
> But I don’t see other options if we want to remove partition limits from
> fetch request.
> In this case we need some server-side configuration for partition limits.
>
>
What if we keep the current partition level limit in the fetch request and
just add an additional response level limit? The default partition limit
can be much smaller than the max message size and will only be used for
fairness across partitions.


> > 2. Naming wise. fetch.response.max.bytes and replica.fetch.response.max.
> bytes
> > seem to be more consistent with our current convention than
> > fetch.limit.bytes and replica.fetch.limit.bytes.
>
> Agree, will rename.
>
> >
> > 3. When you say "This way we can ensure that response size is less than (
> > *limit_bytes* + *message.max.bytes*).", it should be "less than
> > max(limit_bytes, message.max.bytes)", right?
> >
>
> No, I mean that the actual response size can be bigger than limit_bytes, but
> less than limit_bytes + message.max.bytes.
> This behaviour is a result of algorithm proposed in KIP (and in PR).
>
>
Got it. An alternative is to only add a partition's data to the response up
to the remaining response limit. The only exception is that this is the
first partition and the first message in that partition is larger than the
response limit. Then the bound will be max(limit_bytes, message.max.bytes),
which is tighter.


> > Finally, KIP-73 (replication quota) is proposing a similar change to
> fetch
> > request protocol. We can probably just combine the two changes into one,
> > instead of bumping the fetch request version twice.
>
> Fine with that.
>
> >
> > Thanks,
> >
> > Jun
> >
>
> Thanks,
> Andrey.
>
>


Re: [DISCUSS] Time-based releases for Apache Kafka

2016-08-10 Thread Ismael Juma
Hi Gwen,

The proposal sounds good to me. With regards to the cadence, 3 releases a
year (every 4 months as you said) sounds reasonable. One thing that I think
is very important if we release more often is that users should be able to
upgrade directly to the latest release for a reasonable period. For
example, we could say that we support direct upgrades for 2 years (6
releases).

Ismael

On Wed, Aug 10, 2016 at 12:49 AM, Gwen Shapira  wrote:

> Dear Kafka Developers and Users,
>
> In the past, our releases have been quite unpredictable. We'll notice
> that a large number of nice features made it in (or are close),
> someone would suggest a release and we'd do it. This is fun, but makes
> planning really hard - we saw it during the last release which we
> decided to delay by a few weeks to allow more features to "land".
>
> Many other communities have adopted time-based releases successfully
> (Cassandra, GCC, LLVM, Fedora, Gnome, Ubuntu, etc.). And I thought it
> will make sense for the Apache Kafka community to try doing the same.
>
> The benefits of this approach are:
>
> 1. A quicker feedback cycle, so users can benefit from features
> sooner (assuming a reasonably short time between releases - I was
> thinking 4 months)
>
> 2. Predictability for contributors and users:
> * Developers and reviewers can decide in advance what release they are
> aiming for with specific features.
> * If a feature misses a release we have a good idea of when it will show
> up.
> * Users know when to expect their features
>
> 3. Transparency - There will be a published cut-off date (AKA feature
> freeze) for the release and people will know about it in advance.
> Hopefully this will remove the contention around which features make
> it.
>
> 4. Quality - we've seen issues pop up in release candidates due to
> last-minute features that didn't have proper time to bake in. More
> time between feature freeze and release will let us test more,
> document more and resolve more issues.
>
> Since nothing is ever perfect, there will be some downsides:
>
> 1. Most notably, features that miss the feature-freeze date for a
> release will have to wait a few months for the next release. Features
> will reach users faster overall as per benefit #1, but individual
> features that just miss the cut will lose out
>
> 2. More releases a year mean that being a committer is more work -
> release management is still some headache and we'll have more of
> those. Hopefully we'll get better at it. Also, the committer list is
> growing and hopefully it will be less than once-a-year effort for each
> committer.
>
> 3. For users, figuring out which release to use and having frequent
> new releases to upgrade to may be a bit confusing.
>
> 4. Frequent releases mean we need to do bugfix releases for older
> branches. Right now we only do bugfix releases to latest release.
>
> I think the benefits outweigh the drawbacks. Or at least suggest that
> it's worth trying - we can have another discussion in a few releases to
> see if we want to keep it that way or try something else.
>
> My suggestion for the process:
>
> 1. We decide on a reasonable release cadence
> 2. We decide on release dates (even rough estimate such as "end of
> February" or something) and work back feature freeze dates.
> 3. Committers volunteer to be "release managers" for specific
> releases. We can coordinate on the list or on a wiki. If no committer
> volunteers, we assume the community doesn't need a release and skip
> it.
> 4. At the "feature freeze" date, the release manager announces the
> contents of the release (which KIPs made it in on time), creates the
> release branch and starts the release process as usual. From this
> point onwards, only bug fixes should be double-committed to the
> release branch while trunk can start collecting features for the
> subsequent release.
>
> Comments and improvements are appreciated.
>
> Gwen Shapira
> Former-release-manager
>


Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-10 Thread Ben Stopford
Regarding the fetch.partition.max.bytes, there was some discussion on the Jira 
around removing this setting completely. It’s probably not the easiest thing 
for users to set, so there is certainly an argument for removing it. This 
would have the side effect that a catching up broker would fill responses from 
a single partition at a time, but as we’re ensuring fairness across requests 
I’m still left wondering if we need a partition level limit at all?

One middle ground would keep it configurable but with a default of Int.MaxValue.

B 

> On 10 Aug 2016, at 11:56, Andrey L. Neporada  wrote:
> 
> Hi!
> 
>> On 09 Aug 2016, at 20:46, Jun Rao  wrote:
>> 
>> Hi, Andrey,
>> 
>> Thanks for the proposal. It looks good overall. Some minor comments.
>> 
>> 1. It seems that it's bit weird that fetch.partition.max.bytes is a broker
>> level configuration while fetch.limit.bytes is a client side configuration.
>> Intuitively, it seems both should be set by the client? If we do that, one
>> benefit is that we can validate that fetch.limit.bytes >=
>> fetch.partition.max.bytes on the client side.
>> 
> 
> Yes, such cooperative configuration for fetch request may look a bit weird.
> But I don’t see other options if we want to remove partition limits from 
> fetch request.
> In this case we need some server-side configuration for partition limits.
> 
> 
>> 2. Naming wise. fetch.response.max.bytes and replica.fetch.response.max.bytes
>> seem to be more consistent with our current convention than
>> fetch.limit.bytes and replica.fetch.limit.bytes.
> 
> Agree, will rename.
> 
>> 
>> 3. When you say "This way we can ensure that response size is less than (
>> *limit_bytes* + *message.max.bytes*).", it should be "less than
>> max(limit_bytes, message.max.bytes)", right?
>> 
> 
> No, I mean that the actual response size can be bigger than limit_bytes, but less 
> than limit_bytes + message.max.bytes.
> This behaviour is a result of algorithm proposed in KIP (and in PR).
> 
> 
>> Finally, KIP-73 (replication quota) is proposing a similar change to fetch
>> request protocol. We can probably just combine the two changes into one,
>> instead of bumping the fetch request version twice.
> 
> Fine with that.
> 
>> 
>> Thanks,
>> 
>> Jun
>> 
> 
> Thanks,
> Andrey.
> 



[jira] [Commented] (KAFKA-3742) Can't run connect-distributed.sh with -daemon flag

2016-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415116#comment-15415116
 ] 

ASF GitHub Bot commented on KAFKA-3742:
---

GitHub user 1ambda opened a pull request:

https://github.com/apache/kafka/pull/1717

KAFKA-3742: Can't run bin/connect-*.sh with -daemon flag

## Problem

Current connect scripts (`connect-distributed.sh`, `connect-standalone.sh`) 
do not support the `-daemon` flag even if users specify it,
since `kafka-run-class.sh` requires that the `-daemon` flag precede 
other arguments (e.g. the class name).

## Solution

Do the same thing as `kafka-server-start.sh`:

- Parse the command
- Add `-daemon` to `$EXTRA_ARGS` if it exists


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/1ambda/kafka 
KAFKA-3742-connect-running-as-daemon

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1717.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1717


commit cecc8578fdb178630f5918fd4de47c556e13b286
Author: 1ambda <1am...@gmail.com>
Date:   2016-08-10T10:52:15Z

Support ''-daemon' in connect-*.sh




> Can't run connect-distributed.sh with -daemon flag
> --
>
> Key: KAFKA-3742
> URL: https://issues.apache.org/jira/browse/KAFKA-3742
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Geoff Anderson
>Assignee: Liquan Pei
>Priority: Minor
>
> Running on Ubuntu 14.04. Discovered while experimenting with various 
> Kafka components. 
> This error probably applies to other scripts as well.
> Running connect-distributed.sh thusly
> {code}connect-distributed.sh -daemon /tmp/connect-distributed.properties{code}
> gives errors like this 
> {code}
> root@worker1:/home/vagrant# connect-distributed.sh -daemon 
> /tmp/connect-distributed.properties
> Exception in thread "main" java.io.FileNotFoundException: -daemon (No such 
> file or directory)
>   at java.io.FileInputStream.open(Native Method)
>   at java.io.FileInputStream.(FileInputStream.java:146)
>   at java.io.FileInputStream.(FileInputStream.java:101)
>   at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:446)
>   at 
> org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:61)
> {code}
> Note that this command does run:
> connect-distributed.sh /tmp/connect-distributed.properties -daemon
> However, the -daemon flag is not actually applied in this case.
> Underlying cause:
> kafka-run-class.sh assumes -daemon comes before the class name
> The scripts for which -daemon works use something like
> {code}
> EXTRA_ARGS="-name kafkaServer -loggc"
> COMMAND=$1
> case $COMMAND in
>   -daemon)
> EXTRA_ARGS="-daemon "$EXTRA_ARGS
> shift
> ;;
>   *)
> ;;
> esac
> exec $base_dir/kafka-run-class.sh $EXTRA_ARGS 
> io.confluent.support.metrics.SupportedKafka "$@"
> {code}
> but connect-distributed does this:
> {code}
> exec $(dirname $0)/kafka-run-class.sh 
> org.apache.kafka.connect.cli.ConnectDistributed "$@"
> {code}





[GitHub] kafka pull request #1717: KAFKA-3742: Can't run bin/connect-*.sh with -daemo...

2016-08-10 Thread 1ambda
GitHub user 1ambda opened a pull request:

https://github.com/apache/kafka/pull/1717

KAFKA-3742: Can't run bin/connect-*.sh with -daemon flag

## Problem

Current connect scripts (`connect-distributed.sh`, `connect-standalone.sh`) 
do not support the `-daemon` flag even if users specify it,
since `kafka-run-class.sh` requires that the `-daemon` flag precede 
other arguments (e.g. the class name).

## Solution

Do the same thing as `kafka-server-start.sh`:

- Parse the command line
- Add `-daemon` to `$EXTRA_ARGS` if it is present


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/1ambda/kafka 
KAFKA-3742-connect-running-as-daemon

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1717.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1717


commit cecc8578fdb178630f5918fd4de47c556e13b286
Author: 1ambda <1am...@gmail.com>
Date:   2016-08-10T10:52:15Z

Support '-daemon' in connect-*.sh






Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-10 Thread Andrey L. Neporada
Hi!

> On 09 Aug 2016, at 20:46, Jun Rao  wrote:
> 
> Hi, Andrey,
> 
> Thanks for the proposal. It looks good overall. Some minor comments.
> 
> 1. It seems that it's a bit weird that fetch.partition.max.bytes is a broker
> level configuration while fetch.limit.bytes is a client side configuration.
> Intuitively, it seems both should be set by the client? If we do that, one
> benefit is that we can validate that fetch.limit.bytes >=
> fetch.partition.max.bytes on the client side.
> 

Yes, such a cooperative configuration for the fetch request may look a bit weird,
but I don't see other options if we want to remove the per-partition limits from the 
fetch request.
In that case we still need some server-side configuration for the partition limits.


> 2. Naming wise. fetch.response.max.bytes and replica.fetch.response.max.bytes
> seem to be more consistent with our current convention than
> fetch.limit.bytes and replica.fetch.limit.bytes.

Agree, will rename.

> 
> 3. When you say "This way we can ensure that response size is less than (
> *limit_bytes* + *message.max.bytes*).", it should be "less than
> max(limit_bytes, message.max.bytes)", right?
> 

No, I mean that the actual response size can be bigger than limit_bytes, but less 
than limit_bytes + message.max.bytes.
This behaviour is a result of the algorithm proposed in the KIP (and in the PR).
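
To make the bound concrete, here is a minimal, self-contained sketch of the accumulation idea (an illustration only, with made-up sizes and names, not the KIP or PR code): data keeps being added while the running total is still below limit_bytes, and the last message added can itself be up to message.max.bytes, so the total stays below limit_bytes + message.max.bytes.

{code}
// Illustration of the size bound discussed above; all values are hypothetical.
object FetchSizeBoundSketch {
  def main(args: Array[String]): Unit = {
    val limitBytes = 1000
    val messageMaxBytes = 400
    // Sizes of successive messages the broker could add to the response;
    // each one is at most messageMaxBytes.
    val messageSizes = Seq(300, 300, 300, 350, 200)

    var responseSize = 0
    val it = messageSizes.iterator
    // Keep adding messages while the accumulated size is still under the limit.
    while (responseSize < limitBytes && it.hasNext)
      responseSize += it.next()

    // The last message added can contribute up to messageMaxBytes, hence the bound.
    assert(responseSize < limitBytes + messageMaxBytes)
    println(s"response size = $responseSize (limit = $limitBytes)")
  }
}
{code}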


> Finally, KIP-73 (replication quota) is proposing a similar change to fetch
> request protocol. We can probably just combine the two changes into one,
> instead of bumping the fetch request version twice.

Fine with that.

> 
> Thanks,
> 
> Jun
> 

Thanks,
Andrey.



[jira] [Resolved] (KAFKA-1612) Consumer offsets auto-commit before processing finishes

2016-08-10 Thread Manikumar Reddy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar Reddy resolved KAFKA-1612.

Resolution: Won't Fix

Closing this issue in favor of the new consumer API.

> Consumer offsets auto-commit before processing finishes
> ---
>
> Key: KAFKA-1612
> URL: https://issues.apache.org/jira/browse/KAFKA-1612
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: Gian Merlino
>Assignee: Neha Narkhede
>
> In a loop like this,
>   for (message <- kafkaStream) {
>  process(message)
>   }
> The consumer can commit offsets for the next message while "process" is 
> running. If the program crashes during "process", the next run will pick up 
> from the *next* message. The message in flight at the time of the crash will 
> never actually finish processing. Instead, I would have expected the high 
> level consumer to deliver messages at least once.
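
For reference, a minimal sketch of the at-least-once pattern the new consumer API supports, which is why this issue is being closed in its favor (assumptions: a local broker at localhost:9092 and hypothetical topic and group names): auto-commit is disabled and offsets are committed only after processing finishes.

{code}
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

// Sketch only: commit offsets after processing, so a crash during process()
// replays the in-flight message instead of skipping it.
object AtLeastOnceConsumerSketch {
  def process(value: String): Unit = println(value)

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // assumed broker address
    props.put("group.id", "example-group")            // hypothetical group
    props.put("enable.auto.commit", "false")          // no commit before processing
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Seq("example-topic").asJava)   // hypothetical topic
    try {
      while (true) {
        val records = consumer.poll(1000)
        records.asScala.foreach(record => process(record.value()))
        consumer.commitSync()  // commit only after the whole batch was processed
      }
    } finally consumer.close()
  }
}
{code}

If the process crashes inside process(), the uncommitted offsets are redelivered on restart, which is the at-least-once behaviour the reporter expected.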





[jira] [Commented] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo

2016-08-10 Thread Magnus Edenhill (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415000#comment-15415000
 ] 

Magnus Edenhill commented on KAFKA-1588:


I see now that this behaviour is in fact documented in the protocol spec, sorry 
about that. I rest my case.

> Offset response does not support two requests for the same topic/partition 
> combo
> 
>
> Key: KAFKA-1588
> URL: https://issues.apache.org/jira/browse/KAFKA-1588
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Todd Palino
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
>
> When performing an OffsetRequest, if you request the same topic and partition 
> combination in a single request more than once (for example, if you want to 
> get both the head and tail offsets for a partition in the same request), you 
> will get a response for both, but they will be the same offset.
> We identified that the problem is that when the offset response is assembled, 
> a map is used to store the offset info before it is converted to the response 
> format and sent to the client. Therefore, the second request for a 
> topic/partition combination will overwrite the offset from the first request.
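
As a purely illustrative, self-contained sketch of the map behaviour described above (the class and values below are made up, not broker code):

{code}
// Keying per-partition results by (topic, partition) silently drops duplicates.
object DuplicatePartitionSketch {
  final case class PartitionKey(topic: String, partition: Int)

  def main(args: Array[String]): Unit = {
    // Two requests for the same partition: earliest (-2) and latest (-1) offsets.
    val requests = Seq(
      PartitionKey("example-topic", 0) -> -2L,
      PartitionKey("example-topic", 0) -> -1L
    )

    val asList = requests        // keeps both entries, preserving request order
    val asMap  = requests.toMap  // the second entry overwrites the first

    println(s"entries in list: ${asList.size}, entries in map: ${asMap.size}") // 2 vs 1
  }
}
{code}

Keeping a list (or keying by request position) would preserve both entries, which is essentially what a fix needs to do.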





[jira] [Commented] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo

2016-08-10 Thread Magnus Edenhill (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15414995#comment-15414995
 ] 

Magnus Edenhill commented on KAFKA-1588:


Thanks [~ijuma].
But I don't see how the bug fix changes the protocol (the behaviour is 
undocumented and non-intuitive), or how it breaks any clients (how can a client rely 
on or make use of undefined ordering, silent ignoring of duplicate topics, etc.?).

For the greater good of the community and ecosystem, I feel it is important 
that the protocol specification is considered authoritative rather than the 
broker implementation, and that any discrepancies between the two are fixed in 
the corresponding implementation(s) rather than in the protocol spec, unless the 
protocol spec is clearly wrong (which would affect all protocol implementations, 
i.e. all clients).


> Offset response does not support two requests for the same topic/partition 
> combo
> 
>
> Key: KAFKA-1588
> URL: https://issues.apache.org/jira/browse/KAFKA-1588
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Todd Palino
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
>
> When performing an OffsetRequest, if you request the same topic and partition 
> combination in a single request more than once (for example, if you want to 
> get both the head and tail offsets for a partition in the same request), you 
> will get a response for both, but they will be the same offset.
> We identified that the problem is that when the offset response is assembled, 
> a map is used to store the offset info before it is converted to the response 
> format and sent to the client. Therefore, the second request for a 
> topic/partition combination will overwrite the offset from the first request.





[jira] [Commented] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo

2016-08-10 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15414955#comment-15414955
 ] 

Ismael Juma commented on KAFKA-1588:


[~edenhill], you're right that it technically doesn't require a protocol 
change, just a change in the broker and in some client implementations (the ones 
that also rely on a Map/Dict). Having said that, the bug fix would change the 
behaviour of the protocol, so it could have compatibility implications anyway.

> Offset response does not support two requests for the same topic/partition 
> combo
> 
>
> Key: KAFKA-1588
> URL: https://issues.apache.org/jira/browse/KAFKA-1588
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Todd Palino
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
>
> When performing an OffsetRequest, if you request the same topic and partition 
> combination in a single request more than once (for example, if you want to 
> get both the head and tail offsets for a partition in the same request), you 
> will get a response for both, but they will be the same offset.
> We identified that the problem is that when the offset response is assembled, 
> a map is used to store the offset info before it is converted to the response 
> format and sent to the client. Therefore, the second request for a 
> topic/partition combination will overwrite the offset from the first request.





[jira] [Commented] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo

2016-08-10 Thread Magnus Edenhill (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15414868#comment-15414868
 ] 

Magnus Edenhill commented on KAFKA-1588:


[~guozhang] Why would this fix require a protocol change? The OffsetRequest already 
contains an array of topics and partitions, and all the broker needs to do is honour 
the contents and order of that list and respond accordingly. That should be a broker 
implementation detail only and not affect the protocol definition, right?

> Offset response does not support two requests for the same topic/partition 
> combo
> 
>
> Key: KAFKA-1588
> URL: https://issues.apache.org/jira/browse/KAFKA-1588
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Todd Palino
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
>
> When performing an OffsetRequest, if you request the same topic and partition 
> combination in a single request more than once (for example, if you want to 
> get both the head and tail offsets for a partition in the same request), you 
> will get a response for both, but they will be the same offset.
> We identified that the problem is that when the offset response is assembled, 
> a map is used to store the offset info before it is converted to the response 
> format and sent to the client. Therefore, the second request for a 
> topic/partition combination will overwrite the offset from the first request.





Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

2016-08-10 Thread Andrew Coates
I'm still very interested in seeing this KIP progress ...
On Tue, 2 Aug 2016 at 20:09, Vahid S Hashemian 
wrote:

> I would like to revive this thread and ask for additional feedback on this
> KIP.
>
> There has already been some feedback, mostly in favor, plus some concern
> about the value gain considering the complexity and the semantics; i.e.
> how the eventually revoked assignments need to be processed in the
> onPartitionsAssigned() callback, and not in onPartitionsRevoked().
>
> If it helps, I could also send a note to users mailing list about this KIP
> and ask for their feedback.
> I could also put the KIP up for a vote if that is expected at this point.
>
> Thanks.
> --Vahid
>
>
>


Re: [VOTE] KIP-70: Revise Partition Assignment Semantics on New Consumer's Subscription Change

2016-08-10 Thread Manikumar Reddy
+1 (non-binding)

On Wed, Aug 10, 2016 at 8:30 AM, Ewen Cheslack-Postava 
wrote:

> +1 (binding), thanks for working on this Vahid.
>
> @Dana - See https://cwiki.apache.org/confluence/display/KAFKA/Bylaws re:
> binding/non-binding, although I now notice that we specify criteria (lazy
> majority) on the KIP overview
> https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Improvement+Proposals#KafkaImprovementProposals-Process
> but don't seem to specify whose votes are binding -- we've used active
> committers as binding votes for KIPs.
>
> -Ewen
>
> On Tue, Aug 9, 2016 at 11:25 AM, Guozhang Wang  wrote:
>
> > +1.
> >
> > On Tue, Aug 9, 2016 at 10:06 AM, Jun Rao  wrote:
> >
> > > Vahid,
> > >
> > > Thanks for the clear explanation in the KIP. +1
> > >
> > > Jun
> > >
> > > On Mon, Aug 8, 2016 at 11:53 AM, Vahid S Hashemian <
> > > vahidhashem...@us.ibm.com> wrote:
> > >
> > > > I would like to initiate the voting process for KIP-70 (
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 70%3A+Revise+Partition+Assignment+Semantics+on+New+
> > > > Consumer%27s+Subscription+Change
> > > > ).
> > > >
> > > > The only issue that was discussed in the discussion thread is
> > > > compatibility, but because it applies to an edge case, it is not
> > expected
> > > > to impact existing users.
> > > > The proposal was shared with Spark and Storm users and no issue was
> > > raised
> > > > by those communities.
> > > >
> > > > Thanks.
> > > >
> > > > Regards,
> > > > --Vahid
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> Thanks,
> Ewen
>


Re: Consumer Offset Migration Tool

2016-08-10 Thread Joel Koshy
There is an old JIRA on getting the old import/export ZK offsets tool to
work for Kafka-based offsets. Ideally, that should be done in the
kafka-consumer-groups tool. We could also have the kafka-consumer-groups tool
support import/export for ZK-based offsets, which would avoid the need for a
separate migration tool: export from ZK, then import into Kafka.
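
As a rough, hypothetical sketch of the import half (the broker address, group, topic, partition, and offset value below are placeholders standing in for whatever was exported from ZooKeeper, not the proposed tool itself), committing an exported offset through the new consumer would look roughly like this:

{code}
import java.util.Properties
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Sketch: write an offset (previously exported from ZooKeeper) into Kafka's
// __consumer_offsets for the given group, while the group has no running members.
object OffsetImportSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // assumed broker address
    props.put("group.id", "example-group")            // the group being migrated
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // Placeholder for the value read from ZK,
      // e.g. /consumers/example-group/offsets/example-topic/0
      val exportedOffset = 42L
      val offsets = Map(new TopicPartition("example-topic", 0) -> new OffsetAndMetadata(exportedOffset))
      consumer.commitSync(offsets.asJava)  // stores the offset on the Kafka side
    } finally consumer.close()
  }
}
{code}

The export half would read those values out of ZooKeeper with any ZK client before the old consumers are replaced.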

Thanks,

Joel

On Tue, Aug 9, 2016 at 6:50 PM, Grant Henke  wrote:

> Hi Jun,
>
> Exactly what Gwen said. I am assuming you shut down the old consumers, migrate the
> offsets, and then start the new consumers. This is best for cases where you are
> migrating from the old clients to the new clients without ever using dual commit
> in the old client. Because the new clients can't coordinate with the old ones, an
> outage is required regardless.
>
> Thanks,
> Grant
>
> On Tue, Aug 9, 2016 at 8:19 PM, Gwen Shapira  wrote:
>
> > Jun,
> >
> > Grant's use-case is about migrating from old-consumer-committing-to-ZK
> > to new-consumer-committing-to-Kafka (which is what happens if you
> > upgrade Flume, and maybe other stream processing systems too). This
> > seems to require shutting down all instances in any case.
> >
> > Gwen
> >
> > On Tue, Aug 9, 2016 at 6:05 PM, Jun Rao  wrote:
> > > Hi, Grant,
> > >
> > > For your tool to work, do you expect all consumer instances in the
> > consumer
> > > group to be stopped before copying the offsets? Some applications may
> not
> > > want to do that.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Tue, Aug 9, 2016 at 10:01 AM, Grant Henke 
> > wrote:
> > >
> > >> I had to write a simple offset migration tool and I wanted to get
> > feedback
> > >> on whether or not this would be a useful addition to Apache Kafka.
> > >>
> > >> Currently the path to upgrade from the zookeeper offsets to the Kafka
> > >> offset (and often the Scala to Java client) is via dual commit. The
> > process
> > >> is documented here:
> > >> http://kafka.apache.org/documentation.html#offsetmigration
> > >>
> > >> The reason that process wasn't sufficient in my case is that:
> > >>
> > >>- It needs to be done ahead of the upgrade
> > >>- It requires the old client to commit at least once in dual commit
> > mode
> > >>- Some frameworks don't expose the dual commit functionality well
> > >>- Dual commit is not supported in 0.8.1.x
> > >>
> > >> The tool I wrote takes the relevant connection information and a
> > consumer
> > >> group and simply copies the Zookeeper offsets into the Kafka offsets
> for
> > >> that group.
> > >> A rough WIP PR can be seen here: https://github.com/apache/
> > kafka/pull/1715
> > >>
> > >> Even though many users have already made the transition, I think this
> > could
> > >> still be useful in Kafka. Here are a few reasons:
> > >>
> > >>- It simplifies the migration for users who have yet to migrate,
> > >>especially as the old clients get deprecated and removed
> > >>- Though the tool is not available in the Kafka 0.8.x or 0.9.x
> > series,
> > >>downloading and using the jar from maven would be fairly
> > straightforward
> > >>   - Alternatively this could be a separate repo or jar, though I
> > hardly
> > >>   want to push this single tool to maven as a standalone artifact.
> > >>
> > >> Do you think this is useful in Apache Kafka? Any thoughts on the
> > approach?
> > >>
> > >> Thanks,
> > >> Grant
> > >> --
> > >> Grant Henke
> > >> Software Engineer | Cloudera
> > >> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> > >>
> >
> >
> >
> > --
> > Gwen Shapira
> > Product Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
> >
>
>
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>


This is not a spam - ISR issues

2016-08-10 Thread Pascu, Ciprian (Nokia - FI/Espoo)
Hi,


Any comments on this?


Ciprian.



From: Pascu, Ciprian (Nokia - FI/Espoo)
Sent: 9 August 2016 15:34:10
To: dev@kafka.apache.org
Subject: ISR issues


Hi,

We are testing a Kafka cluster with high traffic loads (2+ messages/second), 
and we quite frequently encounter issues with brokers dropping persistently 
from the ISR for some partitions. Looking at the code, I noticed that in the 
processFetchRequest method of AbstractFetcherThread a KafkaException is thrown 
whenever processPartitionData raises any exception other than 
CorruptRecordException. In my understanding, this causes the fetcher thread to 
end, so the replica update it was doing stops and the broker is removed from 
some ISR lists. Couldn't we, in this case as well, just log an error message and 
update 'partitionsWithError' (like it's done in the 'case OFFSET_OUT_OF_RANGE' 
and the 'case _' branches)?
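
For what it's worth, here is a self-contained sketch of the suggested behaviour (hypothetical names and a simulated failure, not the actual AbstractFetcherThread code): the failing partition is recorded in partitionsWithError and the loop keeps serving the remaining partitions instead of dying.

{code}
import scala.collection.mutable

// Simulated version of the proposed handling: mark the failing partition
// instead of rethrowing and killing the whole fetcher thread.
object FetcherErrorHandlingSketch {
  final case class TopicAndPartition(topic: String, partition: Int)

  def main(args: Array[String]): Unit = {
    val partitionsWithError = mutable.Set.empty[TopicAndPartition]
    val fetched = Seq(TopicAndPartition("logs", 0), TopicAndPartition("logs", 1))

    // Stand-in for processPartitionData; partition 1 fails with an unexpected error.
    def processPartitionData(tp: TopicAndPartition): Unit =
      if (tp.partition == 1) throw new IllegalStateException("unexpected failure")

    fetched.foreach { tp =>
      try processPartitionData(tp)
      catch {
        case e: Throwable =>
          // Today a KafkaException is rethrown here, ending the thread; the
          // suggestion is to log and remember the partition instead.
          println(s"Error processing data for $tp: ${e.getMessage}")
          partitionsWithError += tp
      }
    }

    println(s"partitionsWithError = $partitionsWithError") // only ("logs", 1)
  }
}
{code}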


Ciprian.