Re: Agenda item for next KIP meeting

2016-09-12 Thread Becket Qin
Hey Ismael,

I probably replied too late, but is it possible to add KIP-79 to the agenda
as well?

Jiangjie (Becket) Qin

On Fri, Sep 9, 2016 at 9:57 AM, radai  wrote:

> Hi,
>
> I'd like to discuss KIP-72. could you please add me?
>
> On Fri, Sep 9, 2016 at 7:48 AM, Ismael Juma  wrote:
>
> > Hi Vahid,
> >
> > Sounds good. Jun is unavailable on the day, so I'll send the invite. Any
> > other KIPs that people would like to discuss on the day (Tuesday at 11am
> > PT)? The following KIPs are under discussion and have either been
> submitted
> > or updated recently:
> >
> > KIP-68: Add a consumed log retention before log retention
> > KIP-72: Allow Sizing Incoming Request Queue in Bytes
> > KIP-79: ListOffsetRequest/ListOffsetResponse v1 and add timestamp search
> > methods to the new consumer
> >
> > If the authors of any KIPs are available and interested in discussing them
> on
> > the next KIP call, please let me know so that I can add them to the
> agenda.
> >
> > Ismael
> >
> > On Wed, Sep 7, 2016 at 12:14 AM, Vahid S Hashemian <
> > vahidhashem...@us.ibm.com> wrote:
> >
> > > Hi,
> > >
> > > I'd like to add KIP-54 to the agenda for next KIP meeting.
> > > This KIP has been in the discussion phase for a long time, and it would be
> > > nice to have an online discussion about it, collect additional
> feedback,
> > > and move forward, if possible.
> > >
> > > Thanks.
> > >
> > > Regards,
> > > --Vahid
> > >
> > >
> >
>


Re: [DISCUSS] Kafka 0.10.1.0 Release Plan

2016-09-12 Thread Jason Gustafson
Hi All,

I've added the release plan to the wiki:
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1. I've
begun moving to 0.10.2 some JIRAs that didn't seem to be actively
worked on, but there's still a lot of pruning to do. If there are no
objections, I will begin a vote on this release plan.

Thanks,
Jason

On Sun, Sep 11, 2016 at 10:05 AM, Jason Gustafson 
wrote:

> Hey Rajini,
>
> We took a long look at KIP-55 and decided that the time needed to review,
> stabilize, and add system testing might not be sufficient. Usually a
> somewhat large patch like that takes a couple weeks of iteration before
> landing in trunk. For a new security feature, it might be even longer. We
> could delay the release for a week, but it's hard to know if that's enough
> time and that might just put some other feature on edge (sort of by
> induction, we risk never cutting the release). That said, if one of the
> committers thinks it has a chance to get in and has the time to push it
> through review, then I'm happy to add it.
>
> Thanks,
> Jason
>
> On Sat, Sep 10, 2016 at 1:06 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
>> Would it be possible to include KIP-55: Secure Quotas
>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-55%3A+Secure+Quotas+for+Authenticated+Users>
>> as
>> well? The KIP was approved a while ago and the PR was submitted several
>> weeks ago. I was hoping it would get reviewed in time for the next
>> release.
>> Jun had said he would take a look.
>>
>>
>> Thank you,
>>
>> Rajini
>>
>> On Sat, Sep 10, 2016 at 8:26 AM, Ismael Juma  wrote:
>>
>> > Jason, thanks for putting this together and driving the release. Your
>> > proposal sounds good to me. It would be nice to create a wiki page with
>> the
>> > information in this email. See the following for the one that Gwen put
>> > together for 0.10.0:
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.0
>> >
>> > Also, you merged KIP-70 recently so that can be moved to the completed
>> > section.
>> >
>> > Ismael
>> >
>> > On Fri, Sep 9, 2016 at 11:45 PM, Jason Gustafson 
>> > wrote:
>> >
>> > > Hi All,
>> > >
>> > > I've volunteered to be release manager for the upcoming 0.10.1 release
>> > and
>> > > would like to propose the following timeline:
>> > >
>> > > Feature Freeze (Sep. 19): The 0.10.1 release branch will be created.
>> > > Code Freeze (Oct. 3): The first RC will go out.
>> > > Final Release (~Oct. 17): Assuming no blocking issues remain, the
>> final
>> > > release will be cut.
>> > >
>> > > The purpose of the time between the feature freeze and code freeze is
>> to
>> > > stabilize the set of release features. We will continue to accept bug
>> > > fixes and new system tests during this time, but no new features will
>> > > be merged
>> > > into the release branch (they will continue to be accepted in trunk,
>> > > however). After the code freeze, only blocking bug fixes will be
>> > accepted.
>> > > Features which cannot be completed in time will have to await the next
>> > > release cycle.
>> > >
>> > > This is the first iteration of the time-based release plan:
>> > > https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan.
>> > > Note
>> > > that the final release is scheduled for October 17, so we have a
>> little
>> > > more than a month to prepare.
>> > >
>> > > Features which have already been merged to trunk and will be included
>> in
>> > > this release include the following:
>> > >
>> > > KIP-4 (partial): Add request APIs to create and delete topics
>> > > KIP-33: Add time-based index
>> > > KIP-60: Make Java client classloading more flexible
>> > > KIP-62: Allow consumer to send heartbeats from a background thread
>> > > KIP-65: Expose timestamps to Connect
>> > > KIP-67: Queryable state for Kafka Streams
>> > > KIP-71: Enable log compaction and deletion to co-exist
>> > > KIP-75: Add per-connector Converters
>> > >
>> > > Since this is the first time-based release, we propose to also include
>> > the
>> > > following KIPs which already have a patch available and have undergone
>> > some
>> > > review:
>> > >
>> > > KIP-58: Make log compaction point configurable
>> > > KIP-63: Unify store and downstream caching in streams
>> > > KIP-70: Revise consumer partition assignment semantics
>> > > KIP-73: Replication quotas
>> > > KIP-74: Add fetch response size limit in bytes
>> > > KIP-78: Add clusterId
>> > >
>> > > One of the goals of time-based releases is to avoid the rush to get
>> > > unstable features in before the release deadline. If a feature is not
>> > ready
>> > > now, the next release window is never far away. This helps to ensure
>> the
>> > > overall quality of the release. We've drawn the line for this release
>> > based
>> > > on feature progress and code review. For features which can't get in
>> this
>> > > time, don't worry since January will be here before you know it.

[jira] [Updated] (KAFKA-3590) KafkaConsumer fails with "Messages are rejected since there are fewer in-sync replicas than required." when polling

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3590:
---
Fix Version/s: 0.10.1.0

> KafkaConsumer fails with "Messages are rejected since there are fewer in-sync 
> replicas than required." when polling
> ---
>
> Key: KAFKA-3590
> URL: https://issues.apache.org/jira/browse/KAFKA-3590
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
> Environment: JDK1.8 Ubuntu 14.04
>Reporter: Sergey Alaev
>Assignee: Jason Gustafson
> Fix For: 0.10.1.0
>
>
> KafkaConsumer.poll() fails with "Messages are rejected since there are fewer 
> in-sync replicas than required." Isn't this message about the minimum number 
> of ISRs when *sending* messages?
> Stacktrace:
> org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: 
> Messages are rejected since there are fewer in-sync replicas than required.
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupRequestHandler.handle(AbstractCoordinator.java:444)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupRequestHandler.handle(AbstractCoordinator.java:411)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274) 
> ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890)
>  ~[kafka-clients-0.9.0.1.jar:na]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) 
> ~[kafka-clients-0.9.0.1.jar:na]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3073) KafkaConnect should support regular expression for topics

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3073:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> KafkaConnect should support regular expression for topics
> -
>
> Key: KAFKA-3073
> URL: https://issues.apache.org/jira/browse/KAFKA-3073
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Gwen Shapira
>Assignee: Liquan Pei
>  Labels: needs-kip
> Fix For: 0.10.2.0
>
>
> KafkaConsumer supports both a list of topics and a pattern when subscribing. 
> KafkaConnect only supports a list of topics, which is not just more of a 
> hassle to configure - it also requires more maintenance.
> I suggest adding topics.pattern as a new configuration and letting users 
> choose between 'topics' or 'topics.pattern'.
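> For comparison, a minimal sketch of the consumer-side pattern subscription 
> that Connect could mirror (0.9/0.10 consumer API; topic regex and group id 
> are illustrative):
> {code}
> import java.util.Collection;
> import java.util.Properties;
> import java.util.regex.Pattern;
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> 
> public class PatternSubscribeExample {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "pattern-example");
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
>         // A pattern subscription also picks up matching topics created later,
>         // which is exactly the maintenance win Connect is missing.
>         consumer.subscribe(Pattern.compile("metrics\\..*"), new ConsumerRebalanceListener() {
>             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
>             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
>         });
>     }
> }
> {code}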



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2311) Consumer's ensureNotClosed method not thread safe

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15486116#comment-15486116
 ] 

ASF GitHub Bot commented on KAFKA-2311:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1637


> Consumer's ensureNotClosed method not thread safe
> -
>
> Key: KAFKA-2311
> URL: https://issues.apache.org/jira/browse/KAFKA-2311
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tim Brooks
>Assignee: Tim Brooks
> Fix For: 0.10.1.0
>
> Attachments: KAFKA-2311.patch, KAFKA-2311.patch
>
>
> When a call to the consumer is made, the first check is that the consumer is 
> not closed. This variable is not volatile, so there is no guarantee that 
> previous stores will be visible before a read.
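> The fix is essentially a visibility guarantee on the closed flag; a minimal 
> illustration of the pattern (not the actual consumer code):
> {code}
> public class ClosableClient {
>     // volatile gives a happens-before edge: a thread calling ensureNotClosed()
>     // is guaranteed to observe a close() performed by another thread.
>     private volatile boolean closed = false;
> 
>     public void close() {
>         closed = true;
>     }
> 
>     private void ensureNotClosed() {
>         if (closed)
>             throw new IllegalStateException("This consumer has already been closed.");
>     }
> }
> {code}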



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2311) Consumer's ensureNotClosed method not thread safe

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-2311:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

> Consumer's ensureNotClosed method not thread safe
> -
>
> Key: KAFKA-2311
> URL: https://issues.apache.org/jira/browse/KAFKA-2311
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tim Brooks
>Assignee: Tim Brooks
> Fix For: 0.10.1.0
>
> Attachments: KAFKA-2311.patch, KAFKA-2311.patch
>
>
> When a call to the consumer is made, the first check is that the consumer is 
> not closed. This variable is not volatile, so there is no guarantee that 
> previous stores will be visible before a read.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1637: KAFKA-2311: Consumer's ensureNotClosed method not ...

2016-09-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1637


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4154) Kafka Connect fails to shutdown if it has not completed startup

2016-09-12 Thread Shikhar Bhushan (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shikhar Bhushan updated KAFKA-4154:
---
Fix Version/s: (was: 0.10.0.2)

> Kafka Connect fails to shutdown if it has not completed startup
> ---
>
> Key: KAFKA-4154
> URL: https://issues.apache.org/jira/browse/KAFKA-4154
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Shikhar Bhushan
>Assignee: Shikhar Bhushan
>
> To reproduce:
> 1. Start Kafka Connect in distributed mode without Kafka running 
> {{./bin/connect-distributed.sh config/connect-distributed.properties}}
> 2. Ctrl+C fails to terminate the process
> thread dump:
> {noformat}
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode):
> "Thread-1" #13 prio=5 os_prio=31 tid=0x7fc29a18a800 nid=0x7007 waiting on 
> condition [0x73129000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x0007bd7d91d8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at 
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.stop(DistributedHerder.java:357)
>   at 
> org.apache.kafka.connect.runtime.Connect.stop(Connect.java:71)
>   at 
> org.apache.kafka.connect.runtime.Connect$ShutdownHook.run(Connect.java:93)
> "SIGINT handler" #27 daemon prio=9 os_prio=31 tid=0x7fc29aa6a000 
> nid=0x560f in Object.wait() [0x71a61000]
>java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   - waiting on <0x0007bd63db38> (a 
> org.apache.kafka.connect.runtime.Connect$ShutdownHook)
>   at java.lang.Thread.join(Thread.java:1245)
>   - locked <0x0007bd63db38> (a 
> org.apache.kafka.connect.runtime.Connect$ShutdownHook)
>   at java.lang.Thread.join(Thread.java:1319)
>   at 
> java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
>   at 
> java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
>   at java.lang.Shutdown.runHooks(Shutdown.java:123)
>   at java.lang.Shutdown.sequence(Shutdown.java:167)
>   at java.lang.Shutdown.exit(Shutdown.java:212)
>   - locked <0x0007b0244600> (a java.lang.Class for 
> java.lang.Shutdown)
>   at java.lang.Terminator$1.handle(Terminator.java:52)
>   at sun.misc.Signal$1.run(Signal.java:212)
>   at java.lang.Thread.run(Thread.java:745)
> "kafka-producer-network-thread | producer-1" #15 daemon prio=5 os_prio=31 
> tid=0x7fc29a0b7000 nid=0x7a03 runnable [0x72608000]
>java.lang.Thread.State: RUNNABLE
>   at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
>   at 
> sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
>   at 
> sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
>   at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>   - locked <0x0007bd7788d8> (a sun.nio.ch.Util$2)
>   - locked <0x0007bd7788e8> (a 
> java.util.Collections$UnmodifiableSet)
>   - locked <0x0007bd77> (a sun.nio.ch.KQueueSelectorImpl)
>   at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>   at 
> org.apache.kafka.common.network.Selector.select(Selector.java:470)
>   at 
> org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>   at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
>   at java.lang.Thread.run(Thread.java:745)
> "DistributedHerder" #14 prio=5 os_prio=31 tid=0x7fc29a11e000 nid=0x7803 
> waiting on condition [0x72505000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   

[jira] [Updated] (KAFKA-740) Improve crash-safety of log segment swap

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-740:
--
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Improve crash-safety of log segment swap
> 
>
> Key: KAFKA-740
> URL: https://issues.apache.org/jira/browse/KAFKA-740
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Fix For: 0.10.2.0
>
>
> Currently Log.replaceSegments has a bug that can cause a swap containing 
> multiple segments to partially complete. This would lead to duplicate data in 
> the log.
> The proposed fix is to use a name like offset1_and_offset2.swap for a segment 
> meant to replace segments with base offsets offset1 and offset2.
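> A sketch of the proposed naming (hypothetical helper, not the actual Log 
> code); encoding every replaced base offset lets crash recovery tell exactly 
> which segments a .swap file was meant to replace:
> {code}
> public class SwapFileNames {
>     // e.g. baseOffsets [0, 52890] -> "0_and_52890.swap"
>     static String swapName(long[] baseOffsets) {
>         StringBuilder sb = new StringBuilder();
>         for (int i = 0; i < baseOffsets.length; i++) {
>             if (i > 0)
>                 sb.append("_and_");
>             sb.append(baseOffsets[i]);
>         }
>         return sb.append(".swap").toString();
>     }
> }
> {code}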



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3543:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>  Labels: api
> Fix For: 0.10.2.0
>
>
> Right now it seems that if you want to apply an arbitrary stateful 
> transformation to a stream, you either have to use a TransformerSupplier or 
> ProcessorSupplier passed to transform() or process(). The custom processor will 
> allow you to emit multiple new values, but the process() method currently 
> terminates that branch of the topology so you can't apply additional data 
> flow. transform() lets you continue the data flow, but forces you to emit a 
> single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.
> -
> It is worth considering adding a new flatTransform function as 
> {code}
> <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, Iterable<KeyValue<K1, V1>>> transformerSupplier, String... stateStoreNames)
> {code}
> which is essentially the same as
> {code} transform().flatMap() {code}
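> For reference, the forward() workaround described above looks roughly like 
> this (a sketch against the 0.10 Transformer API; the dummy return value is 
> what has to be filtered out downstream):
> {code}
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.kstream.Transformer;
> import org.apache.kafka.streams.processor.ProcessorContext;
> 
> public class MultiEmitTransformer implements Transformer<String, String, KeyValue<String, String>> {
>     private ProcessorContext context;
> 
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>     }
> 
>     @Override
>     public KeyValue<String, String> transform(String key, String value) {
>         // Emit any number of records through the captured context...
>         for (String part : value.split(","))
>             context.forward(key, part);
>         // ...then return a dummy record for a downstream filter() to drop.
>         return KeyValue.pair(key, null);
>     }
> 
>     @Override
>     public KeyValue<String, String> punctuate(long timestamp) {
>         return null; // no periodic output
>     }
> 
>     @Override
>     public void close() {}
> }
> {code}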



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4154) Kafka Connect fails to shutdown if it has not completed startup

2016-09-12 Thread Shikhar Bhushan (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shikhar Bhushan updated KAFKA-4154:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.0.2

> Kafka Connect fails to shutdown if it has not completed startup
> ---
>
> Key: KAFKA-4154
> URL: https://issues.apache.org/jira/browse/KAFKA-4154
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Shikhar Bhushan
>Assignee: Shikhar Bhushan
> Fix For: 0.10.0.2
>
>
> To reproduce:
> 1. Start Kafka Connect in distributed mode without Kafka running 
> {{./bin/connect-distributed.sh config/connect-distributed.properties}}
> 2. Ctrl+C fails to terminate the process
> thread dump:
> {noformat}
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode):
> "Thread-1" #13 prio=5 os_prio=31 tid=0x7fc29a18a800 nid=0x7007 waiting on 
> condition [0x73129000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x0007bd7d91d8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at 
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.stop(DistributedHerder.java:357)
>   at 
> org.apache.kafka.connect.runtime.Connect.stop(Connect.java:71)
>   at 
> org.apache.kafka.connect.runtime.Connect$ShutdownHook.run(Connect.java:93)
> "SIGINT handler" #27 daemon prio=9 os_prio=31 tid=0x7fc29aa6a000 
> nid=0x560f in Object.wait() [0x71a61000]
>java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   - waiting on <0x0007bd63db38> (a 
> org.apache.kafka.connect.runtime.Connect$ShutdownHook)
>   at java.lang.Thread.join(Thread.java:1245)
>   - locked <0x0007bd63db38> (a 
> org.apache.kafka.connect.runtime.Connect$ShutdownHook)
>   at java.lang.Thread.join(Thread.java:1319)
>   at 
> java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
>   at 
> java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
>   at java.lang.Shutdown.runHooks(Shutdown.java:123)
>   at java.lang.Shutdown.sequence(Shutdown.java:167)
>   at java.lang.Shutdown.exit(Shutdown.java:212)
>   - locked <0x0007b0244600> (a java.lang.Class for 
> java.lang.Shutdown)
>   at java.lang.Terminator$1.handle(Terminator.java:52)
>   at sun.misc.Signal$1.run(Signal.java:212)
>   at java.lang.Thread.run(Thread.java:745)
> "kafka-producer-network-thread | producer-1" #15 daemon prio=5 os_prio=31 
> tid=0x7fc29a0b7000 nid=0x7a03 runnable [0x72608000]
>java.lang.Thread.State: RUNNABLE
>   at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
>   at 
> sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
>   at 
> sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
>   at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>   - locked <0x0007bd7788d8> (a sun.nio.ch.Util$2)
>   - locked <0x0007bd7788e8> (a 
> java.util.Collections$UnmodifiableSet)
>   - locked <0x0007bd77> (a sun.nio.ch.KQueueSelectorImpl)
>   at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>   at 
> org.apache.kafka.common.network.Selector.select(Selector.java:470)
>   at 
> org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>   at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
>   at java.lang.Thread.run(Thread.java:745)
> "DistributedHerder" #14 prio=5 os_prio=31 tid=0x7fc29a11e000 nid=0x7803 
> waiting on condition [0x72505000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   

[jira] [Created] (KAFKA-4155) Remove WorkerGroupMember from Connect

2016-09-12 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-4155:
--

 Summary: Remove WorkerGroupMember from Connect
 Key: KAFKA-4155
 URL: https://issues.apache.org/jira/browse/KAFKA-4155
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Jason Gustafson
Assignee: Ewen Cheslack-Postava


After a few refactors, it doesn't seem like {{WorkerGroupMember}} is bringing 
much to the table anymore since it does little more than delegate to 
{{WorkerCoordinator}}. Maybe we ought to just remove it?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3478) Finer Stream Flow Control

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3478:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Finer Stream Flow Control
> -
>
> Key: KAFKA-3478
> URL: https://issues.apache.org/jira/browse/KAFKA-3478
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: user-experience
> Fix For: 0.10.2.0
>
>
> Today we have an event-time-based flow control mechanism in order to 
> synchronize multiple input streams in a best-effort manner:
> http://docs.confluent.io/3.0.0/streams/architecture.html#flow-control-with-timestamps
> However, there are some use cases where users would like finer control of the 
> input streams; for example, with two input streams, one of them always reading 
> from offset 0 upon (re)starting, and the other reading from the log end offset.
> Today we only have one consumer config "auto.offset.reset" to control that 
> behavior, which means all streams are read either from "earliest" or "latest".
> We should consider how to improve these settings to allow users finer control 
> over their streams.
> =
> A finer flow control could also be used to allow for populating a {{KTable}} 
> (with an "initial" state) before starting the actual processing (this feature 
> was asked for on the mailing list multiple times already). Even if it is quite 
> hard to define, *when* the initial populating phase should end, this might 
> still be useful. There would be the following possibilities:
>  1) an initial fixed time period for populating
>(it might be hard for a user to estimate the correct value)
>  2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
>  3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
>  4) a throughput threshold, ie, if the populating frequency falls below
> the threshold, the KTable is considered "finished"
>  5) maybe something else ??
> The API might look something like this
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until we see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2000) Delete consumer offsets from kafka once the topic is deleted

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-2000:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Delete consumer offsets from kafka once the topic is deleted
> 
>
> Key: KAFKA-2000
> URL: https://issues.apache.org/jira/browse/KAFKA-2000
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sriharsha Chintalapani
>Assignee: Sriharsha Chintalapani
>  Labels: newbie++
> Fix For: 0.10.2.0
>
> Attachments: KAFKA-2000.patch, KAFKA-2000_2015-05-03_10:39:11.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2045) Memory Management on the consumer

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-2045:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Memory Management on the consumer
> -
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Guozhang Wang
> Fix For: 0.10.2.0
>
>
> We need to add memory management to the new consumer like we did in the 
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.
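> A minimal sketch of the kind of buffer re-use meant here (hypothetical pool, 
> not actual client code):
> {code}
> import java.nio.ByteBuffer;
> import java.util.ArrayDeque;
> import java.util.Deque;
> 
> // Caps consumer memory: fetch handling borrows buffers instead of
> // allocating a fresh one per response / decompression pass.
> public class FetchBufferPool {
>     private final Deque<ByteBuffer> free = new ArrayDeque<>();
>     private final int bufferSize;
> 
>     public FetchBufferPool(int poolSize, int bufferSize) {
>         this.bufferSize = bufferSize;
>         for (int i = 0; i < poolSize; i++)
>             free.add(ByteBuffer.allocate(bufferSize));
>     }
> 
>     public synchronized ByteBuffer acquire() {
>         ByteBuffer buffer = free.poll();
>         // A real pool would block here to enforce the memory bound.
>         return buffer != null ? buffer : ByteBuffer.allocate(bufferSize);
>     }
> 
>     public synchronized void release(ByteBuffer buffer) {
>         buffer.clear();
>         free.add(buffer);
>     }
> }
> {code}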



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1704) Add PartitionConfig besides LogConfig

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-1704:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Add PartitionConfig besides LogConfig
> -
>
> Key: KAFKA-1704
> URL: https://issues.apache.org/jira/browse/KAFKA-1704
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.10.2.0
>
>
> Today we have only two places to store configs: server configs, which are used 
> to store server-side global configs, and log configs to store the rest. However, 
> many topic / partition level configs would be better stored in a partition 
> config so that reading them does not require access to the underlying logs, 
> for example:
> 1. uncleanLeaderElectionEnable
> 2. minInSyncReplicas
> 3. compact [? this is defined per-topic / partition but maybe ok to store as 
> log configs]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2307) Drop ConsumerOffsetChecker completely

2016-09-12 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485991#comment-15485991
 ] 

Jason Gustafson commented on KAFKA-2307:


We've had the deprecation warning on this tool since 0.9.0. Perhaps it makes 
sense to remove this now? What do you think [~ijuma]?

> Drop ConsumerOffsetChecker completely
> -
>
> Key: KAFKA-2307
> URL: https://issues.apache.org/jira/browse/KAFKA-2307
> Project: Kafka
>  Issue Type: Sub-task
>  Components: tools
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
> Fix For: 0.10.1.0
>
>
> ConsumerOffsetChecker has been replaced by ConsumerGroupCommand and is 
> deprecated in 0.9.0. Should be dropped in 0.9.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3677) Add a producer performance tuning tool

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3677:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Add a producer performance tuning tool
> --
>
> Key: KAFKA-3677
> URL: https://issues.apache.org/jira/browse/KAFKA-3677
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.2.0
>
>
> In general, the Kafka producer needs to be tuned to the user's traffic pattern 
> in order to get optimal performance. It would be useful to provide a tool that 
> helps users explore different settings based on their traffic pattern (message 
> size, compression type and ratio). 
> This ticket will use ProducerPerformance with synthetic traffic of the data 
> pattern specified by the user to explore different producer configurations and 
> offer performance tuning suggestions to the users.
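> The knobs such a tool would sweep are the standard producer configs; for 
> example (a fragment; values purely illustrative):
> {code}
> import java.util.Properties;
> 
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("compression.type", "lz4");  // none, gzip, snappy, lz4
> props.put("batch.size", 65536);        // bytes per partition batch
> props.put("linger.ms", 10);            // how long to wait to fill a batch
> props.put("acks", "1");
> {code}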



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3575) Use console consumer access topic that does not exist, can not use "Control + C" to exit process

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3575:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Use console consumer access topic that does not exist, can not use "Control + 
> C" to exit process
> 
>
> Key: KAFKA-3575
> URL: https://issues.apache.org/jira/browse/KAFKA-3575
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
> Environment: SUSE Linux Enterprise Server 11 SP3
>Reporter: NieWang
>Priority: Minor
> Fix For: 0.10.2.0
>
>
> 1.  use "sh kafka-console-consumer.sh --zookeeper 10.252.23.133:2181 --topic 
> topic_02"  start console consumer. topic_02 does not exist.
> 2. you can not use "Control + C" to exit console consumer process. The 
> process is blocked.
> 3. use jstack check process stack, as follows:
> linux:~ # jstack 122967
> 2016-04-18 15:46:06
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode):
> "Attach Listener" #29 daemon prio=9 os_prio=0 tid=0x01781800 
> nid=0x1e0c8 waiting on condition [0x]
>java.lang.Thread.State: RUNNABLE
> "Thread-4" #27 prio=5 os_prio=0 tid=0x018a4000 nid=0x1e08a waiting on 
> condition [0x7ffbe5ac]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xe00ed3b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at kafka.tools.ConsoleConsumer$$anon$1.run(ConsoleConsumer.scala:101)
> "SIGINT handler" #28 daemon prio=9 os_prio=0 tid=0x019d5800 
> nid=0x1e089 in Object.wait() [0x7ffbe5bc1000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.$$YJP$$wait(Native Method)
> at java.lang.Object.wait(Object.java)
> at java.lang.Thread.join(Thread.java:1245)
> - locked <0xe71fd4e8> (a kafka.tools.ConsoleConsumer$$anon$1)
> at java.lang.Thread.join(Thread.java:1319)
> at 
> java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
> at 
> java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
> at java.lang.Shutdown.runHooks(Shutdown.java:123)
> at java.lang.Shutdown.sequence(Shutdown.java:167)
> at java.lang.Shutdown.exit(Shutdown.java:212)
> - locked <0xe00abfd8> (a java.lang.Class for 
> java.lang.Shutdown)
> at java.lang.Terminator$1.handle(Terminator.java:52)
> at sun.misc.Signal$1.run(Signal.java:212)
> at java.lang.Thread.run(Thread.java:745)
> "metrics-meter-tick-thread-2" #20 daemon prio=5 os_prio=0 
> tid=0x7ffbec77a800 nid=0x1e079 waiting on condition [0x7ffbe66c8000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xe6fa6438> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> "metrics-meter-tick-thread-1" #19 daemon prio=5 os_prio=0 
> tid=0x7ffbec783000 nid=0x1e078 waiting on condition [0x7ffbe67c9000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xe6fa6438> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>  

[jira] [Updated] (KAFKA-3995) Add a new configuration "enable.compression.ratio.estimation" to the producer config

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3995:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Add a new configuration "enable.compression.ratio.estimation" to the producer 
> config
> 
>
> Key: KAFKA-3995
> URL: https://issues.apache.org/jira/browse/KAFKA-3995
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Jiangjie Qin
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> We have recently seen a few cases where RecordTooLargeException is thrown because 
> the compressed message sent by KafkaProducer exceeded the max message size.
> The root cause is that the compressor estimates the batch size using an 
> estimated compression ratio based on heuristic compression ratio statistics. 
> This does not quite work for traffic with highly variable compression ratios. 
> For example, suppose the batch size is set to 1MB and the max message size is 
> 1MB. Initially the producer is sending messages (each message is 1MB) to topic_1 
> whose data can be compressed to 1/10 of the original size. After a while the 
> estimated compression ratio in the compressor will be trained to 1/10 and the 
> producer would put 10 messages into one batch. Now the producer starts to 
> send messages (each message is also 1MB) to topic_2 whose messages can only be 
> compressed to 1/5 of the original size. The producer would still use 1/10 as 
> the estimated compression ratio and put 10 messages into a batch. That batch 
> would be 2 MB after compression, which exceeds the maximum message size. In 
> this case the user does not have many options other than resending everything 
> or closing the producer if they care about ordering.
> This is especially an issue for services like MirrorMaker whose producer is 
> shared by many different topics.
> To solve this issue, we can probably add a configuration 
> "enable.compression.ratio.estimation" to the producer. So when this 
> configuration is set to false, we stop estimating the compressed size but 
> will close the batch once the uncompressed bytes in the batch reaches the 
> batch size.
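> If the proposal lands, usage would be a single producer setting (note: this 
> configuration is the subject of this ticket and does not exist yet):
> {code}
> import java.util.Properties;
> 
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> // Proposed, not yet implemented: close a batch once its *uncompressed*
> // bytes reach batch.size instead of estimating the compressed size.
> props.put("enable.compression.ratio.estimation", "false");
> {code}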



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-747) RequestChannel re-design

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-747:
--
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> RequestChannel re-design
> 
>
> Key: KAFKA-747
> URL: https://issues.apache.org/jira/browse/KAFKA-747
> Project: Kafka
>  Issue Type: New Feature
>  Components: network
>Reporter: Jay Kreps
>Assignee: Neha Narkhede
> Fix For: 0.10.2.0
>
>
> We have had some discussion around how to handle queuing requests. There are 
> two competing concerns:
> 1. We need to maintain request order on a per-socket basis.
> 2. We want to be able to balance load flexibly over a pool of threads so that 
> if one thread blocks on I/O request processing continues.
> Two Approaches We Have Considered
> 1. Have a global queue of unprocessed requests. All I/O threads read requests 
> off this global queue and process them. To avoid re-ordering have the network 
> layer only read one request at a time.
> 2. Have a queue per I/O thread and have the network threads statically map 
> sockets to I/O thread request queues.
> Problems With These Approaches
> In the first case you are not able to get any per-producer parallelism. That 
> is you can't read the next request while the current one is being handled. 
> This seems like it would not be a big deal, but preliminary benchmarks show 
> that it might be. 
> In the second case there are two problems. The first is that when an I/O 
> thread gets blocked all request processing for sockets attached to that I/O 
> thread will grind to a halt. If you have 10,000 connections, and 10 I/O 
> threads, then each blockage will stop 1,000 producers. If there is one topic 
> that has long synchronous flush times enabled (or is experiencing fsync 
> locking) this will cause big latency blips for all producers using that I/O 
> thread. The next problem is around backpressure and memory management. Say we 
> use BlockingQueues to feed the I/O threads. And say that one I/O thread 
> stalls. Its request queue will fill up and it will then block ALL network 
> threads, since they will block on inserting into that queue, even though the 
> other I/O threads are unused and have empty queues.
> A Proposed Better Solution
> The problem with the first solution is that we are not pipelining requests. 
> The problem with the second approach is that we are too constrained in moving 
> work from one I/O thread to another.
> Instead we should have a single request queue-like structure, but internally 
> enforce the condition that requests are not re-ordered.
> Here are the details. We retain RequestChannel but refactor its internals. 
> Internally we replace the blocking queue with a linked list. We also keep an 
> in-flight-keys array with one entry per I/O thread. When removing a work item 
> from the list we can't just take the first thing. Instead we need to walk the 
> list and look for something with a request key not in the in-flight-keys 
> array. When a response is sent, we remove that key from the in-flight array.
> This guarantees that requests for a socket with key K are ordered, but that 
> processing for K can only block requests made by K.
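> A sketch of the proposed structure (illustrative, not the actual 
> RequestChannel): a linked list scanned for the first request whose socket key 
> is not currently in flight:
> {code}
> import java.util.HashSet;
> import java.util.Iterator;
> import java.util.LinkedList;
> import java.util.Set;
> 
> public class OrderedRequestQueue<K, R> {
>     public static class KeyedRequest<K, R> {
>         final K key; final R request;
>         KeyedRequest(K key, R request) { this.key = key; this.request = request; }
>     }
> 
>     private final LinkedList<KeyedRequest<K, R>> pending = new LinkedList<>();
>     private final Set<K> inFlight = new HashSet<>();
> 
>     public synchronized void add(K key, R request) {
>         pending.add(new KeyedRequest<>(key, request));
>     }
> 
>     // Take the first request whose socket key has nothing in flight, so
>     // per-socket order is preserved while other keys proceed in parallel.
>     public synchronized KeyedRequest<K, R> poll() {
>         Iterator<KeyedRequest<K, R>> it = pending.iterator();
>         while (it.hasNext()) {
>             KeyedRequest<K, R> r = it.next();
>             if (!inFlight.contains(r.key)) {
>                 inFlight.add(r.key);
>                 it.remove();
>                 return r;
>             }
>         }
>         return null; // caller waits and retries
>     }
> 
>     // Called once the response for this key has been sent.
>     public synchronized void complete(K key) {
>         inFlight.remove(key);
>     }
> }
> {code}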



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1088) Ability to create a shadow consumer group

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-1088:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Ability to create a shadow consumer group
> -
>
> Key: KAFKA-1088
> URL: https://issues.apache.org/jira/browse/KAFKA-1088
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Andrew Olson
>Assignee: Neha Narkhede
> Fix For: 0.10.2.0
>
>
> I have a consumer group API request that hopefully can be included in the 
> client rewrite [1] project: the ability to create a new consumer group that 
> is initialized to the latest topic/partition offsets of another existing 
> group. Our use case is being able to inspect an active group's unprocessed 
> messages in a non-invasive manner from an admin or troubleshooting 
> perspective. This shadow group would be short-lived and given a randomly 
> generated  name. It's nice to see that we'll be able to designate it as 
> ephemeral as well so that it is automatically cleaned up.
> To obtain this message browsing functionality today with 0.8, we 
> programmatically copy the group ZK paths prior to activating the "spy" group 
> so that it only sees the unprocessed messages of the target. Obviously not 
> ideal, but it worked well as a quick hack since we understand how Kafka 
> writes and reads the offset data in ZK.
> [1] https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
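> With the new consumer and Kafka-based offsets, the copy step could be scripted 
> externally; a sketch (sourceConsumer uses the target group's group.id, 
> shadowConsumer a freshly generated one; the topic name is illustrative):
> {code}
> for (PartitionInfo p : sourceConsumer.partitionsFor("events")) {
>     TopicPartition tp = new TopicPartition(p.topic(), p.partition());
>     // Read the committed offset as the source group...
>     OffsetAndMetadata committed = sourceConsumer.committed(tp);
>     // ...and commit it under the shadow group before the spy starts polling.
>     if (committed != null)
>         shadowConsumer.commitSync(Collections.singletonMap(tp, committed));
> }
> {code}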



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3522) Consider adding version information into rocksDB storage format

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3522:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Consider adding version information into rocksDB storage format
> ---
>
> Key: KAFKA-3522
> URL: https://issues.apache.org/jira/browse/KAFKA-3522
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Ishita Mandhan
>  Labels: architecture
> Fix For: 0.10.2.0
>
>
> Kafka Streams does not introduce any modifications to the data format in the 
> underlying Kafka protocol, but it does use RocksDB for persistent state 
> storage, and currently its data format is fixed and hard-coded. We want to 
> consider the evolution path for the future when we change the data format, and 
> hence having some version info stored along with the storage file / directory 
> would be useful.
> And this information could even live outside the storage file; for example, we 
> can just use a small "version indicator" file in the rocksdb directory for 
> this purpose. Thoughts? [~enothereska] [~jkreps]
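> The "version indicator" file variant is simple enough to sketch (illustrative, 
> not actual Streams code):
> {code}
> import java.io.File;
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.nio.file.Files;
> 
> public class StoreFormatVersion {
>     static final String CURRENT = "1";
> 
>     // Write the version next to the RocksDB files on first open; on later
>     // opens, refuse (or trigger migration) if the recorded version differs.
>     static void check(File storeDir) throws IOException {
>         File versionFile = new File(storeDir, ".format.version");
>         if (!versionFile.exists()) {
>             Files.write(versionFile.toPath(), CURRENT.getBytes(StandardCharsets.UTF_8));
>             return;
>         }
>         String found = new String(Files.readAllBytes(versionFile.toPath()),
>                                   StandardCharsets.UTF_8).trim();
>         if (!found.equals(CURRENT))
>             throw new IOException("Store format " + found + ", expected " + CURRENT);
>     }
> }
> {code}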



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1694) KIP-4: Command line and centralized operations

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-1694:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0
  Description: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Command+Line+and+Related+Improvements
  (was: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Command+Line+and+Related+Improvements)

> KIP-4: Command line and centralized operations
> --
>
> Key: KAFKA-1694
> URL: https://issues.apache.org/jira/browse/KAFKA-1694
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Joe Stein
>Assignee: Grant Henke
>Priority: Critical
> Fix For: 0.10.2.0
>
> Attachments: KAFKA-1694.patch, KAFKA-1694_2014-12-24_21:21:51.patch, 
> KAFKA-1694_2015-01-12_15:28:41.patch, KAFKA-1694_2015-01-12_18:54:48.patch, 
> KAFKA-1694_2015-01-13_19:30:11.patch, KAFKA-1694_2015-01-14_15:42:12.patch, 
> KAFKA-1694_2015-01-14_18:07:39.patch, KAFKA-1694_2015-03-12_13:04:37.patch, 
> KAFKA-1772_1802_1775_1774_v2.patch
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Command+Line+and+Related+Improvements



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1206) allow Kafka to start from a resource negotiator system

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-1206:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> allow Kafka to start from a resource negotiator system
> --
>
> Key: KAFKA-1206
> URL: https://issues.apache.org/jira/browse/KAFKA-1206
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joe Stein
>  Labels: mesos
> Fix For: 0.10.2.0
>
> Attachments: KAFKA-1206_2014-01-16_00:40:30.patch
>
>
> We need a generic implementation to hold the property information for 
> brokers, producers and consumers.  We want the resource negotiator to store 
> this information however it wants and have it respond with a 
> java.util.Properties.  This can then be used in Kafka.scala as the 
> serverConfigs for the KafkaServerStartable.
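> The hook described here could be as small as the following (hypothetical 
> interface name):
> {code}
> import java.util.Properties;
> 
> // Implemented by the resource negotiator (e.g. a Mesos framework); Kafka
> // startup would ask it for the broker config instead of reading a file.
> public interface ServerPropertiesProvider {
>     Properties serverProperties();
> }
> {code}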



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3203) Add UnknownCodecException and UnknownMagicByteException to error mapping

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3203:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Add UnknownCodecException and UnknownMagicByteException to error mapping
> 
>
> Key: KAFKA-3203
> URL: https://issues.apache.org/jira/browse/KAFKA-3203
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Grant Henke
> Fix For: 0.10.2.0
>
>
> Currently most of the exceptions surfaced to users have an error code. While 
> UnknownCodecException and UnknownMagicByteException can also be thrown to the 
> client, the broker does not have an error mapping for them, so clients will 
> only receive UnknownServerException, which is vague.
> We should create those two exceptions in the client package and add them to 
> the error mapping.
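> The client-side shape of the fix (a sketch; these classes are exactly what 
> this ticket proposes to add):
> {code}
> package org.apache.kafka.common.errors;
> 
> // A dedicated exception lets the broker map a distinct error code, so
> // clients see this instead of the catch-all UnknownServerException.
> public class UnknownCodecException extends ApiException {
>     public UnknownCodecException(String message) {
>         super(message);
>     }
> }
> {code}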



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3736) Add http metrics reporter

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3736:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Add http metrics reporter
> -
>
> Key: KAFKA-3736
> URL: https://issues.apache.org/jira/browse/KAFKA-3736
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Adrian Muraru
> Fix For: 0.10.2.0
>
>
> The current builtin JMX metrics reporter is pretty heavy in terms of load and 
> collection. A new lightweight HTTP reporter is proposed to expose the metrics 
> via a local HTTP port.
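> A minimal sketch of such a reporter using the pluggable 0.10-era 
> MetricsReporter interface (the JDK HttpServer and the port are just for 
> illustration):
> {code}
> import com.sun.net.httpserver.HttpServer;
> import org.apache.kafka.common.metrics.KafkaMetric;
> import org.apache.kafka.common.metrics.MetricsReporter;
> 
> import java.io.OutputStream;
> import java.net.InetSocketAddress;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> 
> public class HttpMetricsReporter implements MetricsReporter {
>     private final Map<String, KafkaMetric> metrics = new ConcurrentHashMap<>();
>     private HttpServer server;
> 
>     @Override
>     public void configure(Map<String, ?> configs) {}
> 
>     @Override
>     public void init(List<KafkaMetric> initial) {
>         for (KafkaMetric m : initial)
>             metricChange(m);
>         try {
>             server = HttpServer.create(new InetSocketAddress(8085), 0);
>             server.createContext("/metrics", exchange -> {
>                 // Render "group:name value" lines on each scrape.
>                 StringBuilder body = new StringBuilder();
>                 for (Map.Entry<String, KafkaMetric> e : metrics.entrySet())
>                     body.append(e.getKey()).append(' ').append(e.getValue().value()).append('\n');
>                 byte[] bytes = body.toString().getBytes("UTF-8");
>                 exchange.sendResponseHeaders(200, bytes.length);
>                 try (OutputStream os = exchange.getResponseBody()) {
>                     os.write(bytes);
>                 }
>             });
>             server.start();
>         } catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>     }
> 
>     @Override
>     public void metricChange(KafkaMetric metric) {
>         metrics.put(metric.metricName().group() + ":" + metric.metricName().name(), metric);
>     }
> 
>     @Override
>     public void metricRemoval(KafkaMetric metric) {
>         metrics.remove(metric.metricName().group() + ":" + metric.metricName().name());
>     }
> 
>     @Override
>     public void close() {
>         if (server != null)
>             server.stop(0);
>     }
> }
> {code}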



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2787) Refactor gradle build

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-2787:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Refactor gradle build
> -
>
> Key: KAFKA-2787
> URL: https://issues.apache.org/jira/browse/KAFKA-2787
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Affects Versions: 0.8.2.2
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.10.2.0
>
>
> The build files are quite large with a lot of duplication and overlap. This 
> could lead to mistakes, reduce readability and functionality, and hinder 
> future changes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2758) Improve Offset Commit Behavior

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-2758:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Improve Offset Commit Behavior
> --
>
> Key: KAFKA-2758
> URL: https://issues.apache.org/jira/browse/KAFKA-2758
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Guozhang Wang
>  Labels: newbiee
> Fix For: 0.10.2.0
>
>
> There are two scenarios of offset committing that we can improve:
> 1) we can filter out the partitions whose committed offset is equal to the 
> consumed offset, meaning there are no newly consumed messages from these 
> partitions and hence we do not need to include them in the commit request.
> 2) we can make a commit request right after resetting to a fetch / consume 
> position, either according to the reset policy (e.g. on consumer startup, or 
> when handling an out-of-range offset) or through {code} seek {code}, so that 
> if the consumer fails right after these events, upon recovery it restarts 
> from the reset position instead of resetting again; otherwise this can lead 
> to, for example, data loss if we use "largest" as the reset policy while new 
> messages are arriving on the fetched partitions.
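> Point (1) is a simple filter when building the commit request; a sketch with 
> illustrative names:
> {code}
> // Only include partitions whose consumed position actually advanced
> // past the last committed offset.
> Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
> for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
>     Long lastCommitted = committedOffsets.get(entry.getKey());
>     if (lastCommitted == null || lastCommitted < entry.getValue())
>         toCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue()));
> }
> if (!toCommit.isEmpty())
>     consumer.commitSync(toCommit);
> {code}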



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3209) Support single message transforms in Kafka Connect

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3209:
---
 Priority: Major  (was: Critical)
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Support single message transforms in Kafka Connect
> --
>
> Key: KAFKA-3209
> URL: https://issues.apache.org/jira/browse/KAFKA-3209
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Neha Narkhede
>  Labels: needs-kip
> Fix For: 0.10.2.0
>
>
> Users should be able to perform light transformations on messages between a 
> connector and Kafka. This is needed because some transformations must be 
> performed before the data hits Kafka (e.g. filtering certain types of events 
> or PII filtering). It's also useful for very light, single-message 
> modifications that are easier to perform inline with the data import/export.
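> A hypothetical shape for the hook (this is what the KIP would define, not an 
> existing API):
> {code}
> import org.apache.kafka.connect.connector.ConnectRecord;
> 
> // Applied to each record between a connector and Kafka;
> // returning null would filter the record out entirely.
> public interface Transformation {
>     ConnectRecord apply(ConnectRecord record);
> }
> {code}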



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3297) More optimally balanced partition assignment strategy (new consumer)

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3297:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> More optimally balanced partition assignment strategy (new consumer)
> 
>
> Key: KAFKA-3297
> URL: https://issues.apache.org/jira/browse/KAFKA-3297
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Olson
>Assignee: Andrew Olson
> Fix For: 0.10.2.0
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the new consumer. For the original high-level consumer, 
> see KAFKA-2435.
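> The greedy rule in the first paragraph, as a sketch (consumers, subscriptions 
> and partitionsInAssignmentOrder are illustrative inputs):
> {code}
> Map<String, List<TopicPartition>> assignment = new HashMap<>();
> for (String consumer : consumers)
>     assignment.put(consumer, new ArrayList<TopicPartition>());
> 
> // Each partition goes to the interested consumer that currently
> // holds the fewest partitions.
> for (TopicPartition partition : partitionsInAssignmentOrder) {
>     String choice = null;
>     for (String consumer : consumers) {
>         if (!subscriptions.get(consumer).contains(partition.topic()))
>             continue; // consumer not subscribed to this topic
>         if (choice == null
>                 || assignment.get(consumer).size() < assignment.get(choice).size())
>             choice = consumer;
>     }
>     assignment.get(choice).add(partition);
> }
> {code}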



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3596) Kafka Streams: Window expiration needs to consider more than event time

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3596:
---
Affects Version/s: (was: 0.10.1.0)
   0.10.0.0
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Kafka Streams: Window expiration needs to consider more than event time
> ---
>
> Key: KAFKA-3596
> URL: https://issues.apache.org/jira/browse/KAFKA-3596
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Henry Cai
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: architecture
> Fix For: 0.10.2.0
>
>
> Currently in Kafka Streams, window expiration in RocksDB is triggered by new 
> event insertion.  When a window is created at T0 with 10 minutes retention, 
> and we then see a new record with event timestamp T0 + 10 + 1, we expire 
> that window (remove it) from RocksDB.
> In the real world, it's very easy to see events arriving with future 
> timestamps (or out-of-order events with big time gaps between them), so 
> retiring a window based on a single event's timestamp is dangerous.  I think 
> we need to consider at least both the event's event time and the elapsed 
> server/stream time.
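
A sketch of that suggestion (illustrative only, not the streams-internal code): a window is retired only once both the maximum observed event time and the locally elapsed stream time have passed the retention boundary, so a single record with a future timestamp can no longer expire it on its own.

{noformat}
// Illustrative sketch, not the RocksDB windowing code.
class WindowExpirySketch {
    final long retentionMs;
    long maxEventTimeMs = Long.MIN_VALUE;  // advanced by incoming records

    WindowExpirySketch(long retentionMs) { this.retentionMs = retentionMs; }

    void onRecord(long eventTimeMs) {
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
    }

    // streamTimeMs is the processor's own notion of elapsed time, which a
    // lone future-timestamped record cannot advance.
    boolean shouldExpire(long windowEndMs, long streamTimeMs) {
        return maxEventTimeMs > windowEndMs + retentionMs
            && streamTimeMs   > windowEndMs + retentionMs;
    }
}
{noformat}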



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3701) Expose KafkaStreams metrics in public API

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3701:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Expose KafkaStreams metrics in public API
> -
>
> Key: KAFKA-3701
> URL: https://issues.apache.org/jira/browse/KAFKA-3701
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Priority: Minor
> Fix For: 0.10.2.0
>
>
> The Kafka clients expose their metrics registries through a `metrics` method 
> presenting an unmodifiable collection, but `KafkaStreams` does not expose its 
> registry. Currently, applications can access a StreamsMetrics instance via 
> the ProcessorContext within a Processor, but this limits flexibility.
> Having read-only access to a KafkaStreams.metrics() method would allow a 
> developer to define a health check for their application based on the metrics 
> that KafkaStreams is collecting. Or a developer might want to define a metric 
> in some other framework based on KafkaStreams' metrics.
> I am imagining that an application would build and register 
> KafkaStreams-based health checks after building a KafkaStreams instance but 
> before calling the start() method. Are metrics added to the registry at the 
> time a KafkaStreams instance is constructed, or only after calling the 
> start() method? If metrics are registered only after application startup, 
> then this approach may not be sufficient.
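
A sketch of the kind of health check this would enable, assuming the proposed KafkaStreams.metrics() accessor mirrors the clients' signature (Map<MetricName, ? extends Metric>); the metric name and threshold here are illustrative:

{noformat}
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Sketch only: assumes the *proposed* KafkaStreams#metrics() accessor with
// the same shape as KafkaConsumer#metrics().
static boolean healthy(Map<MetricName, ? extends Metric> metrics) {
    for (Map.Entry<MetricName, ? extends Metric> e : metrics.entrySet()) {
        if (e.getKey().name().equals("commit-latency-avg"))   // illustrative name
            return e.getValue().value() < 1000.0;             // hypothetical 1s threshold
    }
    return true;  // metric not registered (yet): treat as healthy for now
}
{noformat}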



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2525) Update ConsumerPerformance.scala to report join group time (new consumer)

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-2525:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Update ConsumerPerformance.scala to report join group time (new consumer)
> -
>
> Key: KAFKA-2525
> URL: https://issues.apache.org/jira/browse/KAFKA-2525
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Geoff Anderson
>Assignee: Geoff Anderson
>Priority: Minor
> Fix For: 0.10.2.0
>
>
> The patch for https://issues.apache.org/jira/browse/KAFKA-2489 adds a small 
> amount of logic to ConsumerPerformance.scala which triggers consumer offsets 
> topic creation/group join etc. before consumer performance is measured.
> In https://github.com/apache/kafka/pull/179, [~gwenshap] pointed out that for 
> detecting regressions, it may be useful with the new consumer to report group 
> join time in addition to the other stats.
> The only tricky bit here is that this "consumer preparation" time also 
> includes the time required to create the consumer offsets topic, so we'd like 
> to trigger creation of the consumer offsets topic and make sure it's complete 
> before moving on to the "join group" phase.
> The solution here may be as simple as calling 
> consumer.partitionsFor(consumerOffsetTopic) to trigger a metadata request 
> (and hence topic creation).
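
A sketch of that approach (illustrative, not the actual ConsumerPerformance patch); the topic names and timeout are placeholders:

{noformat}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "perf-test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

// Trigger a metadata request (and hence offsets-topic creation) up front so
// topic creation time is not counted as part of the join.
consumer.partitionsFor("__consumer_offsets");

long joinStart = System.currentTimeMillis();
consumer.subscribe(Collections.singletonList("perf-topic"));
consumer.poll(10000);  // the first poll() drives the JoinGroup round trip
long joinTimeMs = System.currentTimeMillis() - joinStart;
{noformat}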



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3714) Allow users greater access to register custom streams metrics

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3714:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Allow users greater access to register custom streams metrics
> -
>
> Key: KAFKA-3714
> URL: https://issues.apache.org/jira/browse/KAFKA-3714
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: api
> Fix For: 0.10.2.0
>
>
> Copying in some discussion that originally appeared in 
> https://github.com/apache/kafka/pull/1362#issuecomment-219064302
> Kafka Streams is largely a higher-level abstraction on top of producers and 
> consumers, and it seems sensible to match the KafkaStreams interface to that 
> of KafkaProducer and KafkaConsumer where possible. For producers and 
> consumers, the metric registry is internal and metrics are only exposed as an 
> unmodifiable map. This allows users to access client metric values for use in 
> application health checks, etc., but doesn't allow them to register new 
> metrics.
> That approach seems reasonable if we assume that a user interested in 
> defining custom metrics is already going to be using a separate metrics 
> library. In such a case, users will likely find it easier to define metrics 
> using whatever library they're familiar with rather than learning the API for 
> Kafka's Metrics class. Is this a reasonable assumption?
> If we want to expose the Metrics instance so that users can define arbitrary 
> metrics, I'd argue that there's need for documentation updates. In 
> particular, I find the notion of metric tags confusing. Tags can be defined 
> in a MetricConfig when the Metrics instance is constructed, 
> StreamsMetricsImpl is maintaining its own set of tags, and users can set tag 
> overrides.
> If a user were to get access to the Metrics instance, they would be missing 
> the tags defined in StreamsMetricsImpl. I'm imagining that users would want 
> their custom metrics to sit alongside the predefined metrics with the same 
> tags, and users shouldn't be expected to manage those additional tags 
> themselves.
> So, why are we allowing users to define their own metrics via the 
> StreamsMetrics interface in the first place? Is it that we'd like to be able 
> to provide a built-in latency metric, but the definition depends on the 
> details of the use case so there's no generic solution? That would be 
> sufficient motivation for this special case of addLatencySensor. If we want 
> to continue down that path and give users access to define a wider range of 
> custom metrics, I'd prefer to extend the StreamsMetrics interface so that 
> users can call methods on that object, automatically getting the tags 
> appropriate for that instance rather than interacting with the raw Metrics 
> instance.
> ---
> Guozhang had the following comments:
> 1) For the producer/consumer cases, all internal metrics are provided and 
> abstracted from users, and they just need to read the documentation to poll 
> whatever provided metrics they are interested in; and if they want to 
> define more metrics, those are likely to live outside the clients themselves 
> and they can use whatever methods they like, so Metrics does not need to be 
> exposed to users.
> 2) For streams, things are a bit different: users define the computational 
> logic, which becomes part of the "Streams Client" processing and may be of 
> interest for users to monitor themselves; think of a customized processor 
> that sends an email to some address based on a condition, where users want to 
> monitor the average rate of emails sent. Hence it is worth considering 
> whether or not they should be able to access the Metrics instance to define 
> their own metrics alongside the pre-defined metrics provided by the library.
> 3) Now, since the Metrics class was not previously designed for public usage, 
> it is not very user-friendly for defining sensors, especially around the 
> semantic differences between name / scope / tags. StreamsMetrics tries 
> to hide some of this semantic confusion from users, but it still exposes 
> tags and hence is not perfect in doing so. We need to think of a better 
> approach so that: 1) user-defined metrics are "aligned" with library-provided 
> metrics (i.e. with the same name prefix within a single application, with a 
> similar scope hierarchy definition, etc.), and 2) the APIs for doing so feel 
> natural.
> I do not have concrete ideas about 3) off the top of my head; comments are 
> more than welcome.
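
A sketch of the processor from point 2) against a *hypothetical* extended StreamsMetrics: the addRateSensor method is invented for illustration (0.10.x StreamsMetrics only exposes addLatencySensor/recordLatency), so this shows the proposal, not an existing API.

{noformat}
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class EmailAlertProcessor implements Processor<String, String> {
    private Sensor emailRate;

    @Override
    public void init(ProcessorContext context) {
        // Hypothetical API: the library would apply its own tags/name prefix,
        // keeping the user metric "aligned" with the built-in ones.
        emailRate = context.metrics().addRateSensor("alerts", "emails-sent");
    }

    @Override
    public void process(String key, String value) {
        if (value.contains("ERROR")) {
            // sendEmail(key, value);  // application logic elided
            emailRate.record(1);
        }
    }

    @Override public void punctuate(long timestamp) { }
    @Override public void close() { }
}
{noformat}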
> ---
> I'm not sure that I agree that 1) and 2) are truly different situations. A 
> user might choose to send email messages within a bare 

[jira] [Updated] (KAFKA-1776) Re-factor out existing tools that have been implemented behind the CLI

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-1776:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Re-factor out existing tools that have been implemented behind the CLI
> --
>
> Key: KAFKA-1776
> URL: https://issues.apache.org/jira/browse/KAFKA-1776
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Joe Stein
>Priority: Minor
> Fix For: 0.10.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-334) Some tests fail when building on a Windows box

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-334.
---
   Resolution: Won't Fix
Fix Version/s: (was: 0.10.1.0)

Resolving this since it's been open forever and it seems there are no plans to 
fix it, especially since we have changed build systems.

> Some tests fail when building on a Windows box
> --
>
> Key: KAFKA-334
> URL: https://issues.apache.org/jira/browse/KAFKA-334
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.7
> Environment: Windows 7 - reproduces under command shell, cygwin, and 
> MINGW32 (Git Bash)
>Reporter: Roman Garcia
>Priority: Minor
>  Labels: build-failure, test-fail
>
> Trying to create a ZIP distro from sources failed.
> On Win7. On cygwin, command shell and git bash.
> Tried with incubator-src download from ASF download page, as well as fresh 
> checkout from latest trunk (r1329547).
> Once I tried the same on a Linux box, everything was working ok.
> svn co http://svn.apache.org/repos/asf/incubator/kafka/trunk kafka-0.7.0
> ./sbt update (OK)
> ./sbt package (OK)
> ./sbt release-zip (FAIL)
> Tests failing:
> [error] Error running kafka.integration.LazyInitProducerTest: Test FAILED
> [error] Error running kafka.zk.ZKLoadBalanceTest: Test FAILED
> [error] Error running kafka.javaapi.producer.ProducerTest: Test FAILED
> [error] Error running kafka.producer.ProducerTest: Test FAILED
> [error] Error running test: One or more subtasks failed
> [error] Error running doc: Scaladoc generation failed
> Stacks:
> [error] Test Failed: testZKSendWithDeadBroker
> junit.framework.AssertionFailedError: Message set should have another message
> at junit.framework.Assert.fail(Assert.java:47)
> at junit.framework.Assert.assertTrue(Assert.java:20)
> at 
> kafka.javaapi.producer.ProducerTest.testZKSendWithDeadBroker(ProducerTest.scala:448)
> [error] Test Failed: testZKSendToNewTopic
> junit.framework.AssertionFailedError: Message set should have 1 message
> at junit.framework.Assert.fail(Assert.java:47)
> at junit.framework.Assert.assertTrue(Assert.java:20)
> at 
> kafka.javaapi.producer.ProducerTest.testZKSendToNewTopic(ProducerTest.scala:416)
> [error] Test Failed: testLoadBalance(kafka.zk.ZKLoadBalanceTest)
> junit.framework.AssertionFailedError: expected:<5> but was:<0>
> at junit.framework.Assert.fail(Assert.java:47)
> at junit.framework.Assert.failNotEquals(Assert.java:277)
> at junit.framework.Assert.assertEquals(Assert.java:64)
> at junit.framework.Assert.assertEquals(Assert.java:195)
> at junit.framework.Assert.assertEquals(Assert.java:201)
> at 
> kafka.zk.ZKLoadBalanceTest.checkSetEqual(ZKLoadBalanceTest.scala:121)
> at 
> kafka.zk.ZKLoadBalanceTest.testLoadBalance(ZKLoadBalanceTest.scala:89)
> [error] Test Failed: testPartitionedSendToNewTopic
> java.lang.AssertionError:
>   Unexpected method call send("test-topic1", 0, 
> ByteBufferMessageSet(MessageAndOffset(message(magic = 1, attributes = 0, crc 
> = 2326977762, payload = java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]),15), )):
> close(): expected: 1, actual: 0
> at 
> org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:45)
> at 
> org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:73)
> at 
> org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:92)
> at 
> kafka.producer.SyncProducer$$EnhancerByCGLIB$$4385e618.send()
> at 
> kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:114)
> at 
> kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
> at 
> kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at kafka.producer.ProducerPool.send(ProducerPool.scala:100)
> at kafka.producer.Producer.zkSend(Producer.scala:137)
> at kafka.producer.Producer.send(Producer.scala:99)
> at 
> kafka.producer.ProducerTest.testPartitionedSendToNewTopic(ProducerTest.scala:576)
> [error] Test Failed: testZKSendToNewTopic
> junit.framework.AssertionFailedError: Message set should have 1 message
> at junit.framework.Assert.fail(Assert.java:47)
> at junit.framework.Assert.assertTrue(Assert.java:20)
> at 
> kafka.producer.ProducerTest.testZKSendToNewTopic(ProducerTest.scala:429)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1332) Add functionality to the offsetsBeforeTime() API

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-1332.

   Resolution: Duplicate
 Assignee: Jason Gustafson
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

Resolving this as a duplicate of KAFKA-4148, which is for KIP-79.

> Add functionality to the offsetsBeforeTime() API
> 
>
> Key: KAFKA-1332
> URL: https://issues.apache.org/jira/browse/KAFKA-1332
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Neha Narkhede
>Assignee: Jason Gustafson
> Fix For: 0.10.2.0
>
>
> Add functionality to the offsetsBeforeTime() API to load offsets 
> corresponding to a particular timestamp, including earliest and latest offsets



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4005) Add per topic compression ratio in broker and consumer.

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-4005:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Add per topic compression ratio in broker and consumer.
> ---
>
> Key: KAFKA-4005
> URL: https://issues.apache.org/jira/browse/KAFKA-4005
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.2.0
>
>
> Currently we only have compression ratio metric on the producer side. It 
> would be very useful to have that in the brokers and consumers for each topic 
> as well. 
> On the broker side, the compression ratio can potentially depend on either 
> the produce request or the final messages written to disk. It is probably 
> more useful to use the compression ratio of the messages we finally write to 
> disk. If messages are compressed with different compression types, we can 
> maintain the compression ratio for each type separately.
> The consumer-side compression ratio would likewise be per topic and 
> compression type combination.
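
For concreteness, the ratio in question is just compressed bytes over uncompressed bytes, tracked per (topic, compression type); a minimal sketch (illustrative, not broker code):

{noformat}
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch, not broker code: running compression ratio keyed by
// (topic, compression type), based on the bytes finally written to disk.
class CompressionRatioTracker {
    private final Map<String, long[]> totals = new HashMap<>(); // {compressed, uncompressed}

    void record(String topic, String codec, long compressedBytes, long uncompressedBytes) {
        long[] t = totals.computeIfAbsent(topic + "/" + codec, k -> new long[2]);
        t[0] += compressedBytes;
        t[1] += uncompressedBytes;
    }

    double ratio(String topic, String codec) {
        long[] t = totals.get(topic + "/" + codec);
        return (t == null || t[1] == 0) ? 1.0 : (double) t[0] / t[1];
    }
}
{noformat}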



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485826#comment-15485826
 ] 

Elias Levy commented on KAFKA-4153:
---

Why would the other value be undefined? Default it to zero.

> Incorrect KStream-KStream join behavior with asymmetric time window
> ---
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, if joining records in two streams separated by some 
> time, but only when records from one stream are newer than records from the 
> other, i.e. doing:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}
> Alas, this is not the case.  Instead, this generates the same output as 
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}
> The problem is that the 
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
>  implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other 
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.
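
A sketch of the fix (the idea only, not the exact patch): mirror the window when building the join from the other stream's side. This assumes 0.10.x {{JoinWindows}} exposes public before/after fields and a name() accessor:

{noformat}
// When evaluating other.join(this, ...), swap before and after so the
// asymmetric window keeps its meaning from the reversed point of view.
JoinWindows reversedWindows = JoinWindows.of(windows.name())
        .before(windows.after)
        .after(windows.before);
{noformat}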



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1935) Consumer should use a separate socket for Coordinator connection

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-1935:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Consumer should use a separate socket for Coordinator connection
> 
>
> Key: KAFKA-1935
> URL: https://issues.apache.org/jira/browse/KAFKA-1935
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>  Labels: newbiee
> Fix For: 0.10.2.0
>
>
> KAFKA-1925 is just a quick-fix of this issue, we need to let consumer to be 
> able to create separate sockets for the same server for coordinator / broker 
> roles.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3370) Add options to auto.offset.reset to reset offsets upon initialization only

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3370:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Add options to auto.offset.reset to reset offsets upon initialization only
> --
>
> Key: KAFKA-3370
> URL: https://issues.apache.org/jira/browse/KAFKA-3370
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Vahid Hashemian
> Fix For: 0.10.2.0
>
>
> Currently "auto.offset.reset" is applied in the following two cases:
> 1) upon starting the consumer for the first time (hence no committed offsets 
> before);
> 2) upon fetching offsets out-of-range.
> For scenarios where case 2) needs to be avoided (i.e. people need to be 
> notified of out-of-range offsets rather than having them silently reset), 
> "auto.offset.reset" needs to be set to "none". However, for case 1) setting 
> "auto.offset.reset" to "none" will cause a NoOffsetForPartitionException upon 
> polling. In that case, seekToBeginning/seekToEnd is then mistakenly applied to 
> set the offset at initialization, even though those calls are actually 
> designed for use during the lifetime of the consumer (in a rebalance 
> callback, for example).
> The proposed fix is to add two more options to "auto.offset.reset", 
> "earliest-on-start" and "latest-on-start", whose semantics are "earliest" 
> and "latest" for case 1) only, and "none" for case 2).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3333) Client Partitioner - Round Robin

2016-09-12 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485820#comment-15485820
 ] 

Jason Gustafson commented on KAFKA-3333:


I think this change probably calls for a KIP since it's a change to the public 
API. Moving to 0.10.2.0 for now.

> Client Partitioner - Round Robin
> 
>
> Key: KAFKA-3333
> URL: https://issues.apache.org/jira/browse/KAFKA-3333
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Stephen Powis
> Fix For: 0.10.2.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The 
> [DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
>  typically distributes using the hash of the keybytes, and falls back to 
> round robin if there is no key.  But there is currently no way to do Round 
> Robin partitioning if you have keys on your messages without writing your own 
> partitioning implementation.
> I think it'd be helpful to have an implementation of straight Round Robin 
> partitioning included with the library.  
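
A minimal sketch of such a partitioner against the public Partitioner interface (a standalone illustration, not a committed implementation), which would then be enabled via the producer's partitioner.class config:

{noformat}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// Sketch: cycle through a topic's partitions whether or not the record has a key.
public class RoundRobinPartitioner implements Partitioner {
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        AtomicInteger counter = counters.computeIfAbsent(topic, t -> new AtomicInteger(0));
        // The key is ignored entirely, unlike the DefaultPartitioner.
        return Math.abs(counter.getAndIncrement() % partitions.size());
    }

    @Override public void configure(Map<String, ?> configs) { }
    @Override public void close() { }
}
{noformat}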



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3333) Client Partitioner - Round Robin

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3333:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Client Partitioner - Round Robin
> 
>
> Key: KAFKA-3333
> URL: https://issues.apache.org/jira/browse/KAFKA-3333
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Stephen Powis
> Fix For: 0.10.2.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The 
> [DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
>  typically distributes using the hash of the keybytes, and falls back to 
> round robin if there is no key.  But there is currently no way to do Round 
> Robin partitioning if you have keys on your messages without writing your own 
> partitioning implementation.
> I think it'd be helpful to have an implementation of straight Round Robin 
> partitioning included with the library.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1617) Move Metadata Cache to TopicManager and handling of Offset Request to LogManager

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-1617:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Move Metadata Cache to TopicManager and handling of Offset Request to 
> LogManager
> 
>
> Key: KAFKA-1617
> URL: https://issues.apache.org/jira/browse/KAFKA-1617
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
> Fix For: 0.10.2.0
>
>
> This is a follow-up of KAFKA-1583. In order to make Kafka APIs a pure 
> stateless layer that just forwards different requests to the corresponding 
> managers, there are still two tasks left:
> 1. Move the metadata cache in KafkaApis to a separate manager, maybe called 
> TopicManager, which will be responsible for a) handling topic metadata 
> requests, and b) handling topic metadata update requests by talking to the 
> replica manager if necessary.
> 2. Move the handling logic of offset request to the LogManager, which should 
> contain all the information necessary to handle this request.
> Finally, the KafkaApis class should be stateless, meaning no inner variables 
> and no start()/shutdown() functions needed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-853) Allow OffsetFetchRequest to initialize offsets

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-853:
--
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Allow OffsetFetchRequest to initialize offsets
> --
>
> Key: KAFKA-853
> URL: https://issues.apache.org/jira/browse/KAFKA-853
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.1
>Reporter: David Arthur
>Assignee: Balaji Seshadri
> Fix For: 0.10.2.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It would be nice for the OffsetFetchRequest API to have the option to 
> initialize offsets instead of returning unknown_topic_or_partition. It could 
> mimic the Offsets API by adding the "time" field and then follow the same 
> code path on the server as the Offset API. 
> In this case, the response would need a boolean to indicate whether the 
> returned offset was initialized or fetched from ZK.
> This would simplify the client logic when dealing with new topics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-566) Add last modified time to the TopicMetadataRequest

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-566:
--
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Add last modified time to the TopicMetadataRequest
> --
>
> Key: KAFKA-566
> URL: https://issues.apache.org/jira/browse/KAFKA-566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.0
>Reporter: Jay Kreps
>Assignee: Sriharsha Chintalapani
> Fix For: 0.10.2.0
>
>
> To support KAFKA-560 it would be nice to have a last modified time in the 
> TopicMetadataRequest. This would be the timestamp of the last append to the 
> log as taken from stat on the final log segment.
> Implementation would involve
> 1. Adding a new field to TopicMetadataRequest
> 2. Adding a method Log.lastModified: Long to get the last modified time from 
> a log
> This timestamp would, of course, be subject to error in the event that the 
> file was touched without modification, but I think that is actually okay 
> since it provides a manual way to avoid gc'ing a topic that you  know you 
> will want.
> It is debatable whether this should go in 0.8. It would be nice to add the 
> field to the metadata request, at least, as that change should be easy and 
> would avoid needing to bump the version in the future.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3364) Centralize doc generation

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3364:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Centralize doc generation
> --
>
> Key: KAFKA-3364
> URL: https://issues.apache.org/jira/browse/KAFKA-3364
> Project: Kafka
>  Issue Type: Sub-task
>  Components: build
>Reporter: Grant Henke
> Fix For: 0.10.2.0
>
>
> Currently doc generation is scattered throughout the build file/process. 
> Centralizing doc generation into its own location/file would help make the 
> process clearer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1895) Investigate moving deserialization and decompression out of KafkaConsumer

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-1895:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Investigate moving deserialization and decompression out of KafkaConsumer
> -
>
> Key: KAFKA-1895
> URL: https://issues.apache.org/jira/browse/KAFKA-1895
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
> Fix For: 0.10.2.0
>
>
> The consumer implementation in KAFKA-1760 decompresses fetch responses and 
> deserializes them into ConsumerRecords which are then handed back as the 
> result of poll().
> There are several downsides to this:
> 1. It is impossible to scale serialization and decompression work beyond the 
> single thread running the KafkaConsumer.
> 2. The results can come back during the processing of other calls such as 
> commit(), etc., which can result in caching these records a little longer.
> An alternative would be to have ConsumerRecords wrap the actual compressed 
> serialized MemoryRecords chunks and do the deserialization during iteration. 
> This way you could scale this over a thread pool if needed.
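
A sketch of that alternative (illustrative types only - the real change would wrap compressed MemoryRecords, for which raw byte arrays stand in here): the decode cost is paid per record, on whichever thread iterates.

{noformat}
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative sketch, not the real MemoryRecords plumbing.
class LazyRecords<V> implements Iterable<V> {
    private final String topic;
    private final List<byte[]> rawValues;       // undecoded fetch payload
    private final Deserializer<V> deserializer;

    LazyRecords(String topic, List<byte[]> rawValues, Deserializer<V> deserializer) {
        this.topic = topic;
        this.rawValues = rawValues;
        this.deserializer = deserializer;
    }

    @Override
    public Iterator<V> iterator() {
        final Iterator<byte[]> raw = rawValues.iterator();
        return new Iterator<V>() {
            @Override public boolean hasNext() { return raw.hasNext(); }
            @Override public V next() {
                // Decode only when the caller pulls the record, so a thread
                // pool draining several iterators scales the CPU work.
                return deserializer.deserialize(topic, raw.next());
            }
            @Override public void remove() { throw new UnsupportedOperationException(); }
        };
    }
}
{noformat}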



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3366) Find a way to auto-generate expected error codes

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3366:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.2.0

> Find a way to auto-generate expected error codes
> 
>
> Key: KAFKA-3366
> URL: https://issues.apache.org/jira/browse/KAFKA-3366
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
> Fix For: 0.10.2.0
>
>
> Currently we comment on the expected error codes in the client 
> Response/Request implementations. It would be nice to have this be part of 
> the protocol documentation and auto-generated in the docs. 
> The documentation should include the expected error codes for a given 
> request and recommendations on how they should be handled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485759#comment-15485759
 ] 

Matthias J. Sax commented on KAFKA-4153:


IMHO {{JoinWindows.before(100).after(50)}} would not be a good choice -- the 
user could just call only {{JoinWindows.before(100)}}, creating a window with 
an undefined value for {{after}}. For me, the API should ensure that defining 
an invalid window is not possible in the first place (instead of checking for 
this condition and throwing an exception later on).

> Incorrect KStream-KStream join behavior with asymmetric time window
> ---
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, if joining records in two streams separated by some 
> time, but only when records from one stream are newer than records from the 
> other, i.e. doing:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}
> Alas, this is not the case.  Instead, this generates the same output as 
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}
> The problem is that the 
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
>  implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other 
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2901) Extend ListGroups and DescribeGroup APIs to cover offsets

2016-09-12 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485727#comment-15485727
 ] 

Jason Gustafson edited comment on KAFKA-2901 at 9/13/16 12:18 AM:
--

We changed the behavior of ListGroups in KAFKA-2720 to include all groups, 
including those using only offset storage. I think the rest of the behavior is 
covered by KAFKA-3853, which is in progress, so I'm going to resolve this issue 
as a duplicate.


was (Author: hachikuji):
We changed the behavior of ListGroups and DescribeGroups in KAFKA-2720 to 
include all groups, including those using only offset storage. I think the rest 
of the behavior is covered by KAFKA-3144, which already has a patch available, 
so I'm going to resolve this issue as a duplicate.

> Extend ListGroups and DescribeGroup APIs to cover offsets
> -
>
> Key: KAFKA-2901
> URL: https://issues.apache.org/jira/browse/KAFKA-2901
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Andy Coates
>Assignee: Jason Gustafson
> Fix For: 0.10.1.0
>
>
> The {{ListGroupsRequest}} and {{DescribeGroupsRequest}} added to 0.9.0.0 
> allow admin tools to get details of consumer groups now that this information 
> is not stored in ZK.
> The brokers also now store offset information for consumer groups. At 
> present, there is no API for admin tools to discover the groups that brokers 
> are storing offsets for.
> For example, if a consumer is using the new consumer api, is storing offsets 
> in Kafka under the groupId 'Bob', but is using manual partition assignment, 
> then the {{groupCache}} in the {{GroupMetadataManager}} will not contain any 
> information about the group 'Bob'. However, the {{offsetCache}} in the 
> {{GroupMetadataManager}} will contain information about 'Bob'.
> Currently the only way for admin tools to know the full set of groups being 
> managed by Kafka, i.e. those storing offsets in Kafka, those using Kafka for 
> balancing of consumer groups, and those using Kafka for both, is to consume 
> the offset topic.
> We need to extend the List/Describe groups API to allow admin tools to 
> discover 'Bob' and allow the partition offsets to be retrieved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2901) Extend ListGroups and DescribeGroup APIs to cover offsets

2016-09-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-2901.

Resolution: Duplicate
  Assignee: Jason Gustafson  (was: Neha Narkhede)

We changed the behavior of ListGroups and DescribeGroups in KAFKA-2720 to 
include all groups, including those using only offset storage. I think the rest 
of the behavior is covered by KAFKA-3144, which already has a patch available, 
so I'm going to resolve this issue as a duplicate.

> Extend ListGroups and DescribeGroup APIs to cover offsets
> -
>
> Key: KAFKA-2901
> URL: https://issues.apache.org/jira/browse/KAFKA-2901
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Andy Coates
>Assignee: Jason Gustafson
> Fix For: 0.10.1.0
>
>
> The {{ListGroupsRequest}} and {{DescribeGroupsRequest}} added to 0.9.0.0 
> allow admin tools to get details of consumer groups now that this information 
> is not stored in ZK.
> The brokers also now store offset information for consumer groups. At 
> present, there is no API for admin tools to discover the groups that brokers 
> are storing offsets for.
> For example, if a consumer is using the new consumer api, is storing offsets 
> in Kafka under the groupId 'Bob', but is using manual partition assignment, 
> then the {{groupCache}} in the {{GroupMetadataManager}} will not contain any 
> information about the group 'Bob'. However, the {{offsetCache}} in the 
> {{GroupMetadataManager}} will contain information about 'Bob'.
> Currently the only way for admin tools to know the full set of groups being 
> managed by Kafka, i.e. those storing offsets in Kafka, those using Kafka for 
> balancing of consumer groups, and those using Kafka for both, is to consume 
> the offset topic.
> We need to extend the List/Describe groups API to allow admin tools to 
> discover 'Bob' and allow the partition offsets to be retrieved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1464) Add a throttling option to the Kafka replication tool

2016-09-12 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-1464:
---
Reviewer: Jun Rao
  Status: Patch Available  (was: Open)

> Add a throttling option to the Kafka replication tool
> -
>
> Key: KAFKA-1464
> URL: https://issues.apache.org/jira/browse/KAFKA-1464
> Project: Kafka
>  Issue Type: New Feature
>  Components: replication
>Affects Versions: 0.8.0
>Reporter: mjuarez
>Assignee: Ben Stopford
>Priority: Minor
>  Labels: replication, replication-tools
> Fix For: 0.10.1.0
>
>
> When performing replication on new nodes of a Kafka cluster, the replication 
> process will use all available resources to replicate as fast as possible.  
> This causes performance issues (mostly disk IO and sometimes network 
> bandwidth) when doing this in a production environment, in which you're 
> trying to serve downstream applications, at the same time you're performing 
> maintenance on the Kafka cluster.
> An option to throttle the replication to a specific rate (in either MB/s or 
> activities/second) would help production systems to better handle maintenance 
> tasks while still serving downstream applications.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485700#comment-15485700
 ] 

Elias Levy commented on KAFKA-4153:
---

You could do that, but it would be non-obvious to someone reading the code what 
the semantics of that window should be.

I would prefer seeing {{before}} and {{after}} have static alternatives, so you 
could write {{JoinWindows.before(100).after(50)}} or similar.

> Incorrect KStream-KStream join behavior with asymmetric time window
> ---
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, if joining records in two streams separated by some 
> time, but only when records from one stream are newer than records from the 
> other, i.e. doing:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}
> Alas, this is not the case.  Instead, this generates the same output as 
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}
> The problem is that the 
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
>  implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other 
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4154) Kafka Connect fails to shutdown if it has not completed startup

2016-09-12 Thread Shikhar Bhushan (JIRA)
Shikhar Bhushan created KAFKA-4154:
--

 Summary: Kafka Connect fails to shutdown if it has not completed 
startup
 Key: KAFKA-4154
 URL: https://issues.apache.org/jira/browse/KAFKA-4154
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Shikhar Bhushan
Assignee: Shikhar Bhushan
 Fix For: 0.10.1.0


To reproduce:
1. Start Kafka Connect in distributed mode without Kafka running 
{{./bin/connect-distributed.sh config/connect-distributed.properties}}
2. Ctrl+C fails to terminate the process

thread dump:
{noformat}
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode):

"Thread-1" #13 prio=5 os_prio=31 tid=0x7fc29a18a800 nid=0x7007 waiting on 
condition [0x73129000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0007bd7d91d8> (a 
java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.stop(DistributedHerder.java:357)
at org.apache.kafka.connect.runtime.Connect.stop(Connect.java:71)
at 
org.apache.kafka.connect.runtime.Connect$ShutdownHook.run(Connect.java:93)

"SIGINT handler" #27 daemon prio=9 os_prio=31 tid=0x7fc29aa6a000 nid=0x560f 
in Object.wait() [0x71a61000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0007bd63db38> (a 
org.apache.kafka.connect.runtime.Connect$ShutdownHook)
at java.lang.Thread.join(Thread.java:1245)
- locked <0x0007bd63db38> (a 
org.apache.kafka.connect.runtime.Connect$ShutdownHook)
at java.lang.Thread.join(Thread.java:1319)
at 
java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
at 
java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
at java.lang.Shutdown.runHooks(Shutdown.java:123)
at java.lang.Shutdown.sequence(Shutdown.java:167)
at java.lang.Shutdown.exit(Shutdown.java:212)
- locked <0x0007b0244600> (a java.lang.Class for java.lang.Shutdown)
at java.lang.Terminator$1.handle(Terminator.java:52)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:745)

"kafka-producer-network-thread | producer-1" #15 daemon prio=5 os_prio=31 
tid=0x7fc29a0b7000 nid=0x7a03 runnable [0x72608000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0007bd7788d8> (a sun.nio.ch.Util$2)
- locked <0x0007bd7788e8> (a java.util.Collections$UnmodifiableSet)
- locked <0x0007bd77> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:470)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
at java.lang.Thread.run(Thread.java:745)

"DistributedHerder" #14 prio=5 os_prio=31 tid=0x7fc29a11e000 nid=0x7803 
waiting on condition [0x72505000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:37)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:299)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1310)
at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:131)
at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:86)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:115)
at 

[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485638#comment-15485638
 ] 

Matthias J. Sax commented on KAFKA-4153:


Well, you can just call {{JoinWindows.of(before).after(after)}} (or reverse). 
However, it might be good to add {{JoinWindows#of(long before, long after)}}.

> Incorrect KStream-KStream join behavior with asymmetric time window
> ---
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, if joining records in two streams separated by some 
> time, but only when records from one stream are newer than records from the 
> other, i.e. doing:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}
> Alas, this is not the case.  Instead, this generates the same output as 
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}
> The problem is that the 
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
>  implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other 
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-12 Thread radai
Hi Jun,

Yes, you're right - right now the next select() call will return immediately
with (at least) the same set of keys as earlier, since they were not previously
handled (no memory).
My assumption is that this happens under considerable load - something has
to be occupying all this memory. Also, this happens in the context of
SocketServer.Processor.run():

while (isRunning) {
   configureNewConnections()
   processNewResponses()
   poll()   <-- HERE
   processCompletedReceives()
   processCompletedSends()
   processDisconnected()
}

Even within poll(), things like finishConnection(), prepare(), and write()s
can still make progress under low-memory conditions. And given the load,
there's probably progress to be made in processCompletedReceives(),
processCompletedSends() and processDisconnected().

If there's progress to be made on other things, it's likely that the next
call to poll() will not happen immediately, so the loop won't be that
tight. In order for this to devolve into true busy waiting you would need a
situation where no progress can be made on any in-progress requests and
there are no responses to send out?

If my assumption does not hold then you are correct, and the selector.poll(300)
currently hardcoded in SocketServer.Processor.poll() would need to be
replaced with something more complicated. My biggest point of concern,
though, is that the resulting code would be complicated and would couple
Selector to the memory pool very tightly. Under my current patch, Selector
needs the memory pool only to pass to channels when they are built. This
would allow different memory pools relatively easily for things like
reserving memory for cross-broker replication and high-SLA connections; a
tighter coupling would make any such future modification hard.
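
For reference, the pool described above has roughly this shape (a sketch of
the idea, not the actual patch):

import java.nio.ByteBuffer;

public interface MemoryPool {
    // Returns a buffer of the requested size, or null if the pool is
    // exhausted - the caller (KafkaChannel.read()) then simply makes no
    // progress on that key and retries on a later poll() iteration.
    ByteBuffer tryAllocate(int sizeBytes);

    // Returns memory to the pool once the request has been fully processed.
    void release(ByteBuffer buffer);
}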

On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao  wrote:

> Hi, Radai,
>
> Thanks for the reply. I still have a followup question on #2.
>
> My understanding is that in your proposal, selector will now first read the
> size of the Receive. If there is not enough memory, it has to turn off the
> READ interest bit for the corresponding KafkaChannel. Otherwise, subsequent
> selector.poll() call will always return immediately, adding unnecessary
> overhead. If you do that, the  Selector will need to know when to turn on
> the READ interest bit again. It may not be enough to do this check until
> the next poll call since the timeout used by poll() could be arbitrarily
> large. So, it seems that some kind of coordination between the Selector and
> the bufferpool is needed?
>
> Jun
>
> On Thu, Sep 8, 2016 at 7:02 PM, radai  wrote:
>
> > Hi Jun,
> >
> > 1. Yes, it is my own personal opinion that people use queued.max.requests
> > as an indirect way to bound memory consumption. Once a more direct memory
> > bound mechanism exists (and works) I don't think queued.max.requests would
> > be required. Having said that, I was not planning on making any changes
> > w.r.t. queued.max.requests support (so I was aiming to get to a situation
> > where both configs are supported) to allow gathering enough data/feedback.
> >
> > 2. Selector.poll() calls into KafkaChannel.read() to maybe get a
> > NetworkReceive. Multiple such read() calls may be required until a Receive
> > is produced, already in the current code base. My pool implementation is
> > non-blocking, so if there's no memory available the read() call will return
> > null. poll() would then move on to try and service other selection keys.
> > The pool will be checked for available memory again the next time the
> > SocketServer.run() loop gets to poll(). And so right now I don't communicate
> > memory becoming available to the selector - it will just go on to try and
> > make progress elsewhere and come back again. I never block it or send it to
> > sleep. I think for efficiency what could maybe be done is, if there's not
> > enough memory to service a readable selection key, we may want to skip all
> > other read-ready selection keys for that iteration of pollSelectionKeys().
> > That would require rather invasive changes around
> > Selector.pollSelectionKeys() that I'd rather avoid. Also, different
> > KafkaChannels may be backed by different memory pools (under some sort of
> > future QoS scheme?), which would complicate such an optimization further.
> >
> > 3. I added the pool interface and implementation under kafka.common.memory,
> > and the API is "thin" enough to be generally useful (currently it's
> > non-blocking only, but a get(long maxWait) is definitely doable). Having
> > said that, I'm not really familiar enough with the code to say
> >
> >
> >
> > On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao  wrote:
> >
> > > Hi, Radi,
> > >
> > > Thanks for the update. At the high level, this looks promising. A few
> > > comments below.
> > >
> > > 1. If we can bound the requests by bytes, it seems that we don't need
> > > queued.max.requests
> > > any more? Could we 

[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485561#comment-15485561
 ] 

Elias Levy commented on KAFKA-4153:
---

As a side note, the API for {{JoinWindows}} has become mangled since 0.10.0.1.  
Now if you want to instantiate an asymmetric {{JoinWindows}} you must first 
call {{JoinWindows.of(0)}} before you can call {{before}} or {{after}} to set 
the actual window boundaries.  The class should provide alternative static 
methods for both {{before}} and {{after}} to instantiate the class without 
having to first call {{of}}.

> Incorrect KStream-KStream join behavior with asymmetric time window
> ---
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, if joining records in two streams separated by some 
> time, but only when records from one stream are newer than records from the 
> other, i.e. doing:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}
> Alas, this is not the case.  Instead, this generates the same output as 
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}
> The problem is that the 
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
>  implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other 
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Elias Levy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated KAFKA-4153:
--
Flags: Patch

> Incorrect KStream-KStream join behavior with asymmetric time window
> ---
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, if joining records in two streams separated by some 
> time, but only when records from one stream are newer than records from the 
> other, i.e. doing:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}
> Alas, this is not the case.  Instead, this generates the same output as 
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}
> The problem is that the 
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
>  implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other 
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485524#comment-15485524
 ] 

ASF GitHub Bot commented on KAFKA-4153:
---

GitHub user eliaslevy opened a pull request:

https://github.com/apache/kafka/pull/1846

KAFKA-4153: Fix incorrect KStream-KStream join behavior with asymmetric 
time window

The contribution is my original work and I license the work to the project 
under the project's open source license.

@guozhangwang

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/eliaslevy/kafka KAFKA-4153

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1846.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1846


commit a82897f5e4933698c95de1a77ae6ae6f4c721743
Author: Elias Levy 
Date:   2016-09-12T22:27:34Z

Swap before & after values for other KStreamKstreamJoin

commit 05721ca926321e298fbd3481c96db176dfec9716
Author: Elias Levy 
Date:   2016-09-12T22:28:37Z

Add tests for asymetric window stream-stream joins




> Incorrect KStream-KStream join behavior with asymmetric time window
> ---
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, if joining records in two streams separated by some 
> time, but only when records from one stream are newer than records from the 
> other, i.e. doing:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}
> Alas, this is not the case. Instead, this generates the same output as 
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}
> The problem is that the 
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
>  implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other 
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1846: KAFKA-4153: Fix incorrect KStream-KStream join beh...

2016-09-12 Thread eliaslevy
GitHub user eliaslevy opened a pull request:

https://github.com/apache/kafka/pull/1846

KAFKA-4153: Fix incorrect KStream-KStream join behavior with asymmetric 
time window

The contribution is my original work and I license the work to the project 
under the project's open source license.

@guozhangwang

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/eliaslevy/kafka KAFKA-4153

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1846.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1846


commit a82897f5e4933698c95de1a77ae6ae6f4c721743
Author: Elias Levy 
Date:   2016-09-12T22:27:34Z

Swap before & after values for other KStreamKStreamJoin

commit 05721ca926321e298fbd3481c96db176dfec9716
Author: Elias Levy 
Date:   2016-09-12T22:28:37Z

Add tests for asymmetric window stream-stream joins




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4153:
-

 Summary: Incorrect KStream-KStream join behavior with asymmetric 
time window
 Key: KAFKA-4153
 URL: https://issues.apache.org/jira/browse/KAFKA-4153
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Elias Levy
Assignee: Guozhang Wang


Using Kafka 0.10.0.1, if joining records in two streams separated by some time, 
but only when records from one stream are newer than records from the other, 
i.e. doing:

{{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}

One would expect that the following would be equivalent:

{{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}

Alas, this is not the case. Instead, this generates the same output as 
the first example:

{{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}

The problem is that the 
[{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
 implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
{{after}} values when it creates the {{KStreamKStreamJoin}} for the other stream, 
even though it calls {{reverseJoiner}} to reverse the joiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-12 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2063:
-
Assignee: Andrey Neporada  (was: Alexey Ozeritskiy)

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Andrey Neporada
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.
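
For illustration, a minimal sketch of the server-side accumulation loop described above (hypothetical names; {{random}}, {{partitions}}, {{maxBytes}} and {{readUpTo}} are assumptions, not the actual broker code):

{code}
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Fill the response up to maxBytes, starting at a random partition index so
// that backlogged partitions share the byte budget fairly across fetches.
int start = random.nextInt(partitions.size());
long remaining = maxBytes;
Map<TopicPartition, byte[]> response = new LinkedHashMap<>();
for (int i = 0; i < partitions.size() && remaining > 0; i++) {
    TopicPartition tp = partitions.get((start + i) % partitions.size());
    byte[] data = readUpTo(tp, remaining); // hypothetical: read at most 'remaining' bytes
    remaining -= data.length;
    response.put(tp, data);
}
{code}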



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-12 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485357#comment-15485357
 ] 

Alexey Ozeritskiy edited comment on KAFKA-2063 at 9/12/16 9:37 PM:
---

The approach of KAFKA-3979 is better for configurations with a very big 
max.message.bytes.
For example we have the following config on a cluster of 77 hosts
{code}
replica.fetch.max.bytes=13500
replica.socket.receive.buffer.bytes=13500
message.max.bytes=134217728
socket.request.max.bytes=13500
{code}

For KIP-74, the process always needs 77*13500 bytes of memory. 
For KAFKA-3979 it is only 77*100 bytes on average.

I think we should do something about that. 


was (Author: aozeritsky):
The approach of KAFKA-3979 is better for configurations with a very big 
max.message.size.
For example we have the following config on a cluster of 77 hosts
{code}
replica.fetch.max.bytes=13500
replica.socket.receive.buffer.bytes=13500
message.max.bytes=134217728
socket.request.max.bytes=13500
{code}

For KIP-74, the process always needs 77*13500 bytes of memory. 
For KAFKA-3979 it is only 77*100 bytes on average.

I think we should do something about that. 

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Alexey Ozeritskiy
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-12 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485357#comment-15485357
 ] 

Alexey Ozeritskiy commented on KAFKA-2063:
--

The approach of KAFKA-3979 is better for configurations with a very big 
max.message.size.
For example we have the following config on a cluster of 77 hosts
{code}
replica.fetch.max.bytes=13500
replica.socket.receive.buffer.bytes=13500
message.max.bytes=134217728
socket.request.max.bytes=13500
{code}

For KIP-74, the process always needs 77*13500 bytes of memory. 
For KAFKA-3979 it is only 77*100 bytes on average.

I think we should do something about that. 

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Alexey Ozeritskiy
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-12 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy reassigned KAFKA-2063:


Assignee: Alexey Ozeritskiy  (was: Andrey Neporada)

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Alexey Ozeritskiy
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3909) Queryable state for Kafka Streams

2016-09-12 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-3909:

Assignee: Damian Guy

> Queryable state for Kafka Streams
> -
>
> Key: KAFKA-3909
> URL: https://issues.apache.org/jira/browse/KAFKA-3909
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>
> This is an umbrella story for capturing changes to Kafka Streams to enable 
> Queryable state as described in KIP-67 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-4126) No relevant log when the topic is non-existent

2016-09-12 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-4126:
--

Assignee: Vahid Hashemian

> No relevant log when the topic is non-existent
> --
>
> Key: KAFKA-4126
> URL: https://issues.apache.org/jira/browse/KAFKA-4126
> Project: Kafka
>  Issue Type: Bug
>Reporter: Balázs Barnabás
>Assignee: Vahid Hashemian
>Priority: Minor
>
> When a producer sends a ProducerRecord into a Kafka topic that doesn't 
> existst, there is no relevant debug/error log that points out the error.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-63: Unify store and downstream caching in streams

2016-09-12 Thread Eno Thereska
Forgot to include the stats:

There were 3 binding votes (Guozhang, Neha, Gwen) and 6 non-binding votes 
(Matthias, Bill, Michael, Jim, Damian, Eno).

Thanks to everyone that voted and participated in the discussions.

Eno

> On 12 Sep 2016, at 16:00, Eno Thereska  wrote:
> 
> KIP-63 has now been adopted, thank you!
> 
> Eno
> 
>> On 9 Sep 2016, at 00:21, Neha Narkhede  wrote:
>> 
>> +1 (binding)
>> 
>> On Thu, Sep 8, 2016 at 2:34 PM Gwen Shapira  wrote:
>> 
>>> +1 (binding)
>>> 
>>> Looks great and very detailed. Nice job, Eno :)
>>> 
>>> On Thu, Aug 25, 2016 at 3:57 AM, Eno Thereska 
>>> wrote:
>>> 
 Hi folks,
 
 We'd like to start the vote for KIP-63. At this point the Wiki addresses
 all previous questions and we believe the PoC is feature-complete.
 
 Thanks
 Eno
 
>>> 
>>> 
>>> 
>>> --
>>> *Gwen Shapira*
>>> Product Manager | Confluent
>>> 650.450.2760 | @gwenshap
>>> Follow us: Twitter  | blog
>>> 
>>> 
>> -- 
>> Thanks,
>> Neha
> 



Re: Kafka KIP meeting Sep 13 at 11:00am PST

2016-09-12 Thread Renu Tewari
Hi Ismael
Could you please add me to the invite.

rtew...@linkedin.com

regards
Renu


On Mon, Sep 12, 2016 at 8:39 AM, Ismael Juma  wrote:

> Hi, Everyone,
>
> We plan to have a Kafka KIP meeting this coming Tuesday at 11:00am PST. If
> you plan to attend but haven't received an invite, please let me know. The
> following is the tentative agenda.
>
> Agenda:
> KIP-54: Sticky Partition Assignment Strategy
> KIP-72: Allow Sizing Incoming Request Queue in Bytes
>
> Thanks,
> Ismael
>


[jira] [Created] (KAFKA-4152) Reduce severity level of metadata fetch failure logging for nonexistent topics

2016-09-12 Thread Andrew Olson (JIRA)
Andrew Olson created KAFKA-4152:
---

 Summary: Reduce severity level of metadata fetch failure logging 
for nonexistent topics
 Key: KAFKA-4152
 URL: https://issues.apache.org/jira/browse/KAFKA-4152
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Andrew Olson


If a consumer proactively subscribes to one or more topics that don't already 
exist, but are expected to exist in the near future, warnings are repeatedly 
logged by the NetworkClient throughout the consumer's lifetime until the topic 
eventually gets created, like:

{noformat}
org.apache.kafka.clients.NetworkClient [WARN]
Error while fetching metadata with correlation id 1 : 
{MY.NEW.TOPIC=UNKNOWN_TOPIC_OR_PARTITION,
ANOTHER.NEW.TOPIC=UNKNOWN_TOPIC_OR_PARTITION}
{noformat}

The NetworkClient's warning logging code for metadata fetch failures is rather 
generic, but it could potentially examine the reason and log at debug level for 
UNKNOWN_TOPIC_OR_PARTITION while warning for all others. As these warnings can be 
very valuable for troubleshooting in some situations, a reasonable approach 
might be to remember the unknown topics that a warning has already been logged 
for, and reduce the log level from warn to debug for subsequent occurrences for 
the same topics with the same common cause (the topic presumably not yet 
existing), although that does introduce some undesirable complexity.
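
A minimal sketch of that remember-and-downgrade idea (illustrative only, not NetworkClient's actual code; {{log}} is an assumed SLF4J logger):

{code}
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Warn once per unknown topic, then drop to debug for repeats of the same cause.
private final Set<String> warnedUnknownTopics =
    Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

private void logMetadataError(String topic, String error) {
    // add() returns false once the topic has already been warned about
    if ("UNKNOWN_TOPIC_OR_PARTITION".equals(error) && !warnedUnknownTopics.add(topic))
        log.debug("Error while fetching metadata for {}: {}", topic, error);
    else
        log.warn("Error while fetching metadata for {}: {}", topic, error);
}
{code}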



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4121) Allow creating broker listeners from hostname

2016-09-12 Thread Joaquin Casares (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15484524#comment-15484524
 ] 

Joaquin Casares commented on KAFKA-4121:


Thanks for the information!

I ended up using this workaround in my docker-compose.yml:

{CODE}
links:
  - kafka:kafka
{CODE}

which allowed me to use:

{CODE}
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
{CODE}

Thanks again!

> Allow creating broker listeners from hostname
> -
>
> Key: KAFKA-4121
> URL: https://issues.apache.org/jira/browse/KAFKA-4121
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.10.0.1
>Reporter: Joaquin Casares
>Priority: Minor
>
> I'm attempting to start a docker container using the parameter:
> {CODE}
> advertised.listeners = PLAINTEXT://dockercomposeproject_kafka_1:9092
> {CODE}
> However, I get this error:
> {CODE}
> kafka_1  | java.lang.IllegalArgumentException: Error creating broker 
> listeners from 'PLAINTEXT://dockercomposeproject_kafka_1:9092': Unable to 
> parse PLAINTEXT://dockercomposeproject_kafka_1:9092 to a broker endpoint
> kafka_1  |at 
> kafka.server.KafkaConfig.validateUniquePortAndProtocol(KafkaConfig.scala:954)
> kafka_1  |at 
> kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:985)
> kafka_1  |at 
> kafka.server.KafkaConfig.(KafkaConfig.scala:927)
> kafka_1  |at 
> kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:743)
> kafka_1  |at 
> kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:740)
> kafka_1  |at 
> kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
> kafka_1  |at kafka.Kafka$.main(Kafka.scala:58)
> kafka_1  |at kafka.Kafka.main(Kafka.scala)
> {CODE}
> My workaround is to set my envar (for a wurstmeister/kafka-docker container, 
> which is unrelated to this ticket) to:
> {CODE}
> KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://`getent hosts 
> dockercomposeproject_kafka_1 | awk '{ print $1 }'`:9092
> {CODE}
> Which resolves to:
> {CODE}
> advertised.listeners = PLAINTEXT://172.18.0.4:9092
> {CODE}
> This advertised.listeners resolution allows my docker container to start as 
> expected.
> Could sending in hostnames, instead of strict IP addresses, be supported for 
> the advertised.listeners setting?
> Thanks!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4121) Allow creating broker listeners from hostname

2016-09-12 Thread Joaquin Casares (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joaquin Casares resolved KAFKA-4121.

Resolution: Duplicate

> Allow creating broker listeners from hostname
> -
>
> Key: KAFKA-4121
> URL: https://issues.apache.org/jira/browse/KAFKA-4121
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.10.0.1
>Reporter: Joaquin Casares
>Priority: Minor
>
> I'm attempting to start a docker container using the parameter:
> {CODE}
> advertised.listeners = PLAINTEXT://dockercomposeproject_kafka_1:9092
> {CODE}
> However, I get this error:
> {CODE}
> kafka_1  | java.lang.IllegalArgumentException: Error creating broker 
> listeners from 'PLAINTEXT://dockercomposeproject_kafka_1:9092': Unable to 
> parse PLAINTEXT://dockercomposeproject_kafka_1:9092 to a broker endpoint
> kafka_1  |at 
> kafka.server.KafkaConfig.validateUniquePortAndProtocol(KafkaConfig.scala:954)
> kafka_1  |at 
> kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:985)
> kafka_1  |at 
> kafka.server.KafkaConfig.(KafkaConfig.scala:927)
> kafka_1  |at 
> kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:743)
> kafka_1  |at 
> kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:740)
> kafka_1  |at 
> kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
> kafka_1  |at kafka.Kafka$.main(Kafka.scala:58)
> kafka_1  |at kafka.Kafka.main(Kafka.scala)
> {CODE}
> My workaround is to set my envar (for a wurstmeister/kafka-docker container, 
> which is unrelated to this ticket) to:
> {CODE}
> KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://`getent hosts 
> dockercomposeproject_kafka_1 | awk '{ print $1 }'`:9092
> {CODE}
> Which resolves to:
> {CODE}
> advertised.listeners = PLAINTEXT://172.18.0.4:9092
> {CODE}
> This advertised.listeners resolution allows my docker container to start as 
> expected.
> Could sending in hostnames, instead of strict IP addresses, be supported for 
> the advertised.listeners setting?
> Thanks!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Kafka KIP meeting Sep 13 at 11:00am PST

2016-09-12 Thread Ismael Juma
Hi, Everyone,

We plan to have a Kafka KIP meeting this coming Tuesday at 11:00am PST. If
you plan to attend but haven't received an invite, please let me know. The
following is the tentative agenda.

Agenda:
KIP-54: Sticky Partition Assignment Strategy
KIP-72: Allow Sizing Incoming Request Queue in Bytes

Thanks,
Ismael


[jira] [Updated] (KAFKA-4151) System tests for KIP-78

2016-09-12 Thread Sumit Arrawatia (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sumit Arrawatia updated KAFKA-4151:
---
Issue Type: New Feature  (was: Bug)

> System tests for KIP-78 
> 
>
> Key: KAFKA-4151
> URL: https://issues.apache.org/jira/browse/KAFKA-4151
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Sumit Arrawatia
>Assignee: Sumit Arrawatia
>
> System tests for KIP-78. The actual implementation is tracked in KAFKA-4093. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4151) System tests for KIP-78

2016-09-12 Thread Sumit Arrawatia (JIRA)
Sumit Arrawatia created KAFKA-4151:
--

 Summary: System tests for KIP-78 
 Key: KAFKA-4151
 URL: https://issues.apache.org/jira/browse/KAFKA-4151
 Project: Kafka
  Issue Type: Bug
Reporter: Sumit Arrawatia
Assignee: Sumit Arrawatia


System tests for KIP-78. The actual implementation is tracked in KAFKA-4093. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-63: Unify store and downstream caching in streams

2016-09-12 Thread Eno Thereska
KIP-63 has now been adopted, thank you!

Eno

> On 9 Sep 2016, at 00:21, Neha Narkhede  wrote:
> 
> +1 (binding)
> 
> On Thu, Sep 8, 2016 at 2:34 PM Gwen Shapira  wrote:
> 
>> +1 (binding)
>> 
>> Looks great and very detailed. Nice job, Eno :)
>> 
>> On Thu, Aug 25, 2016 at 3:57 AM, Eno Thereska 
>> wrote:
>> 
>>> Hi folks,
>>> 
>>> We'd like to start the vote for KIP-63. At this point the Wiki addresses
>>> all previous questions and we believe the PoC is feature-complete.
>>> 
>>> Thanks
>>> Eno
>>> 
>> 
>> 
>> 
>> --
>> *Gwen Shapira*
>> Product Manager | Confluent
>> 650.450.2760 | @gwenshap
>> Follow us: Twitter  | blog
>> 
>> 
> -- 
> Thanks,
> Neha



[jira] [Updated] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-12 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-2063:
---
Assignee: Andrey Neporada

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Andrey Neporada
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2063) Bound fetch response size

2016-09-12 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-2063:
---
Fix Version/s: 0.10.1.0

> Bound fetch response size
> -
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-12 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15484311#comment-15484311
 ] 

Ismael Juma commented on KAFKA-2063:


PR link: https://github.com/apache/kafka/pull/1812

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-12 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-2063:
---
Summary: Bound fetch response size (KIP-74)  (was: Bound fetch response 
size)

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2063) Bound fetch response size (KIP-74)

2016-09-12 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-2063:
---
Status: Patch Available  (was: Open)

> Bound fetch response size (KIP-74)
> --
>
> Key: KAFKA-2063
> URL: https://issues.apache.org/jira/browse/KAFKA-2063
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
> Fix For: 0.10.1.0
>
>
> Currently the only bound on the fetch response size is 
> max.partition.fetch.bytes * num_partitions. There are two problems:
> 1. First this bound is often large. You may choose 
> max.partition.fetch.bytes=1MB to enable messages of up to 1MB. However if you 
> also need to consume 1k partitions this means you may receive a 1GB response 
> in the worst case!
> 2. The actual memory usage is unpredictable. Partition assignment changes, 
> and you only actually get the full fetch amount when you are behind and there 
> is a full chunk of data ready. This means an application that seems to work 
> fine will suddenly OOM when partitions shift or when the application falls 
> behind.
> We need to decouple the fetch response size from the number of partitions.
> The proposal for doing this would be to add a new field to the fetch request, 
> max_bytes which would control the maximum data bytes we would include in the 
> response.
> The implementation on the server side would grab data from each partition in 
> the fetch request until it hit this limit, then send back just the data for 
> the partitions that fit in the response. The implementation would need to 
> start from a random position in the list of topics included in the fetch 
> request to ensure that in a case of backlog we fairly balance between 
> partitions (to avoid first giving just the first partition until that is 
> exhausted, then the next partition, etc).
> This setting will make the max.partition.fetch.bytes field in the fetch 
> request much less useful and we  should discuss just getting rid of it.
> I believe this also solves the same thing we were trying to address in 
> KAFKA-598. The max_bytes setting now becomes the new limit that would need to 
> be compared to max_message size. This can be much larger--e.g. setting a 50MB 
> max_bytes setting would be okay, whereas now if you set 50MB you may need to 
> allocate 50MB*num_partitions.
> This will require evolving the fetch request protocol version to add the new 
> field and we should do a KIP for it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4007) Improve fetch pipelining for low values of max.poll.records

2016-09-12 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15484105#comment-15484105
 ] 

Eno Thereska commented on KAFKA-4007:
-

[~hachikuji] isn't the point of max.poll.records to improve the fetch 
pipelining? In the sense that if users set it too low they should expect 
pipelining to degrade, while if they set it higher pipelining should improve. 
Why do we want to mask that expected behaviour?

> Improve fetch pipelining for low values of max.poll.records
> ---
>
> Key: KAFKA-4007
> URL: https://issues.apache.org/jira/browse/KAFKA-4007
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Mickael Maison
>
> Currently the consumer will only send a prefetch for a partition after all 
> the records from the previous fetch have been consumed. This can lead to 
> suboptimal pipelining when max.poll.records is set very low since the 
> processing latency for a small set of records may be small compared to the 
> latency of a fetch. An improvement suggested by [~junrao] is to send the 
> fetch anyway even if we have unprocessed data buffered, but delay reading it 
> from the socket until that data has been consumed. Potentially the consumer 
> can delay reading _any_ pending fetch until it is ready to be returned to the 
> user, which may help control memory better. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1845: HOTFIX: throw exception if the Cluster object has ...

2016-09-12 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/1845

HOTFIX: throw exception if the Cluster object has not been initialized in 
StreamsMetadataState

During rebalance operations the Cluster object gets set to Cluster.empty(). 
This can result in NPEs when doing certain operations on StreamsMetadataState. 
This should throw a StreamsException if the Cluster is empty, as it is not yet 
(re-)initialized.
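
A rough sketch of the kind of guard described (illustrative only, not the exact diff; {{cluster}} is assumed to be the field holding the current metadata):

{code}
import org.apache.kafka.streams.errors.StreamsException;

// Fail fast while the Cluster is still Cluster.empty(), i.e. while a
// rebalance is in progress and the metadata has not been (re-)initialized.
private void validateClusterInitialized() {
    if (cluster.topics().isEmpty()) {
        throw new StreamsException(
            "Streams metadata is not available while a rebalance is in progress; "
            + "retry the query once the rebalance has completed");
    }
}
{code}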

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka streams-meta-hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1845.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1845


commit add6b5e52a0b5c0b35c1a4cd6e0a5a9c395e50b7
Author: Damian Guy 
Date:   2016-09-12T13:15:44Z

throw exception if the Cluster object has not been initialized




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1844: MINOR: Adjusted heartbeat and rebalancing interval...

2016-09-12 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/1844

MINOR: Adjusted heartbeat and rebalancing intervals to stop triggering 
spurious rebalances

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka 
minor-reenable-basic-smoke-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1844


commit a59d2b3e02434ba68ebc7bbb7cb42511c3e98ab0
Author: Eno Thereska 
Date:   2016-09-12T10:10:24Z

Adjusted heartbeat and rebalancing intervals to stop triggering spurious 
rebalances




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit

2016-09-12 Thread Bence Varga (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15483605#comment-15483605
 ] 

Bence Varga commented on KAFKA-1835:


I can see two actions that can resolve this issue:

A) Remove networking from the foreground thread. Neither the _constructor_ nor 
the call to send() should block. And definitely not when using default 
configuration.

B) Remove any reference to "async" and "non-blocking" behaviour from the API 
docs, as it is very misleading.

Note: setting "metadata.fetch.timeout.ms" to zero will render the client 
unworkable.

> Kafka new producer needs options to make blocking behavior explicit
> ---
>
> Key: KAFKA-1835
> URL: https://issues.apache.org/jira/browse/KAFKA-1835
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.8.2.0, 0.9.0.0, 0.10.1.0
>Reporter: Paul Pearcy
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-1835-New-producer--blocking_v0.patch, 
> KAFKA-1835.patch
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The new (0.8.2 standalone) producer will block the first time it attempts to 
> retrieve metadata for a topic. This is not the desired behavior in some use 
> cases where async non-blocking guarantees are required and message loss is 
> acceptable in known cases. Also, most developers will assume an API that 
> returns a future is safe to call in a critical request path. 
> Discussing on the mailing list, the most viable option is to have the 
> following settings:
>  pre.initialize.topics=x,y,z
>  pre.initialize.timeout=x
>  
> This moves potential blocking to the init of the producer and outside of some 
> random request. The potential will still exist for blocking in a corner case 
> where connectivity with Kafka is lost and a topic not included in pre-init 
> has a message sent for the first time. 
> There is the question of what to do when initialization fails. There are a 
> couple of options that I'd like available:
> - Fail creation of the client 
> - Fail all sends until the meta is available 
> Open to input on how the above option should be expressed. 
> It is also worth noting more nuanced solutions exist that could work without 
> the extra settings, they just end up having extra complications and at the 
> end of the day not adding much value. For instance, the producer could accept 
> and queue messages(note: more complicated than I am making it sound due to 
> storing all accepted messages in pre-partitioned compact binary form), but 
> you're still going to be forced to choose to either start blocking or 
> dropping messages at some point. 
> I have some test cases I am going to port over to the Kafka producer 
> integration ones and start from there. My current impl is in scala, but 
> porting to Java shouldn't be a big deal (was using a promise to track init 
> status, but will likely need to make that an atomic bool). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4150) Facing Message Loss When trying rolling updates

2016-09-12 Thread Avanish Pandey (JIRA)
Avanish Pandey created KAFKA-4150:
-

 Summary: Facing Message Loss When trying rolling updates
 Key: KAFKA-4150
 URL: https://issues.apache.org/jira/browse/KAFKA-4150
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.9.0.1
Reporter: Avanish Pandey


Hi ,

I am trying to work out the settings that I should use for my Kafka cluster.

The settings I'm trying are:
replication-factor = 3
min.insync.replicas = 2
num of brokers = 3

Scenario:
Bring broker1 down.
Bring it back up.
Bring broker 2 down.
Bring it back up.
Bring broker 3 down.
Bring it back up.

There is a delay of at least 1-2 minutes in between the brokers being stopped.

Other configs:
zookeeper.session.timeout.ms=1
num.partitions=6
min.insync.replicas=2
log.retention.ms=17280
message.max.bytes=100
log.flush.interval.ms=5000
log.flush.offset.checkpoint.interval.ms=5000
log.roll.hours=24
zookeeper.session.timeout.ms=1000
leader.imbalance.check.interval.seconds=60
unclean.leader.election.enable=false


What I am seeing is a variable number of messages lost. Whereas, when I don't 
bring Kafka down, or just bring one broker down and back up, everything is fine.

Could you let me know if there is some setting which is wrong or needs 
correction?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3450) Producer blocks on send to topic that doesn't exist if auto create is disabled

2016-09-12 Thread Michal Turek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15483414#comment-15483414
 ] 

Michal Turek commented on KAFKA-3450:
-

A dirty workaround for this issue is to build and periodically update a list of 
topics that exist in Kafka, to be used for white-list filtering before calling 
Producer.send(). The Producer API has no such method, but the consumer provides 
Consumer.listTopics().
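
A rough sketch of that workaround (illustrative only; the refresh scheduling and error handling are omitted, and {{log}} is an assumed SLF4J logger):

{code}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Periodically refresh the set of known topics via a consumer, then skip
// sends to topics that do not (yet) exist instead of blocking in send().
private volatile Set<String> knownTopics = Collections.emptySet();

void refreshKnownTopics(Consumer<?, ?> consumer) {
    knownTopics = new HashSet<>(consumer.listTopics().keySet());
}

void sendIfTopicExists(Producer<String, String> producer,
                       ProducerRecord<String, String> record) {
    if (knownTopics.contains(record.topic()))
        producer.send(record);
    else
        log.warn("Dropping record, topic {} does not exist yet", record.topic());
}
{code}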

> Producer blocks on send to topic that doesn't exist if auto create is disabled
> --
>
> Key: KAFKA-3450
> URL: https://issues.apache.org/jira/browse/KAFKA-3450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.1
>Reporter: Michal Turek
>Assignee: Jun Rao
>Priority: Critical
>
> {{producer.send()}} is blocked for {{max.block.ms}} (default 60 seconds) if 
> the destination topic doesn't exist and its automatic creation is 
> disabled. A warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is 
> logged every 100 ms in a loop until the 60-second timeout expires, but the 
> operation is not recoverable.
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients.
> Example minimalist code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> /**
>  * Test of sending to a topic that does not exist while automatic creation of 
> topics is disabled in Kafka (auto.create.topics.enable=false).
>  */
> public class NoSuchTopicTest {
> private static final Logger LOGGER = 
> LoggerFactory.getLogger(NoSuchTopicTest.class);
> public static void main(String[] args) {
> Properties properties = new Properties();
> properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9092");
> properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 
> NoSuchTopicTest.class.getSimpleName());
> properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); 
> // Default is 60 seconds
> try (Producer<String, String> producer = new 
> KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
> LOGGER.info("Sending message");
> producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", 
> "key", "value"), (metadata, exception) -> {
> if (exception != null) {
> LOGGER.error("Send failed: {}", exception.toString());
> } else {
> LOGGER.info("Send successful: {}-{}/{}", 
> metadata.topic(), metadata.partition(), metadata.offset());
> }
> });
> LOGGER.info("Sending message");
> producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", 
> "key", "value"), (metadata, exception) -> {
> if (exception != null) {
> LOGGER.error("Send failed: {}", exception.toString());
> } else {
> LOGGER.info("Send successful: {}-{}/{}", 
> metadata.topic(), metadata.partition(), metadata.offset());
> }
> });
> }
> }
> }
> {noformat}
> Related output
> {noformat}
> 2016-03-23 12:44:37.725 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: 
> Sending message (NoSuchTopicTest.java:26)
> 2016-03-23 12:44:37.830 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 0 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:37.928 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 1 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.028 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 2 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.130 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 3 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.231 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 4 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.332 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | 

Build failed in Jenkins: kafka-trunk-jdk7 #1534

2016-09-12 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] MINOR: catch InvalidStateStoreException in 
QueryableStateIntegrationTest

--
[...truncated 5885 lines...]
kafka.log.FileMessageSetTest > testTruncate PASSED
kafka.log.FileMessageSetTest > testIterationOverPartialAndTruncation PASSED
kafka.log.FileMessageSetTest > testRead PASSED
kafka.log.FileMessageSetTest > testTruncateNotCalledIfSizeIsBiggerThanTargetSize PASSED
kafka.log.FileMessageSetTest > testFileSize PASSED
kafka.log.FileMessageSetTest > testIteratorWithLimits PASSED
kafka.log.FileMessageSetTest > testWriteToChannelThatConsumesPartially PASSED
kafka.log.FileMessageSetTest > testTruncateNotCalledIfSizeIsSameAsTargetSize PASSED
kafka.log.FileMessageSetTest > testPreallocateTrue PASSED
kafka.log.FileMessageSetTest > testIteratorIsConsistent PASSED
kafka.log.FileMessageSetTest > testTruncateIfSizeIsDifferentToTargetSize PASSED
kafka.log.FileMessageSetTest > testFormatConversionWithPartialMessage PASSED
kafka.log.FileMessageSetTest > testIterationDoesntChangePosition PASSED
kafka.log.FileMessageSetTest > testWrittenEqualsRead PASSED
kafka.log.FileMessageSetTest > testWriteTo PASSED
kafka.log.FileMessageSetTest > testPreallocateFalse PASSED
kafka.log.FileMessageSetTest > testPreallocateClearShutdown PASSED
kafka.log.FileMessageSetTest > testMessageFormatConversion PASSED
kafka.log.FileMessageSetTest > testSearch PASSED
kafka.log.FileMessageSetTest > testSizeInBytes PASSED
kafka.log.LogConfigTest > testFromPropsEmpty PASSED
kafka.log.LogConfigTest > testKafkaConfigToProps PASSED
kafka.log.LogConfigTest > testFromPropsInvalid PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] PASSED
kafka.log.BrokerCompressionTest > testBrokerSideCompression[12]

Re: Queryable state client read guarantees

2016-09-12 Thread Mikael Högqvist
Hi,

this helps, thanks!

Basically, after each read I'll check whether the key is still supposed to
be on this host. Doing the check after the read is necessary to handle the
case where a rebalance happens in between the metadata lookup and the store
get. Checking after the read means a valid read may occasionally be marked
invalid, but that errs on the safe side and doesn't affect correctness.
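
A minimal sketch of that check, assuming each instance knows its own
application.server host:port as a HostInfo (the store name and key type
below are illustrative, not from this thread):

    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.HostInfo;
    import org.apache.kafka.streams.state.StreamsMetadata;

    public final class KeyOwnership {
        // True if, according to the current streams metadata, this instance
        // still hosts the partition containing the key. While a rebalance is
        // in flight there is no owner to report, so the check fails and a
        // valid read may be marked invalid: the safe direction, as noted above.
        public static boolean isKeyLocal(final KafkaStreams streams,
                                         final String storeName,
                                         final String key,
                                         final HostInfo thisHost) {
            final StreamsMetadata metadata =
                streams.metadataForKey(storeName, key, new StringSerializer());
            return metadata != null && thisHost.equals(metadata.hostInfo());
        }
    }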

During a rebalance the service responds either "not available" or with a
redirect. After the rebalance has completed, an instance that no longer
hosts the key responds with a redirect. With a REST API this could mean
either a 404 or a 303, i.e. a temporary redirect to the current host; a
sketch of this mapping follows below.
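
A sketch of the status mapping on the read path, reusing isKeyLocal from
the previous sketch; the HttpResult type is a hand-rolled stand-in for
whatever REST framework is used, and all names here are hypothetical:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.errors.InvalidStateStoreException;
    import org.apache.kafka.streams.state.HostInfo;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public final class CountResource {
        // Minimal stand-in for an HTTP response: a status code plus a body.
        public static final class HttpResult {
            public final int status;
            public final String body;
            HttpResult(final int status, final String body) {
                this.status = status;
                this.body = body;
            }
        }

        public static HttpResult get(final KafkaStreams streams,
                                     final String storeName,
                                     final String key,
                                     final HostInfo thisHost) {
            final Long value;
            try {
                final ReadOnlyKeyValueStore<String, Long> store = streams.store(
                    storeName, QueryableStoreTypes.<String, Long>keyValueStore());
                value = store.get(key);
            } catch (final InvalidStateStoreException e) {
                return new HttpResult(503, null); // rebalance in progress
            }
            // Check-after-read: only trust the value if this host still owns
            // the key; otherwise redirect (a real service would resolve the
            // current owner via metadataForKey for the Location header).
            if (!KeyOwnership.isKeyLocal(streams, storeName, key, thisHost)) {
                return new HttpResult(303, null);
            }
            return value == null ? new HttpResult(404, null)
                                 : new HttpResult(200, value.toString());
        }
    }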

Best,
Mikael

On Mon, Sep 12, 2016 at 5:42 AM Guozhang Wang  wrote:

> Hi Mikael,
>
> Just adding to Damian's comment above: the InvalidStateStoreException
> here is thrown to indicate a "transient" state in which the state store
> hosting this key is being migrated and hence not available. Users
> implementing REST APIs on top of it can choose how to handle it: for
> example, return a sentinel value meaning "key not available", or return
> some error code.
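>
> A minimal sketch of the sentinel option (the store name and sentinel
> value are assumptions for illustration; imports and class wrapper as in
> the sketches above):
>
>     // Returns the current count, null if the key does not exist, or
>     // -1L as a sentinel while the store hosting the key is migrating.
>     static Long countOrSentinel(final KafkaStreams streams, final String key) {
>         try {
>             final ReadOnlyKeyValueStore<String, Long> store = streams.store(
>                 "word-counts", QueryableStoreTypes.<String, Long>keyValueStore());
>             return store.get(key);
>         } catch (final InvalidStateStoreException e) {
>             return -1L; // or surface an error code from the REST layer instead
>         }
>     }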
>
> Guozhang
>
>
> On Fri, Sep 9, 2016 at 9:40 AM, Damian Guy  wrote:
>
> > Hi Mikael,
> >
> > During a rebalance both instances should throw InvalidStateStoreException
> > until the rebalance has completed. Once the rebalance has completed, if
> > the key is not found in the local store then you will get a null value.
> > You can always find the Kafka Streams instance that will have that key
> > (assuming it exists) by using:
> >
> > StreamsMetadata KafkaStreams.metadataForKey(String storeName, K key,
> > Serializer<K> keySerializer)
> >
> > The StreamsMetadata will tell you which instance, via HostInfo, has the
> > given key.
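> >
> > A minimal usage sketch (store name, key, and serializer choice are
> > assumptions for illustration):
> >
> >     StreamsMetadata metadata = streams.metadataForKey(
> >         "word-counts", "hello", new StringSerializer());
> >     HostInfo owner = metadata.hostInfo();
> >     // owner.host() + ":" + owner.port() identifies the instance to
> >     // query, e.g. as the target of an HTTP redirect when it is not
> >     // this instance.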
> >
> > HTH,
> > Damian
> >
> >
> >
> >
> > On Fri, 9 Sep 2016 at 16:56 Mikael Högqvist  wrote:
> >
> > > Hi Damian,
> > >
> > > thanks for fixing this so quickly, I re-ran the test and it works fine.
> > >
> > > The next test I tried was to read from two service instances
> > > implementing the same string count topology. First, the client is
> > > started, sending two read requests, one per instance, every second.
> > > Next, I start the first instance and let it complete the store init
> > > before the next instance is started.
> > >
> > > Below is the initial part of the trace when going from 0 to 1 instance.
> > > The trace log has the following columns: request id, instance, response
> > > code, and value.
> > >
> > > 3,localhost:2030,503,
> > > 3,localhost:2031,503,
> > > 4,localhost:2030,503,
> > > 4,localhost:2031,503,
> > > 5,localhost:2030,200,2
> > > 5,localhost:2031,503,
> > > 6,localhost:2030,200,2
> > > 6,localhost:2031,503,
> > >
> > > Before the instance is started, both return 503, the status returned by
> > > the client when it cannot connect to an instance. When the first
> > > instance has started, it returns the expected value 2 for request pairs
> > > 5, 6, and so on. The trace below is from when the second instance
> > > starts.
> > >
> > > 18,localhost:2030,200,2
> > > 18,localhost:2031,503,
> > > 19,localhost:2030,404,
> > > 19,localhost:2031,503,
> > > 20,localhost:2030,404,
> > > 20,localhost:2031,503,
> > > 21,localhost:2030,404,
> > > 21,localhost:2031,200,2
> > > 22,localhost:2030,404,
> > > 22,localhost:2031,200,2
> > >
> > > The new instance takes over responsibility for the partition containing
> > > the key "hello". During this period the new instance returns 503 as
> > > expected until the store is ready. The issue is that the first instance,
> > > the one that stored the value, starts returning 404 from request pair 19.
> > > A client doing requests for this key would then have the following
> > > sequence:
> > >
> > > 18 -> 2
> > > 19 -> Not found
> > > 20 -> Not found
> > > 21 -> 2
> > >
> > > From the client perspective, I think this violates the guarantee of
> > > always reading the latest value.
> > >
> > > Am I making the wrong assumptions, or is there some way to detect that
> > > the local store is not responsible for the key anymore?
> > >
> > > Best,
> > > Mikael
> > >
> > > On Thu, Sep 8, 2016 at 11:03 AM Damian Guy  wrote:
> > >
> > > > Hi Mikael,
> > > >
> > > > A fix for KAFKA-4123
> > > > <https://issues.apache.org/jira/browse/KAFKA-4123> (the issue you
> > > > found with receiving null values) has now been committed to trunk.
> > > > I've tried it with your github repo and it appears to be working.
> > > > You will have to make a small change to your code, as we now throw
> > > > InvalidStateStoreException when the stores are unavailable
> > > > (previously we returned null).
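> > > >
> > > > A minimal before/after sketch of that change (the store name is
> > > > hypothetical; streams and key are assumed to be in scope):
> > > >
> > > >     // before: streams.store(...) returned null while the store
> > > >     // was unavailable; after: it throws instead
> > > >     try {
> > > >         final ReadOnlyKeyValueStore<String, Long> store = streams.store(
> > > >             "word-counts", QueryableStoreTypes.<String, Long>keyValueStore());
> > > >         store.get(key);
> > > >     } catch (final InvalidStateStoreException e) {
> > > >         // store unavailable (e.g. rebalance in progress): retry later
> > > >     }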
> > > >
> > > > We added a test here
> > > > <https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java#L431>
> > > > to make sure we only get a value