[jira] [Commented] (KAFKA-4039) Exit Strategy: using exceptions instead of inline invocation of exit/halt

2016-12-04 Thread Maysam Yabandeh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15721224#comment-15721224
 ] 

Maysam Yabandeh commented on KAFKA-4039:


I am afraid I do not have many free cycles left to work on this jira. The task 
is up for grabs.

> Exit Strategy: using exceptions instead of inline invocation of exit/halt
> -
>
> Key: KAFKA-4039
> URL: https://issues.apache.org/jira/browse/KAFKA-4039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.0
>
> Attachments: deadlock-stack2
>
>
> The current practice is to directly invoke halt/exit right after the line 
> that intends to terminate the execution. In the case of System.exit this 
> could cause deadlocks if the thread invoking System.exit is holding a lock 
> that will be requested by the shutdown hook threads started by System.exit. 
> An example is reported by [~aozeritsky] in KAFKA-3924. This also makes 
> testing more difficult, as it requires mocking static methods of the System 
> and Runtime classes, which is not natively supported in Java.
> One alternative suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-3924?focusedCommentId=15420269&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15420269]
>  would be to throw some dedicated exceptions that will eventually invoke 
> exit/halt:
> {quote} it would be great to move away from executing `System.exit` inline in 
> favour of throwing an exception (two examples, but maybe we can find better 
> names: FatalExitException and FatalHaltException) that is caught by some 
> central code that then does the `System.exit` or `Runtime.getRuntime.halt`. 
> This helps in a couple of ways:
> (1) Avoids issues with locks being held as in this issue
> (2) It makes it possible to abstract the action, which is very useful in 
> tests. At the moment, we can't easily test for these conditions as they cause 
> the whole test harness to exit. Worse, these conditions are sometimes 
> triggered in the tests and it's unclear why.
> (3) We can have more consistent logging around these actions and possibly 
> extended logging for tests
> {quote}
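
The deadlock described above is straightforward to reproduce: a thread that 
holds a lock calls System.exit, a shutdown hook blocks trying to take the same 
lock, and the exit never completes. A minimal, self-contained demonstration 
(it hangs by design):

{code}
public class ExitDeadlock {
    private static final Object lock = new Object();

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            synchronized (lock) { // blocks forever: main still holds the lock
                System.out.println("cleanup");
            }
        }));
        synchronized (lock) {
            System.exit(1); // waits for shutdown hooks to finish -> deadlock
        }
    }
}
{code}

The quoted proposal could be sketched as follows; all names here are 
illustrative, not Kafka's actual classes, and the exit action is swappable so 
tests never terminate the harness:

{code}
import java.util.function.IntConsumer;

class ExitHandler {
    static class FatalExitException extends RuntimeException {
        final int statusCode;
        FatalExitException(int statusCode, String message) {
            super(message);
            this.statusCode = statusCode;
        }
    }

    // Replaceable in tests so the JVM never actually terminates.
    private static volatile IntConsumer exiter = System::exit;

    static void setExiterForTest(IntConsumer e) { exiter = e; }

    static void handle(FatalExitException e) {
        // One central place for consistent logging around fatal shutdowns.
        System.err.println("Fatal, exiting: " + e.getMessage());
        exiter.accept(e.statusCode);
    }
}
{code}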



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4486) Kafka Streams - exception in process still commits offsets

2016-12-04 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15721213#comment-15721213
 ] 

Guozhang Wang commented on KAFKA-4486:
--

[~joel.lundell] Thanks for reporting your use case. Which exception were you 
trying to throw that always triggered `StreamThread#shutdownTaskAndState`? 
Streams tries to distinguish retriable exceptions from fatal ones, and upon 
seeing a fatal error we should not commit the offsets while closing the 
instance.

> Kafka Streams - exception in process still commits offsets
> --
>
> Key: KAFKA-4486
> URL: https://issues.apache.org/jira/browse/KAFKA-4486
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Java 8
>Reporter: Joel Lundell
>
> I'm building a streams application and would like to be able to control the 
> commits manually using ProcessorContext#commit() from an instance of 
> org.apache.kafka.streams.processor.Processor.
> My use case is that I want to read messages from a topic and push them to AWS 
> SQS, and I need to be able to guarantee that all messages reach the queue at 
> least once. I also want to use SQS batching support, so my approach at the 
> moment is that in Processor#process I'm saving X records in a data structure, 
> and when I have a full batch I send it off and, if successful, I commit. If I 
> for any reason can't deliver the records I don't want the offsets to be 
> committed, so that when processing works again I can start processing from the 
> last successful record.
> When I was trying out the error handling I noticed that if I create a 
> Processor whose process method always throws an exception, that triggers 
> StreamThread#shutdownTaskAndState, which calls StreamThread#commitOffsets, 
> and the next time I run the application it starts as if the previous 
> "record" was successfully processed.
> Is there a way to achieve what I'm looking for?
> I found a similar discussion in 
> https://issues.apache.org/jira/browse/KAFKA-3491 but that issue is still open.
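
The batching pattern described above can be sketched with the 0.10.x Processor 
API. SqsBatchSender is a hypothetical stand-in for the real SQS client; note 
that, per this report, an exception thrown from process() in 0.10.1 still 
leads to a commit during shutdown, which is exactly what defeats the pattern:

{code}
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class BatchingForwarder implements Processor<String, String> {
    // Hypothetical stand-in for the real SQS client.
    static class SqsBatchSender {
        static void send(List<String> batch) { /* deliver the batch or throw */ }
    }

    private static final int BATCH_SIZE = 10; // SQS caps batches at 10 messages
    private final List<String> buffer = new ArrayList<>();
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        buffer.add(value);
        if (buffer.size() >= BATCH_SIZE) {
            SqsBatchSender.send(buffer); // throws if delivery fails
            buffer.clear();
            context.commit(); // request a commit only after successful delivery
        }
    }

    @Override
    public void punctuate(long timestamp) {}

    @Override
    public void close() {}
}
{code}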



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4434) KafkaProducer configuration is logged twice

2016-12-04 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15720878#comment-15720878
 ] 

huxi commented on KAFKA-4434:
-

Already assigned to you.

> KafkaProducer configuration is logged twice
> ---
>
> Key: KAFKA-4434
> URL: https://issues.apache.org/jira/browse/KAFKA-4434
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 0.10.0.1
>Reporter: Ruben de Gooijer
>Assignee: kevin.chen
>Priority: Minor
>  Labels: newbie
> Fix For: 0.10.2.0
>
>
> The constructor of org.apache.kafka.clients.producer.KafkaProducer accepts a 
> ProducerConfig, which logs the configuration when it is constructed: 
> https://github.com/apache/kafka/blob/0.10.0.1/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L58
> However, as the construction of KafkaProducer proceeds, the provided 
> ProducerConfig is repurposed and another instance is created 
> (https://github.com/apache/kafka/blob/0.10.0.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L323), 
> which triggers another log entry with the same contents (only the clientId 
> can differ, in case it's not supplied in the original config). 
> At first sight this seems like unintended behaviour to me. At the least, it 
> caused me to dive in to verify that there weren't two producer instances 
> running.
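
The double logging described above follows from a simple pattern: a config 
object that logs its values in its constructor, constructed twice. A minimal 
sketch (the names are illustrative; the real classes are ProducerConfig and 
KafkaProducer):

{code}
import java.util.HashMap;
import java.util.Map;

class LoggingConfig {
    final Map<String, Object> values;
    LoggingConfig(Map<String, Object> values) {
        this.values = new HashMap<>(values);
        System.out.println("Config values: " + this.values); // first log line
    }
}

class Client {
    Client(LoggingConfig userConfig) {
        Map<String, Object> effective = new HashMap<>(userConfig.values);
        // Rebuilding the config to inject a default client id invokes the
        // constructor again, producing a second, near-identical log line.
        effective.putIfAbsent("client.id", "producer-1");
        new LoggingConfig(effective);
    }
}
{code}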



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4434) KafkaProducer configuration is logged twice

2016-12-04 Thread huxi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxi updated KAFKA-4434:

Assignee: kevin.chen

> KafkaProducer configuration is logged twice
> ---
>
> Key: KAFKA-4434
> URL: https://issues.apache.org/jira/browse/KAFKA-4434
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 0.10.0.1
>Reporter: Ruben de Gooijer
>Assignee: kevin.chen
>Priority: Minor
>  Labels: newbie
> Fix For: 0.10.2.0
>
>
> The constructor of org.apache.kafka.clients.producer.KafkaProducer accepts a 
> ProducerConfig, which logs the configuration when it is constructed: 
> https://github.com/apache/kafka/blob/0.10.0.1/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L58
> However, as the construction of KafkaProducer proceeds, the provided 
> ProducerConfig is repurposed and another instance is created 
> (https://github.com/apache/kafka/blob/0.10.0.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L323), 
> which triggers another log entry with the same contents (only the clientId 
> can differ, in case it's not supplied in the original config). 
> At first sight this seems like unintended behaviour to me. At the least, it 
> caused me to dive in to verify that there weren't two producer instances 
> running.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk7 #1729

2016-12-04 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-4205; KafkaApis: fix NPE caused by conversion to array

--
[...truncated 14187 lines...]
org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddInternalTopicConfigWithCompactForNonWindowStores STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddInternalTopicConfigWithCompactForNonWindowStores PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithSelfParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithSelfParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSelfParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSelfParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSink STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSink PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testTopicGroups STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testTopicGroups PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testBuild STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testBuild PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullStateStoreSupplier STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullStateStoreSupplier PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldThroughOnUnassignedStateStoreAccess STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldThroughOnUnassignedStateStoreAccess PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddMoreThanOnePatternSourceNode STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddMoreThanOnePatternSourceNode PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldSetCorrectSourceNodesWithRegexUpdatedTopics STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldSetCorrectSourceNodesWithRegexUpdatedTopics PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameTopic STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameTopic PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testTopicGroupsByStateStore STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testTopicGroupsByStateStore PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithDuplicates STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithDuplicates PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testPatternSourceTopic 
STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testPatternSourceTopic 
PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithWrongParent 

[jira] [Commented] (KAFKA-4451) Recovering empty replica yields negative offsets in index of compact partitions

2016-12-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15720715#comment-15720715
 ] 

ASF GitHub Bot commented on KAFKA-4451:
---

GitHub user michaelschiff opened a pull request:

https://github.com/apache/kafka/pull/2210

KAFKA-4451 

Fix OffsetIndex overflow when replicating a highly compacted topic.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michaelschiff/kafka bug/4451

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2210.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2210


commit d31ae87e47be6c00aca898c05fa5cf993ff127cc
Author: Michael Schiff 
Date:   2016-12-04T22:26:07Z

Adds a unit test case to LogTest that currently fails due to the
presence of KAFKA-4451




> Recovering empty replica yields negative offsets in index of compact 
> partitions
> ---
>
> Key: KAFKA-4451
> URL: https://issues.apache.org/jira/browse/KAFKA-4451
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Michael Schiff
>  Labels: reliability
>
> When bringing up an empty broker, the partition for a compact topic is not 
> split into multiple log files. All data is written into a single log file, 
> causing offsets to overflow.
> A dump of the affected broker shortly after it started replicating:
> {code}
> michael.schiff@stats-kafka09:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index | head -n 10
> Dumping /kafka/attainment_event-0/.index
> offset: 1022071124 position: 1037612
> offset: -1713432120 position: 1348740
> offset: -886291423 position: 2397130
> offset: -644750126 position: 3445630
> offset: -57889876 position: 4493972
> offset: 433950099 position: 5388461
> offset: 1071769472 position: 6436837
> offset: 1746859069 position: 7485367
> offset: 2090359736 position: 8533822
> ...
> {code}
> and the dump of the same log file from the leader of this partition
> {code}
> michael.schiff@stats-kafka12:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/.index
> [sudo] password for michael.schiff:
> Dumping /kafka/attainment_event-0/.index
> offset: 353690666 position: 262054
> offset: 633140428 position: 523785
> offset: 756537951 position: 785815
> {code}
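
One plausible arithmetic behind those negative entries, assuming the index 
stores each offset as a 4-byte value relative to the segment's base offset: if 
a heavily compacted partition replays into a single unrolled segment, the 
relative offset can exceed Integer.MAX_VALUE and the cast wraps negative. A 
minimal sketch:

{code}
public class RelativeOffsetOverflow {
    public static void main(String[] args) {
        long baseOffset = 0L;                 // single segment, never rolled
        long absoluteOffset = 3_000_000_000L; // plausible for a long-lived topic
        // A 4-byte relative offset wraps once the delta passes Integer.MAX_VALUE.
        int relative = (int) (absoluteOffset - baseOffset);
        System.out.println(relative);         // -1294967296: a negative index entry
    }
}
{code}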



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2210: KAFKA-4451

2016-12-04 Thread michaelschiff
GitHub user michaelschiff opened a pull request:

https://github.com/apache/kafka/pull/2210

KAFKA-4451 

Fix OffsetIndex overflow when replicating a highly compacted topic.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michaelschiff/kafka bug/4451

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2210.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2210


commit d31ae87e47be6c00aca898c05fa5cf993ff127cc
Author: Michael Schiff 
Date:   2016-12-04T22:26:07Z

Adds a unit test case to LogTest that currently fails due to the
presence of KAFKA-4451




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Build failed in Jenkins: kafka-trunk-jdk8 #1078

2016-12-04 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-4205; KafkaApis: fix NPE caused by conversion to array

--
[...truncated 14349 lines...]
org.apache.kafka.streams.kstream.TimeWindowsTest > 
windowsForBarelyOverlappingHoppingWindows PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > defaultSerdeShouldBeConfigured 
STARTED

org.apache.kafka.streams.StreamsConfigTest > defaultSerdeShouldBeConfigured 
PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfValueSerdeConfigFails STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfValueSerdeConfigFails PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfConsumerAutoCommitIsOverridden STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfConsumerAutoCommitIsOverridden PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportMultipleBootstrapServers STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportMultipleBootstrapServers PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfKeySerdeConfigFails STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfKeySerdeConfigFails PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportNonPrefixedProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportNonPrefixedProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetRestoreConsumerConfigs 
STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetRestoreConsumerConfigs 
PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedRestoreConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedRestoreConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedRestoreConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedRestoreConsumerConfigs PASSED

org.apache.kafka.streams.KafkaStreamsTest > shouldNotGetAllTasksWhenNotRunning 
STARTED

org.apache.kafka.streams.KafkaStreamsTest > shouldNotGetAllTasksWhenNotRunning 
PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndSerializerWhenNotRunning 

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-04 Thread Daniel Schierbeck
Hi Apurva,

Thanks for taking the time to reply. You make excellent points, and it
sounds like the right tradeoff. It would be great if the coordinator code
could be shared with the consumer API, hope that's actually the case.

Daniel Schierbeck

On Thu, 1 Dec 2016 at 18.34 Apurva Mehta  wrote:

> Hi Daniel,
>
> That is a very good point. You are correct in saying that one does not need
> a transaction coordinator to get idempotent semantics.
>
> There are, however, three reasons why we chose this route:
>
> 1. The request to find a transaction coordinator is exactly the same as
> the request consumers use to find the group coordinator. So if clients
> already implement the new consumer, you should already have the code you
> need to find the transaction coordinator. I would even go so far as to say
> that the majority of the coordinator discovery code can be effectively shared
> between producers and consumers. Jason should correct me on this, however,
> since he is most familiar with that bit.
> 2. With this route, the broker side changes are simpler. In particular,
> we have to implement the InitPIDRequest only in the coordinator.
> 3. By always having a transaction coordinator, we can enable
> applications to use transactions even if they don't specify the AppId. The
> only thing you lose is transaction recovery across sessions.
>
> Needless to say, we did debate this point extensively. What swung our
> decision ultimately was the following observation: if the user does not
> provide a transaction.app.id, the client can generate a UUID and use that
> as the appId for the rest of the session. This means that there are no
> branches in the client and server code, and the code is overall simpler to
> maintain. All the producer APIs are also available to the user, and it would
> be more intuitive.
>
> It also means that clients cannot choose idempotence without transactions,
> and hence it does place a greater burden on implementors of kafka clients.
> But the cost should be minimal given point 1 above, and was deemed worth
> it.
>
> Thanks once more for your thoughtful comments. It would be great for other
> client implementors to chime in on this.
>
> Regards,
> Apurva
>
>
>
> > On Thu, Dec 1, 2016 at 3:16 AM, Daniel Schierbeck wrote:
>
> > Hi there,
> >
> > I'm the author of ruby-kafka, and as such am slightly biased towards
> > simplicity of implementation :-)
> >
> > I like the proposal, and would love to use idempotent producer semantics
> > in our projects at Zendesk, but I'm a bit worried about the complexity that
> > would go into the clients; specifically: it sounds to me that in order to
> > get idempotent producer semantics, I'd have to implement the transaction
> > coordinator discovery. I may be wrong, but it would seem that it's not
> > strictly necessary if you're not using transactions – we could just use
> > the topic partition's leader as the coordinator, avoiding the extra
> > discovery.
> > In my experience, most bugs are related to figuring out which broker is
> > the leader of which partition/group/whatever, so minimizing the number of
> > moving parts would be beneficial to me. I'd also like to point out that I
> > would be reluctant to implement the transaction API in the near future,
> > but would love to implement the idempotency API soon. The former seems only
> > relevant to real stream processing frameworks, which is probably not the
> > best use case for ruby-kafka.
> >
> > Cheers,
> > Daniel Schierbeck
> >
> > On Thu, Dec 1, 2016 at 9:54 AM Jason Gustafson wrote:
> >
> > > Hey Neha,
> > >
> > > Thanks for the thoughtful questions. I'll try to address the first
> > > question since Apurva addressed the second. Since most readers are
> > > probably getting up to speed with this large proposal, let me first take
> > > a step back and
> > > explain why we need the AppID at all. As Confluent tradition demands, I
> > > present you a big wall of text:
> > >
> > > Clearly "exactly once" delivery requires resilience to client failures.
> > > When a client crashes or turns into a zombie, another client must
> > > eventually be started to resume the work. There are two problems: 1) we
> > > need to ensure that the old process is actually dead or at least that it
> > > cannot write any more data, and 2) we need to be able to pick up wherever
> > > the last process left off. To do either of these, we need some kind of
> > > identifier to tie the two instances together.
> > >
> > > There are only two choices for where this ID comes from: either the user
> > > gives it to us or the server generates it. In the latter case, the user
> > > is responsible for fetching it from the client and persisting it
> > > somewhere for use after failure. We ultimately felt that the most
> > > flexible option is to have the user give it to us. In many applications,
> > > there is already a
> > > natural identifier which is already 
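
The fallback Apurva describes above is simple to sketch; the config key is 
taken from the discussion, while the helper itself is illustrative, not actual 
client code:

{code}
import java.util.Properties;
import java.util.UUID;

public class AppIdResolver {
    static String resolveAppId(Properties config) {
        String appId = config.getProperty("transaction.app.id");
        if (appId == null) {
            // Session-scoped id: idempotence works within this session, but
            // there is no transaction recovery across restarts.
            appId = UUID.randomUUID().toString();
        }
        return appId;
    }
}
{code}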

Jenkins build is back to normal : kafka-0.10.1-jdk7 #99

2016-12-04 Thread Apache Jenkins Server
See 



[jira] [Updated] (KAFKA-4205) NullPointerException in fetchOffsetsBefore

2016-12-04 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4205:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

> NullPointerException in fetchOffsetsBefore
> --
>
> Key: KAFKA-4205
> URL: https://issues.apache.org/jira/browse/KAFKA-4205
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andrew Grasso
>Assignee: Anton Karamanov
>  Labels: reliability
> Fix For: 0.10.1.1
>
>
> We recently observed the following error in brokers running 0.9.0.1:
> A client saw an Unknown error code in response to an offset request for 
> TOPICX, partition 0
> The server logs look like:
> {code}
> [2016-09-21 21:26:07,143] INFO Scheduling log segment 527235760 for log 
> TOPICX-0 for deletion. (kafka.log.Log)
> [2016-09-21 21:26:07,144] ERROR [KafkaApi-13] Error while responding to 
> offset request (kafka.server.KafkaApis)
> java.lang.NullPointerException
> at kafka.server.KafkaApis.fetchOffsetsBefore(KafkaApis.scala:513)
> at kafka.server.KafkaApis.fetchOffsets(KafkaApis.scala:501)
> at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:461)
> at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:452)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at kafka.server.KafkaApis.handleOffsetRequest(KafkaApis.scala:452)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> [2016-09-21 21:27:07,143] INFO Deleting segment 527235760 from log TOPICX-0. 
> (kafka.log.Log)
> [2016-09-21 21:27:07,263] INFO Deleting index 
> /path/to/kafka/data/TOPICX-0/000527235760.index.deleted 
> (kafka.log.OffsetIndex)
> {code}
> I suspect a race condition between {{Log.deleteSegment}} (which takes a lock 
> on the log) and {{KafkaApis.fetchOffsetsBefore}}, which does not take any 
> lock. In particular, line 513 in KafkaApis looks like:
> {code:title=KafkaApis.scala|borderStyle=solid}
> 510  private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: 
> Int): Seq[Long] = {
> 511val segsArray = log.logSegments.toArray
> 512var offsetTimeArray: Array[(Long, Long)] = null
> 513val lastSegmentHasSize = segsArray.last.size > 0;
> {code}
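
One way the reported NPE can arise, assuming the segment array is built by 
pre-sizing from the collection's size() and then copying while retention 
deletes segments concurrently: the tail slots of the array are left null. A 
self-contained illustration of that hazard:

{code}
import java.util.concurrent.ConcurrentSkipListMap;

public class ToArrayRaceSketch {
    public static void main(String[] args) {
        ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();
        segments.put(0L, "segment-0");
        segments.put(100L, "segment-100");

        // Pre-size the destination from size(), as a naive copy would.
        Object[] snapshot = new Object[segments.size()];
        segments.remove(100L); // concurrent deletion, e.g. by log retention
        int i = 0;
        for (String s : segments.values()) {
            snapshot[i++] = s;
        }
        // The tail slot was never filled; dereferencing it, as
        // segsArray.last.size does, throws a NullPointerException.
        System.out.println(snapshot[snapshot.length - 1]); // prints "null"
    }
}
{code}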



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4205) NullPointerException in fetchOffsetsBefore

2016-12-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15720401#comment-15720401
 ] 

ASF GitHub Bot commented on KAFKA-4205:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2204


> NullPointerException in fetchOffsetsBefore
> --
>
> Key: KAFKA-4205
> URL: https://issues.apache.org/jira/browse/KAFKA-4205
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andrew Grasso
>Assignee: Anton Karamanov
>  Labels: reliability
> Fix For: 0.10.1.1
>
>
> We recently observed the following error in brokers running 0.9.0.1:
> A client saw an Unknown error code in response to an offset request for 
> TOPICX, partition 0
> The server logs look like:
> {code}
> [2016-09-21 21:26:07,143] INFO Scheduling log segment 527235760 for log 
> TOPICX-0 for deletion. (kafka.log.Log)
> [2016-09-21 21:26:07,144] ERROR [KafkaApi-13] Error while responding to 
> offset request (kafka.server.KafkaApis)
> java.lang.NullPointerException
> at kafka.server.KafkaApis.fetchOffsetsBefore(KafkaApis.scala:513)
> at kafka.server.KafkaApis.fetchOffsets(KafkaApis.scala:501)
> at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:461)
> at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:452)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at kafka.server.KafkaApis.handleOffsetRequest(KafkaApis.scala:452)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> [2016-09-21 21:27:07,143] INFO Deleting segment 527235760 from log TOPICX-0. 
> (kafka.log.Log)
> [2016-09-21 21:27:07,263] INFO Deleting index 
> /path/to/kafka/data/TOPICX-0/000527235760.index.deleted 
> (kafka.log.OffsetIndex)
> {code}
> I suspect a race condition between {{Log.deleteSegment}} (which takes a lock 
> on the log) and {{KafkaApis.fetchOffsetsBefore}}, which does not take any 
> lock. In particular, line 513 in KafkaApis looks like:
> {code:title=KafkaApis.scala|borderStyle=solid}
> 510  private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: 
> Int): Seq[Long] = {
> 511val segsArray = log.logSegments.toArray
> 512var offsetTimeArray: Array[(Long, Long)] = null
> 513val lastSegmentHasSize = segsArray.last.size > 0;
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2204: KAFKA-4205; KafkaApis: fix NPE caused by conversio...

2016-12-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2204


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: Suppressing redundant KTable forwards

2016-12-04 Thread Damian Guy
Hi Mathieu,

You are correct in that the de-duping only occurs within the commit
interval.

I can understand and appreciate the use-case you have. So I think the right
approach for this is to create a KIP with your suggested changes and put it
to the community. Are you happy to do that?

Thanks,
Damian

On Sun, 4 Dec 2016 at 15:20 Mathieu Fenniak wrote:

> Hi Eno,
>
> I'm not sure.  My understanding is that the cache would prevent two
> immediate updates for the same key from being forwarded, but that only
> applies when records arrive within commit.interval.ms of each other.  Is
> that understanding correct?
>
> filterRedundant compares the newValue & oldValue in a Change to work
> regardless of the time between records.
>
> https://github.com/apache/kafka/compare/trunk...mfenniak:filter-redundant
>
>
> The use-case that is currently kicking me is a piece of source data that
> contains multiple unrelated configuration fields in a single record; it's
> not a great design, but it's the data I have to work with.  I'm plucking
> out only a single relevant field with mapValues, but changes to the other
> fields within the record are causing excessive, expensive recomputations
> that are redundant.
>
> Mathieu
>
>
> On Sun, Dec 4, 2016 at 4:34 AM, Eno Thereska wrote:
>
> > Hi Mathieu,
> >
> > Thanks for the suggestion. Wouldn't the cache introduced in KIP-63 do some
> > of this for you, in that it dedups records with the same key and prevents
> > them from being forwarded downstream?
> >
> > Eno
> > > On 4 Dec 2016, at 04:13, Mathieu Fenniak wrote:
> > >
> > > Hey all,
> > >
> > > I'd like to contribute a new KTable API that would allow for the
> > > suppression of redundant KTable forwards, and I'd like to solicit
> > feedback
> > > before I put together a patch.
> > >
> > > A typical use-case of this API would be that you're using mapValues to
> > > pluck a subset of data out of a topic, but you'd like changes to the
> > > record value that don't modify the output of mapValues not to trigger
> > > expensive and redundant recalculations.
> > >
> > > For example, topic "user" contains key:1, value:{"firstName": "Jack",
> > > "lastName": "Brown"}.  builder.table("user").mapValues((user) ->
> > > user.get("lastName"))  will create a KTable that would forward updates
> > > from the user topic even if lastName never changed.
> > >
> > > My proposed API would be to add a filterRedundant method to KTable; one
> > > override takes a Comparator<V> to provide a custom comparison for
> > > evaluating whether a change is redundant, and one parameterless override
> > > would use a comparator backed by the object's equals() method.
> > >
> > >/**
> > > * Creates a new instance of {@link KTable} that filters out redundant
> > > updates and prevents "non-updates" from
> > > * propagating to further operations on the returned table.  A
> > > redundant update is one where the same value is provided
> > > * more than once for a given key.  Object.equals is used to compare
> > > whether a subsequent update has the same value.
> > >
> > > * @return a {@link KTable} that contains the same values as this
> > > table, but suppresses redundant updates
> > > */
> > >KTable<K, V> filterRedundant();
> > >
> > >/**
> > > * Creates a new instance of {@link KTable} that filters out redundant
> > > updates and prevents "non-updates" from
> > > * propagating to further operations on the returned table.  A
> > > redundant update is one where the same value is provided
> > > * more than once for a given key.  A user-provided comparator is
> > > used to compare whether a subsequent update has
> > > * the same value.
> > >
> > > * @return a {@link KTable} that contains the same values as this
> > > table, but suppresses redundant updates
> > > */
> > >KTable<K, V> filterRedundant(Comparator<V> comparator);
> >
> >
>
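
For reference, usage of the proposed (not yet existing) API on Mathieu's 
example would look roughly like this, assuming stringSerde and mapSerde are 
configured serdes, builder is a KStreamBuilder, and user values are maps:

{code}
KTable<String, Map<String, String>> users = builder.table(stringSerde, mapSerde, "user");
KTable<String, String> lastNames = users
        .mapValues(user -> user.get("lastName"))
        .filterRedundant(); // drop updates where lastName did not change
{code}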


Re: Suppressing redundant KTable forwards

2016-12-04 Thread Mathieu Fenniak
Hi Eno,

I'm not sure.  My understanding is that the cache would prevent two
immediate updates for the same key from being forwarded, but that only
applies when records arrive within commit.interval.ms of each other.  Is
that understanding correct?

filterRedundant compares the newValue & oldValue in a Change to work
regardless of the time between records.

https://github.com/apache/kafka/compare/trunk...mfenniak:filter-redundant


The use-case that is currently kicking me is a piece of source data that
contains multiple unrelated configuration fields in a single record; it's
not a great design, but it's the data I have to work with.  I'm plucking
out only a single relevant field with mapValues, but changes to the other
fields within the record are causing excessive, expensive recomputations
that are redundant.

Mathieu


On Sun, Dec 4, 2016 at 4:34 AM, Eno Thereska  wrote:

> Hi Mathieu,
>
> Thanks for the suggestion. Wouldn't the cache introduced in KIP-63 do some
> of this for you, in that it dedups records with the same key and prevents
> them from being forwarded downstream?
>
> Eno
> > On 4 Dec 2016, at 04:13, Mathieu Fenniak wrote:
> >
> > Hey all,
> >
> > I'd like to contribute a new KTable API that would allow for the
> > suppression of redundant KTable forwards, and I'd like to solicit
> feedback
> > before I put together a patch.
> >
> > A typical use-case of this API would be that you're using mapValues to
> > pluck a subset of data out of a topic, but you'd like changes to the
> > record value that don't modify the output of mapValues not to trigger
> > expensive and redundant recalculations.
> >
> > For example, topic "user" contains key:1, value:{"firstName": "Jack",
> > "lastName": "Brown"}.  builder.table("user").mapValues((user) ->
> > user.get("lastName"))  will create a KTable that would forward updates
> > from the user topic even if lastName never changed.
> >
> > My proposed API would be to add a filterRedundant method to KTable; one
> > override takes a Comparator<V> to provide a custom comparison for
> > evaluating whether a change is redundant, and one parameterless override
> > would use a comparator backed by the object's equals() method.
> >
> >/**
> > * Creates a new instance of {@link KTable} that filters out redundant
> > updates and prevents "non-updates" from
> > * propagating to further operations on the returned table.  A
> > redundant update is one where the same value is provided
> > * more than once for a given key.  Object.equals is used to compare
> > whether a subsequent update has the same value.
> >
> > * @return a {@link KTable} that contains the same values as this
> > table, but suppresses redundant updates
> > */
> >KTable<K, V> filterRedundant();
> >
> >/**
> > * Creates a new instance of {@link KTable} that filters out redundant
> > updates and prevents "non-updates" from
> > * propagating to further operations on the returned table.  A
> > redundant update is one where the same value is provided
> > * more than once for a given key.  A user-provided comparator is used
> > to compare whether a subsequent update has
> > * the same value.
> >
> > * @return a {@link KTable} that contains the same values as this
> > table, but suppresses redundant updates
> > */
> >KTable<K, V> filterRedundant(Comparator<V> comparator);
>
>


Re: Suppressing redundant KTable forwards

2016-12-04 Thread Eno Thereska
Hi Mathieu,

Thanks for the suggestion. Wouldn't the cache introduced in KIP-63 do some of 
this for you, in that it dedups records with the same key and prevents them 
from being forwarded downstream?

Eno
> On 4 Dec 2016, at 04:13, Mathieu Fenniak  wrote:
> 
> Hey all,
> 
> I'd like to contribute a new KTable API that would allow for the
> suppression of redundant KTable forwards, and I'd like to solicit feedback
> before I put together a patch.
> 
> A typical use-case of this API would be that you're using mapValues to
> pluck a subset of data out of a topic, but you'd like changes to the record
> value that don't modify the output of mapValues not to trigger expensive
> and redundant recalculations.
> 
> For example, topic "user" contains key:1, value:{"firstName": "Jack",
> "lastName": "Brown"}.  builder.topic("user").mapValues((user) ->
> user.get("lastName"))  will create a KTable that would forward updates from
> the user topic even if lastName never changed.
> 
> My proposed API would be to add a filterRedundant method to KTable; one
> override takes a Comparator<V> to provide a custom comparison for
> evaluating whether a change is redundant, and one parameterless override
> would use a comparator backed by the object's equals() method.
> 
>/**
> * Creates a new instance of {@link KTable} that filters out redundant
> updates and prevents "non-updates" from
> * propagating to further operations on the returned table.  A
> redundant update is one where the same value is provided
> * more than once for a given key.  Object.equals is used to compare
> whether a subsequent update has the same value.
> 
> * @return a {@link KTable} that contains the same values as this
> table, but suppresses redundant updates
> */
>KTable<K, V> filterRedundant();
> 
>/**
> * Creates a new instance of {@link KTable} that filters out redundant
> updates and prevents "non-updates" from
> * propagating to further operations on the returned table.  A
> redundant update is one where the same value is provided
> * more than once for a given key.  A user-provided comparator is used
> to compare whether a subsequent update has
> * the same value.
> 
> * @return a {@link KTable} that contains the same values as this
> table, but suppresses redundant updates
> */
>KTable<K, V> filterRedundant(Comparator<V> comparator);



Re: [DISCUSS] KIP-95: Incremental Batch Processing for Kafka Streams

2016-12-04 Thread Eno Thereska
A couple of remaining questions:

- it says in the KIP: "because the metadata topic must be consumed by all 
instances, we need to assign the topic’s partitions manually and do not commit 
offsets -- we also need to seekToBeginning() each time we consume the metadata 
topic)" . How big of a change is allowing a topic to be consumed by all 
instances?

- for the case when we have a single instance, with intermediate topics etc., 
could we keep the data that we want to persist in the metadata topic in memory 
instead? What is the advantage of persisting this data? I understand the need 
if we have an application with multiple instances on different servers (but I 
still don't think we need to handle that case). Is there a need to persist data 
for a single instance? It would help if we enumerated the exact failure 
scenarios and how persisting the data helps. You convinced me in the previous 
email that this metadata is useful; now I'm asking whether it needs 
persisting.
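
On the first question, the consumption pattern the KIP describes maps onto the 
existing consumer API roughly as follows; the topic name and helper are 
illustrative:

{code}
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class MetadataTopicReader {
    static List<TopicPartition> assignAndRewind(KafkaConsumer<byte[], byte[]> consumer,
                                                String topic) {
        List<TopicPartition> partitions = new ArrayList<>();
        for (PartitionInfo pi : consumer.partitionsFor(topic)) {
            partitions.add(new TopicPartition(pi.topic(), pi.partition()));
        }
        consumer.assign(partitions);          // manual assignment: no group, no commits
        consumer.seekToBeginning(partitions); // re-read from the start on each pass
        return partitions;
    }
}
{code}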

I'm really trying to avoid having the metadata topic. It's one more topic that 
needs to be kept around and maintained carefully with all failure cases 
considered. With EoS around the corner introducing its own internal topic(s), 
and atomicity when writing to multiple topics, in my mind there is real value 
if we can have a solution without an extra topic for now.


Thanks
Eno



> On 28 Nov 2016, at 18:47, Matthias J. Sax  wrote:
> 
> Hi all,
> 
> I want to start a discussion about KIP-95:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
> 
> Looking forward to your feedback.
> 
> 
> -Matthias
> 
>