[GitHub] kafka pull request #3118: MINOR: Broker should disallow downconversion of tr...

2017-05-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3118




Jenkins build is back to normal : kafka-0.10.2-jdk7 #167

2017-05-22 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk8 #1574

2017-05-22 Thread Apache Jenkins Server
See 


Changes:

[me] HOTFIX: Replace JDK download and fix missing argument in Vagrant

--
[...truncated 2.46 MB...]
org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFinalMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFinalMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFinalMessage PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryWithoutPasswordConfiguration STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryWithoutPasswordConfiguration PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration PASSED

org.apache.kafka.common.security.kerberos.KerberosNameTest > testParse STARTED

org.apache.kafka.common.security.kerberos.KerberosNameTest > testParse PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithListenerNameOverride STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithListenerNameOverride PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingOptionValue 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingOptionValue PASSED

org.apache.kafka.common.security.JaasContextTest > testSingleOption STARTED

org.apache.kafka.common.security.JaasContextTest > testSingleOption PASSED

org.apache.kafka.common.security.JaasContextTest > 
testNumericOptionWithoutQuotes STARTED

org.apache.kafka.common.security.JaasContextTest > 
testNumericOptionWithoutQuotes PASSED

org.apache.kafka.common.security.JaasContextTest > testConfigNoOptions STARTED

org.apache.kafka.common.security.JaasContextTest > testConfigNoOptions PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithWrongListenerName STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithWrongListenerName PASSED

org.apache.kafka.common.security.JaasContextTest > testNumericOptionWithQuotes 
STARTED

org.apache.kafka.common.security.JaasContextTest > testNumericOptionWithQuotes 
PASSED

org.apache.kafka.common.security.JaasContextTest > testQuotedOptionValue STARTED

org.apache.kafka.common.security.JaasContextTest > testQuotedOptionValue PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingLoginModule 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingLoginModule PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingSemicolon STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingSemicolon PASSED

org.apache.kafka.common.security.JaasContextTest > testMultipleOptions STARTED

org.apache.kafka.common.security.JaasContextTest > testMultipleOptions PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForClientWithListenerName STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForClientWithListenerName PASSED

org.apache.kafka.common.security.JaasContextTest > testMultipleLoginModules 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMultipleLoginModules 
PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingControlFlag 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingControlFlag PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithListenerNameAndFallback STARTED


[jira] [Commented] (KAFKA-5296) Unable to write to some partitions of newly created topic in 10.2

2017-05-22 Thread Abhisek Saikia (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020558#comment-16020558
 ] 

Abhisek Saikia commented on KAFKA-5296:
---

[~huxi_2b] We currently have a really large value for the timeout (30 sec), as 
you can see from the logs. When I looked into the broker logs, I could see brokers 
receiving LeaderAndIsr requests from multiple controllers. Is this the cause of 
the issue? How can we fix it?

> Unable to write to some partitions of newly created topic in 10.2
> -
>
> Key: KAFKA-5296
> URL: https://issues.apache.org/jira/browse/KAFKA-5296
> Project: Kafka
>  Issue Type: Bug
>Reporter: Abhisek Saikia
>
> We are using Kafka 10.2, and the cluster ran fine for a month with 50 topics; 
> now we are having issues producing messages to newly created topics. The 
> create-topic command succeeds, but producers throw errors while writing to 
> some partitions. 
> Error in producer-
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> [topic1]-8: 30039 ms has passed since batch creation plus linger time
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
>  ~[kafka-clients-0.10.2.0.jar:na]
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
>  ~[kafka-clients-0.10.2.0.jar:na]
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>  ~[kafka-clients-0.10.2.0.jar:na]
> On the broker side, I don't see any topic-partition folder being created on 
> the broker that is the leader for the partition. 
> With the 0.8 client, the write used to hang when it started writing to a 
> partition having this issue. With 10.2, the producer hang issue is resolved.
>  





[jira] [Created] (KAFKA-5309) Stores not queryable after one thread died

2017-05-22 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5309:
--

 Summary: Stores not queryable after one thread died
 Key: KAFKA-5309
 URL: https://issues.apache.org/jira/browse/KAFKA-5309
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.1
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax


For a multi-threaded Streams instance, it can happen that one thread dies while 
all other threads (within the same instance) keep running. Even though this is a 
rare scenario, we should allow querying the stores after the rebalance has 
finished. However, this never happens, because the dead thread's state is still 
in the {{KafkaStreams}} thread-state map (as {{NOT_RUNNING}}), and thus 
{{KafkaStreams}} itself stays in state {{REBALANCING}} and never transitions 
back to {{RUNNING}}.
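
For reference, a minimal sketch of the interactive-query pattern affected here, assuming the 
0.10.2 {{KafkaStreams#state()}} and {{store()}} APIs and a placeholder store name; per this 
ticket, an instance stuck in {{REBALANCING}} would never pass the state check below.

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StoreQueryExample {
    // Wait until the instance reports RUNNING, then fetch the store for interactive queries.
    // With the bug described here, a dead thread keeps state() at REBALANCING forever,
    // so this loop would never return.
    static ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams, String storeName)
            throws InterruptedException {
        while (true) {
            if (streams.state() == KafkaStreams.State.RUNNING) {
                try {
                    return streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
                } catch (InvalidStateStoreException e) {
                    // store not yet ready or currently migrating; retry below
                }
            }
            Thread.sleep(100);
        }
    }
}
{code}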





[GitHub] kafka pull request #3121: HOTFIX: Replace JDK download and fix missing argum...

2017-05-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3121




[GitHub] kafka pull request #3121: HOTFIX: Replace JDK download and fix missing argum...

2017-05-22 Thread ewencp
GitHub user ewencp opened a pull request:

https://github.com/apache/kafka/pull/3121

HOTFIX: Replace JDK download and fix missing argument in Vagrant 
provisioning script



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ewencp/kafka hotfix-vagrant-provisioning

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3121.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3121


commit 42b31202a9d57ce57f35bcdc24e813beab55f8a4
Author: Ewen Cheslack-Postava 
Date:   2017-05-23T00:40:11Z

HOTFIX: Replace JDK download and fix missing argument in Vagrant 
provisioning script






[jira] [Commented] (KAFKA-5265) Move ACLs, Config, NodeVersions classes into org.apache.kafka.common

2017-05-22 Thread Colin P. McCabe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020479#comment-16020479
 ] 

Colin P. McCabe commented on KAFKA-5265:


https://github.com/apache/kafka/pull/3120

> Move ACLs, Config, NodeVersions classes into org.apache.kafka.common
> 
>
> Key: KAFKA-5265
> URL: https://issues.apache.org/jira/browse/KAFKA-5265
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
> Fix For: 0.11.0.0
>
>
> We should move the `ACLs`, `Config`, and `NodeVersions` classes into 
> `org.apache.kafka.common`.  That will make them easier to use in server code 
> as well as admin client code.





[jira] [Commented] (KAFKA-5218) New Short serializer, deserializer, serde

2017-05-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020477#comment-16020477
 ] 

Guozhang Wang commented on KAFKA-5218:
--

Done.

> New Short serializer, deserializer, serde
> -
>
> Key: KAFKA-5218
> URL: https://issues.apache.org/jira/browse/KAFKA-5218
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: Mario Molina
>Assignee: Mario Molina
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> There is no Short serializer/deserializer in the current clients component.
> One would be useful when using Kafka Connect to write data to databases with 
> SMALLINT fields (or similar), avoiding conversions to int and slightly 
> improving performance in terms of memory and network.
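
For illustration only, a minimal sketch of what such a Short serde could look like, modeled 
on the existing Integer serializer/deserializer pattern; the class names below are 
placeholders, not the classes actually added by this ticket.

{code:java}
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

public class ShortSerdeSketch {

    public static class ShortSerializer implements Serializer<Short> {
        @Override public void configure(Map<String, ?> configs, boolean isKey) { }
        @Override public byte[] serialize(String topic, Short data) {
            if (data == null) return null;
            // big-endian, 2 bytes
            return new byte[] { (byte) (data >>> 8), data.byteValue() };
        }
        @Override public void close() { }
    }

    public static class ShortDeserializer implements Deserializer<Short> {
        @Override public void configure(Map<String, ?> configs, boolean isKey) { }
        @Override public Short deserialize(String topic, byte[] data) {
            if (data == null) return null;
            if (data.length != 2)
                throw new SerializationException("Size of data received by ShortDeserializer is not 2");
            return (short) (((data[0] & 0xFF) << 8) | (data[1] & 0xFF));
        }
        @Override public void close() { }
    }
}
{code}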





[GitHub] kafka pull request #3120: Kafka 5265

2017-05-22 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/3120

Kafka 5265



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-5265

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3120.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3120


commit e0527e64e6828d5c14569edf03f30fd5bbe2808e
Author: Colin P. Mccabe 
Date:   2017-05-22T22:37:12Z

rename classes

CreateTopicResults -> CreateTopicsResults
DeleteTopicResults -> DeleteTopicsResults
move TopicPartitionInfo to org.apache.kafka.common
move AccessControlEntry to org.apache.kafka.common.acl
move AccessControlEntryData to org.apache.kafka.common.acl
move AccessControlEntryFilter to org.apache.kafka.common.acl
move AclBinding to org.apache.kafka.common.acl
move AclBindingFilter to org.apache.kafka.common.acl
move AclOperation to org.apache.kafka.common.acl
move AclPermissionType to org.apache.kafka.common.acl
move ConfigResource to org.apache.common.config
move Resource to org.apache.common.resource
move ResourceFilter to org.apache.common.resource
move ResourceType to org.apache.common.resource

commit 52408250e439e18b96a27f5c3b5428c6c58b7dec
Author: Colin P. Mccabe 
Date:   2017-05-23T00:21:01Z

Add TopicConfigs.java






[jira] [Assigned] (KAFKA-5218) New Short serializer, deserializer, serde

2017-05-22 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-5218:


Assignee: Mario Molina

> New Short serializer, deserializer, serde
> -
>
> Key: KAFKA-5218
> URL: https://issues.apache.org/jira/browse/KAFKA-5218
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: Mario Molina
>Assignee: Mario Molina
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> There is no Short serializer/deserializer in the current clients component.
> One would be useful when using Kafka Connect to write data to databases with 
> SMALLINT fields (or similar), avoiding conversions to int and slightly 
> improving performance in terms of memory and network.





[jira] [Commented] (KAFKA-4896) Offset loading can use more threads

2017-05-22 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020447#comment-16020447
 ] 

Onur Karaman commented on KAFKA-4896:
-

handleGroupImmigration only gets called from handling a LeaderAndIsrRequest. 
Its cache load is synchronized over the ReplicaManager's replicaStateChangeLock.

handleGroupEmigration gets called in two places: when handling a 
LeaderAndIsrRequest as well as from StopReplicaRequest. Its cache removal is 
synchronized over the ReplicaManager's replicaStateChangeLock only in the 
LeaderAndIsrRequest handling path.

I think even with today's single scheduler thread, we might have an interleaved 
load and removal problem when the following are happening concurrently:
1. handleGroupImmigration is called for a partition P through 
LeaderAndIsrRequest
2. handleGroupEmigration is called for a partition P through StopReplicaRequest

This should be rare. Controller does a blocking send and receive, so I think 
this should only happen if:
1. one of the requests times out while the controller stays the same
2. the controller moves

[~junrao] does this edge case sound right?

> Offset loading can use more threads
> ---
>
> Key: KAFKA-4896
> URL: https://issues.apache.org/jira/browse/KAFKA-4896
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.0
>Reporter: Jun Rao
>Assignee: Abhishek Mendhekar
>  Labels: newbie
>
> Currently, in GroupMetadataManager, we have a single thread for loading the 
> offset cache. We could speed it up with more threads.
>  /* single-thread scheduler to handle offset/group metadata cache loading and 
> unloading */
>   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = 
> "group-metadata-manager-")





[jira] [Assigned] (KAFKA-5282) Transactions integration test: Use factory methods to keep track of open producers and consumers and close them all on tearDown

2017-05-22 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-5282:
--

Assignee: Vahid Hashemian

> Transactions integration test: Use factory methods to keep track of open 
> producers and consumers and close them all on tearDown
> ---
>
> Key: KAFKA-5282
> URL: https://issues.apache.org/jira/browse/KAFKA-5282
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Apurva Mehta
>Assignee: Vahid Hashemian
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> See: https://github.com/apache/kafka/pull/3093/files#r117354588
> The current transactions integration test creates individual producers and 
> consumers per test, and closes them independently. 
> It would be more robust to create them through a central factory method that 
> keeps track of each instance, and then close those instances on `tearDown`.





Jenkins build is back to normal : kafka-0.11.0-jdk7 #14

2017-05-22 Thread Apache Jenkins Server
See 




[jira] [Updated] (KAFKA-5273) KafkaConsumer.committed() should get latest committed offsets from the server

2017-05-22 Thread Apurva Mehta (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apurva Mehta updated KAFKA-5273:

Status: Patch Available  (was: In Progress)

> KafkaConsumer.committed() should get latest committed offsets from the server
> -
>
> Key: KAFKA-5273
> URL: https://issues.apache.org/jira/browse/KAFKA-5273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> Currently, the `KafkaConsumer.committed(topicPartition)` method will return the 
> current position of the consumer for that partition if the consumer has been 
> assigned the partition. Otherwise, it will look up the committed position from 
> the server. 
> With the new producer `sendOffsetsToTransaction` API, we get into a state 
> where we can commit the offsets for an assigned partition through the 
> producer. The consumer doesn't update its cached view and subsequently 
> returns a stale committed offset for its assigned partition. 
> We should either update the consumer's cache when offsets are committed 
> through the producer, or drop the cache entirely and always look up the 
> committed offset from the server. That way the `committed` method will always 
> return the latest committed offset for any partition.





[jira] [Commented] (KAFKA-5273) KafkaConsumer.committed() should get latest committed offsets from the server

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020408#comment-16020408
 ] 

ASF GitHub Bot commented on KAFKA-5273:
---

GitHub user apurvam opened a pull request:

https://github.com/apache/kafka/pull/3119

KAFKA-5273: Make KafkaConsumer.committed query the server for all partitions

Before this patch the consumer would return the cached offsets for 
partitions in its current assignment. This worked when all the offset commits 
went through the consumer. 

With KIP-98, offsets can be committed transactionally through the producer. 
This means that relying on cached positions in the consumer returns incorrect 
information: since commits go through the producer, the cache is never updated. 

Hence we need to update the `KafkaConsumer.committed` method to always 
look up the last committed offset from the server to ensure it gets the correct 
information every time.
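
A minimal sketch of the scenario this patch addresses, assuming the KIP-98 transactional 
producer API and placeholder topic/group names (producer/consumer construction and 
`initTransactions()` are omitted):

{code:java}
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetExample {
    // Offsets committed through the transactional producer are invisible to the
    // consumer's local cache, so only a committed() that queries the group
    // coordinator returns the value written below.
    static void commitThroughProducer(KafkaProducer<String, String> producer,
                                      KafkaConsumer<String, String> consumer,
                                      String groupId) {
        // assumes producer.initTransactions() has already been called
        TopicPartition tp = new TopicPartition("topic1", 0);   // placeholder partition
        producer.beginTransaction();
        producer.sendOffsetsToTransaction(
                Collections.singletonMap(tp, new OffsetAndMetadata(42L)), groupId);
        producer.commitTransaction();

        // With this patch, committed() asks the server and returns 42, even though
        // this consumer never committed that offset itself.
        OffsetAndMetadata committed = consumer.committed(tp);
        System.out.println(committed.offset());
    }
}
{code}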

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apurvam/kafka 
KAFKA-5273-kafkaconsumer-committed-should-always-hit-server

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3119.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3119


commit 17eb7eab70a40e3d4208a56463bb418350f80950
Author: Apurva Mehta 
Date:   2017-05-22T23:36:38Z

Make KafkaConsumer.committed hit the server for all partitions, even those 
in its current assignment




> KafkaConsumer.committed() should get latest committed offsets from the server
> -
>
> Key: KAFKA-5273
> URL: https://issues.apache.org/jira/browse/KAFKA-5273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> Currently, the `KafkaConsumer.committed(topicPartition)` method will return the 
> current position of the consumer for that partition if the consumer has been 
> assigned the partition. Otherwise, it will look up the committed position from 
> the server. 
> With the new producer `sendOffsetsToTransaction` API, we get into a state 
> where we can commit the offsets for an assigned partition through the 
> producer. The consumer doesn't update its cached view and subsequently 
> returns a stale committed offset for its assigned partition. 
> We should either update the consumer's cache when offsets are committed 
> through the producer, or drop the cache entirely and always look up the 
> committed offset from the server. That way the `committed` method will always 
> return the latest committed offset for any partition.





[GitHub] kafka pull request #3119: KAFKA-5273: Make KafkaConsumer.committed query the...

2017-05-22 Thread apurvam
GitHub user apurvam opened a pull request:

https://github.com/apache/kafka/pull/3119

KAFKA-5273: Make KafkaConsumer.committed query the server for all partitions

Before this patch the consumer would return the cached offsets for 
partitions in its current assignment. This worked when all the offset commits 
went through the consumer. 

With KIP-98, offsets can be committed transactionally through the producer. 
This means that relying on cached positions in the consumer returns incorrect 
information: since commits go through the producer, the cache is never updated. 

Hence we need to update the `KafkaConsumer.committed` method to always 
look up the last committed offset from the server to ensure it gets the correct 
information every time.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apurvam/kafka 
KAFKA-5273-kafkaconsumer-committed-should-always-hit-server

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3119.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3119


commit 17eb7eab70a40e3d4208a56463bb418350f80950
Author: Apurva Mehta 
Date:   2017-05-22T23:36:38Z

Make KafkaConsumer.committed hit the server for all partitions, even those 
in its current assignment






[jira] [Updated] (KAFKA-5308) TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response

2017-05-22 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-5308:
---
Labels: exactly-once  (was: )

> TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response
> --
>
> Key: KAFKA-5308
> URL: https://issues.apache.org/jira/browse/KAFKA-5308
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> It could be the case that one of the topics added to a transaction is on a 
> lower message format version. Because of 
> https://github.com/apache/kafka/pull/3118, the producer won't be able to send 
> any data to that topic, but the TC will nevertheless try to write the 
> commit/abort marker to the log. Like the Produce request, the WriteTxnMarker 
> request should return the UNSUPPORTED_FOR_MESSAGE_FORMAT error. Instead of 
> retrying, we should log a warning and remove the partition from the set of 
> partitions awaiting marker completion.





[jira] [Updated] (KAFKA-5307) Concurrent offset commit on same partition may result in inconsistent cache

2017-05-22 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-5307:
---
Fix Version/s: (was: 0.11.0.0)

> Concurrent offset commit on same partition may result in inconsistent cache
> ---
>
> Key: KAFKA-5307
> URL: https://issues.apache.org/jira/browse/KAFKA-5307
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>
> Consider two concurrent offset commits: the first one at offset A, the second 
> at offset B. It is possible for both offsets to be appended to the log before 
> either of them had been successfully acknowledged. Currently we maintain in 
> the group metadata a collection of pending offset commits, but we only store 
> one offset for each partition. When the commit for A is received, we add an 
> entry for that partition with A. When the commit for B arrives, we would 
> overwrite that value with B. There are two cases where this results in 
> incorrect behavior.
> 1. After the callback for A is invoked, we materialize whatever offset is in 
> the pending offset collection unconditionally. In this case, we would 
> incorrectly materialize B even though it had not been successfully written. 
> Later B may fail and we have inconsistent state.
> 2. It could happen that the callback for B is invoked before A. If the result 
> was successful, no harm done, but if it failed, then we need to restore A 
> since it may still be completed successfully. If it does complete 
> successfully, then we'll have inconsistent state again.





[jira] [Updated] (KAFKA-5307) Concurrent offset commit on same partition may result in inconsistent cache

2017-05-22 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-5307:
---
Fix Version/s: 0.11.0.0

> Concurrent offset commit on same partition may result in inconsistent cache
> ---
>
> Key: KAFKA-5307
> URL: https://issues.apache.org/jira/browse/KAFKA-5307
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>
> Consider two concurrent offset commits: the first one at offset A, the second 
> at offset B. It is possible for both offsets to be appended to the log before 
> either of them had been successfully acknowledged. Currently we maintain in 
> the group metadata a collection of pending offset commits, but we only store 
> one offset for each partition. When the commit for A is received, we add an 
> entry for that partition with A. When the commit for B arrives, we would 
> overwrite that value with B. There are two cases where this results in 
> incorrect behavior.
> 1. After the callback for A is invoked, we materialize whatever offset is in 
> the pending offset collection unconditionally. In this case, we would 
> incorrectly materialize B even though it had not been successfully written. 
> Later B may fail and we have inconsistent state.
> 2. It could happen that the callback for B is invoked before A. If the result 
> was successful, no harm done, but if it failed, then we need to restore A 
> since it may still be completed successfully. If it does complete 
> successfully, then we'll have inconsistent state again.





[jira] [Commented] (KAFKA-5308) TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response

2017-05-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020384#comment-16020384
 ] 

Jason Gustafson commented on KAFKA-5308:


cc [~guozhang] [~damianguy]

> TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response
> --
>
> Key: KAFKA-5308
> URL: https://issues.apache.org/jira/browse/KAFKA-5308
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 0.11.0.0
>
>
> It could be the case that one of the topics added to a transaction is on a 
> lower message format version. Because of 
> https://github.com/apache/kafka/pull/3118, the producer won't be able to send 
> any data to that topic, but the TC will nevertheless try to write the 
> commit/abort marker to the log. Like the Produce request, the WriteTxnMarker 
> request should return the UNSUPPORTED_FOR_MESSAGE_FORMAT error. Instead of 
> retrying, we should log a warning and remove the partition from the set of 
> partitions awaiting marker completion.





[jira] [Created] (KAFKA-5308) TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response

2017-05-22 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-5308:
--

 Summary: TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in 
WriteTxnMarker response
 Key: KAFKA-5308
 URL: https://issues.apache.org/jira/browse/KAFKA-5308
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Priority: Blocker


It could be the case that one of the topics added to a transaction is on a 
lower message format version. Because of 
https://github.com/apache/kafka/pull/3118, the producer won't be able to send 
any data to that topic, but the TC will nevertheless try to write the 
commit/abort marker to the log. Like the Produce request, the WriteTxnMarker 
request should return the UNSUPPORTED_FOR_MESSAGE_FORMAT error. Instead of 
retrying, we should log a warning and remove the partition from the set of 
partitions awaiting marker completion.
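
For illustration only (the actual transaction coordinator is broker-side Scala code), a 
sketch of the proposed handling, with a hypothetical pending-partition set:

{code:java}
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TxnMarkerResponseSketch {
    private static final Logger log = LoggerFactory.getLogger(TxnMarkerResponseSketch.class);

    // pendingPartitions is a hypothetical set of partitions still awaiting marker completion.
    static void handleMarkerResponse(Map<TopicPartition, Errors> errorsByPartition,
                                     Set<TopicPartition> pendingPartitions) {
        for (Map.Entry<TopicPartition, Errors> entry : errorsByPartition.entrySet()) {
            TopicPartition tp = entry.getKey();
            switch (entry.getValue()) {
                case NONE:
                    pendingPartitions.remove(tp);   // marker written successfully
                    break;
                case UNSUPPORTED_FOR_MESSAGE_FORMAT:
                    // Old message format: the marker can never be written, so log a
                    // warning and give up on this partition instead of retrying forever.
                    log.warn("Skipping txn marker for {}: unsupported message format", tp);
                    pendingPartitions.remove(tp);
                    break;
                default:
                    // other (retriable) errors: keep the partition pending and retry later
                    break;
            }
        }
    }
}
{code}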





[GitHub] kafka pull request #3118: MINOR: Broker should disallow downconversion of tr...

2017-05-22 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/3118

MINOR: Broker should disallow downconversion of transactional/idempotent 
records



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka 
disallow-transactional-idempotent-downconversion

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3118.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3118


commit b1ea146b167668cdbbeadc41b88874167eb59feb
Author: Jason Gustafson 
Date:   2017-05-22T23:02:44Z

MINOR: Broker should disallow downconversion of transactional/idempotent 
records






[jira] [Updated] (KAFKA-5186) Avoid expensive initialization of producer state when upgrading

2017-05-22 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-5186:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Issue resolved by pull request 3113
[https://github.com/apache/kafka/pull/3113]

> Avoid expensive initialization of producer state when upgrading
> ---
>
> Key: KAFKA-5186
> URL: https://issues.apache.org/jira/browse/KAFKA-5186
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> Currently the producer state is always loaded upon broker initialization. If 
> we don't find a snapshot file to load from, then we scan the log segments 
> from the beginning to rebuild the state. Of course, when users upgrade to the 
> new version, there will be no snapshot file, so the upgrade could be quite 
> intensive. It would be nice to avoid this by assuming instead that the 
> absence of a snapshot file means that the producer state should start clean 
> and we can avoid the expensive scanning.
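
For illustration only (the real logic lives in the broker's producer state management code, 
which is Scala), a sketch of the proposed decision, with hypothetical names:

{code:java}
import java.io.File;
import java.util.Collections;
import java.util.Map;

public class ProducerStateLoadSketch {
    static Map<Long, Long> loadProducerState(File snapshotFile) {
        if (snapshotFile.exists()) {
            // normal path: restore producer state from the snapshot
            return readSnapshot(snapshotFile);
        }
        // Upgrade path: no snapshot yet. Assume a clean producer state instead of
        // rebuilding it by scanning every log segment from the beginning.
        return Collections.emptyMap();
    }

    private static Map<Long, Long> readSnapshot(File snapshotFile) {
        // snapshot deserialization omitted; hypothetically maps producerId -> last sequence
        return Collections.emptyMap();
    }
}
{code}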





[jira] [Commented] (KAFKA-5186) Avoid expensive initialization of producer state when upgrading

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020332#comment-16020332
 ] 

ASF GitHub Bot commented on KAFKA-5186:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3113


> Avoid expensive initialization of producer state when upgrading
> ---
>
> Key: KAFKA-5186
> URL: https://issues.apache.org/jira/browse/KAFKA-5186
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> Currently the producer state is always loaded upon broker initialization. If 
> we don't find a snapshot file to load from, then we scan the log segments 
> from the beginning to rebuild the state. Of course, when users upgrade to the 
> new version, there will be no snapshot file, so the upgrade could be quite 
> intensive. It would be nice to avoid this by assuming instead that the 
> absence of a snapshot file means that the producer state should start clean 
> and we can avoid the expensive scanning.





[GitHub] kafka pull request #3113: KAFKA-5186: Avoid expensive log scan to build prod...

2017-05-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3113




[jira] [Comment Edited] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2017-05-22 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020279#comment-16020279
 ] 

Randall Hauch edited comment on KAFKA-3821 at 5/22/17 10:12 PM:


[~ewencp], the more I think about this issue, the more I think that it should 
be solved with a new subtype of {{SourceRecord}}, say {{OffsetRecord}}:

{code:java}
public class OffsetRecord extends SourceRecord {
  public OffsetRecord(Map<String, ?> sourcePartition, 
                      Map<String, ?> sourceOffset) {
    super(sourcePartition, sourceOffset, 
          null, null, null, null, null, null);
  }
  // with hashCode(), equals(), toString(), and newRecord(...) 
  // implementations
}
{code}

The {{SourceTaskWorker}} would need to check instanceof and skip a few things 
it's currently doing (e.g., using null for key and value rather than converting 
them, not sending to the producer), but everything else would work fine as is 
since {{OffsetRecord}} would be a subtype of {{SourceRecord}}.
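
For illustration, a hypothetical task's {{poll()}} could then mix regular records with an 
offset-only record (the partition/offset keys and the {{readSnapshotRecords}} helper below 
are placeholders):

{code:java}
// Hypothetical usage inside a SourceTask implementation, with OffsetRecord as sketched above;
// readSnapshotRecords(...) is a placeholder helper returning regular SourceRecords.
@Override
public List<SourceRecord> poll() throws InterruptedException {
    Map<String, String> partition = Collections.singletonMap("db", "inventory");
    List<SourceRecord> records = new ArrayList<>(readSnapshotRecords(partition));
    // Snapshot finished: advance the stored offset without writing another record to any topic.
    records.add(new OffsetRecord(partition, Collections.singletonMap("snapshot_completed", "true")));
    return records;
}
{code}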

I also think that we should *not* introduce a supertype of {{SourceRecord}} and 
change the signature of {{SourceTask.poll()}} to return something other than 
{{List<SourceRecord>}}. Technically we could do this because it is a binary 
compatible change (due to type erasure), but it's way too confusing / 
surprising with almost no real benefit. Besides, it'd require a fair amount of 
refactoring of the implementation.

If you agree, then I can write up a KIP for this so that we can start the 
formal discussion.



was (Author: rhauch):
[~ewencp], the more I think about this issue, the more I think that it should 
be solved with a new subtype of {{SourceRecord}}, say {{OffsetRecord}}:

{code:java}
public class OffsetRecord extends SourceRecord {
  public OffsetRecord(Map<String, ?> sourcePartition, 
                      Map<String, ?> sourceOffset) {
    super(sourcePartition, sourceOffset, null, null, null, null, null, null);
  }
  // with hashCode(), equals(), toString(), and newRecord(...) implementations
}
{code}

The {{SourceTaskWorker}} would need to check instanceof and skip a few things 
it's currently doing (e.g., using null for key and value rather than converting 
them, not sending to the producer), but everything else would work fine as is 
since {{OffsetRecord}} would be a subtype of {{SourceRecord}}.

I also think that we should *not* introduce a supertype of {{SourceRecord}} and 
change the signature of {{SourceTask.poll()}} to return something other than 
{{List<SourceRecord>}}. Technically we could do this because it is a binary 
compatible change (due to type erasure), but it's way too confusing / 
surprising with almost no real benefit. Besides, it'd require a fair amount of 
refactoring of the implementation.

If you agree, then I can write up a KIP for this so that we can start the 
formal discussion.


> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>  Labels: needs-kip
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.





[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2017-05-22 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020279#comment-16020279
 ] 

Randall Hauch commented on KAFKA-3821:
--

[~ewencp], the more I think about this issue, the more I think that it should 
be solved with a new subtype of {{SourceRecord}}, say {{OffsetRecord}}:

{code:java}
public class OffsetRecord extends SourceRecord {
  public OffsetRecord(Map<String, ?> sourcePartition, 
                      Map<String, ?> sourceOffset) {
    super(sourcePartition, sourceOffset, null, null, null, null, null, null);
  }
  // with hashCode(), equals(), toString(), and newRecord(...) implementations
}
{code}

The {{SourceTaskWorker}} would need to check instanceof and skip a few things 
it's currently doing (e.g., using null for key and value rather than converting 
them, not sending to the producer), but everything else would work fine as is 
since {{OffsetRecord}} would be a subtype of {{SourceRecord}}.

I also think that we should *not* introduce a supertype of {{SourceRecord}} and 
change the signature of {{SourceTask.poll()}} to return something other than 
{{List}}. Technically we could do this because it is a binary 
compatible change (due to type erasure), but it's way too confusing / 
surprising with almost no real benefit. Besides, it'd require a fair amount of 
refactoring of the implementation.

If you agree, then I can write up a KIP for this so that we can start the 
formal discussion.


> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>  Labels: needs-kip
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.





Re: Reg: [VOTE] KIP 157 - Add consumer config options to streams reset tool

2017-05-22 Thread BigData dev
Hi Matthias,
For the AdminClient, client configuration is needed, and for ZooKeeper no
properties are required.
Other tools, such as ConsumerGroupCommand, use a command-config option for this.
I think consumer-config and consumer-property are not required here. We
will use the configuration passed through command-config for both the admin
client and the embedded consumer.

Thanks,
Bharat


On Fri, May 19, 2017 at 4:26 PM, Matthias J. Sax 
wrote:

> A couple of follow-ups.
>
> The reset tool uses AdminClient, ZkUtils, and a KafkaConsumer
> internally. Thus, I am wondering if we need the possibility to specify
> configs for all of them?
>
> The original JIRA reported that the reset tool does not work for a
> secured cluster, and thus I doubt that consumer properties are
> sufficient to resolve this.
>
> Maybe some people that are more familiar with Kafka security can help
> out here. I personally have only limited knowledge about this topic.
>
>
> -Matthias
>
>
>
> On 5/19/17 11:09 AM, BigData dev wrote:
> > Thanks for the info, Matthias.
> >
> > Regards,
> > Bharat
> >
> >
> > On Fri, May 19, 2017 at 10:25 AM, Matthias J. Sax  >
> > wrote:
> >
> >> KIP-157 cannot be included in 0.11.0.0 anymore. KIP freeze date deadline
> >> is strict.
> >>
> >> -Matthias
> >>
> >> On 5/19/17 10:15 AM, BigData dev wrote:
> >>> Hi Matthias,
> >>> I will start a new KIP for Kafka tools options to be a standard across
> >>> all tools shortly. But I think KIP 157 for Kafka Streams should be
> >>> included in the 0.11.0.0 release (the KIP freeze date is already over,
> >>> but I think this is a minor code change in tools to add an option to the
> >>> streams reset tool), as without these consumer config options it will
> >>> not be possible to use the tool in a secured environment. Please let me
> >>> know your thoughts on this. If it needs to be moved to the next release,
> >>> I will work on this as part of KIP 14.
> >>>
> >>> Thanks,
> >>> Bharat
> >>>
> >>>
> >>> On Fri, May 19, 2017 at 10:10 AM, Matthias J. Sax <matth...@confluent.io>
> >>> wrote:
> >>>
>  I double checked with Matthew Warhaftig (the original author of KIP-14)
>  and he has no interest in continuing the KIP atm.
> 
>  Thus, Bharat can continue the work on KIP-14. I think it would be best
>  to start a new DISCUSS thread after you update KIP-14.
> 
>  Thanks for your contributions!
> 
> 
>  -Matthias
> 
> 
>  On 5/17/17 12:56 PM, BigData dev wrote:
> > Hi,
> > When I was trying to find more info, there is already a proposed KIP for
> > this:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-14+-+Tools+Standardization
> >
> >
> > Thanks,
> > Bharat
> >
> > On Wed, May 17, 2017 at 12:38 PM, BigData dev <bigdatadev...@gmail.com>
> > wrote:
> >
> >> Hi Ewen, Matthias,
> >> For common configuration across all the tools, I will work on that as
> >> part of another KIP by looking into all Kafka tools.
> >>
> >>
> >> Thanks,
> >> Bharat
> >>
> >>
> >> On Wed, May 17, 2017 at 9:40 AM, Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> +1
> >>>
> >>> I also second Ewen's comment -- standardizing the common supported
> >>> parameters over all tools would be great!
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 5/17/17 12:57 AM, Damian Guy wrote:
>  +1
> 
>  On Wed, 17 May 2017 at 05:40 Ewen Cheslack-Postava <e...@confluent.io>
>  wrote:
> 
> > +1 (binding)
> >
> > I mentioned this in the PR that triggered this:
> >
> >> KIP is accurate, though this is one of those things that we should
> >> probably get a KIP for a standard set of config options across all
> >> tools so additions like this can just fall under the umbrella of that
> >> KIP...
> >
> > I think it would be great if someone wrote up a small KIP providing
> > some standardized settings that we could get future additions
> > automatically umbrella'd under, e.g. no need to do a KIP if just adding
> > a consumer.config or consumer-property config conforming to existing
> > expectations for other tools. We could also standardize on a few other
> > settings names that are inconsistent across different tools and set out
> > a clear path forward for future tools.
> >
> > I think I still have at least one open PR from when I first started on
> > the project where I was trying to clean up some command line stuff to
> > be more consistent. This has been an issue for many years now...
> >
> > 

[jira] [Commented] (KAFKA-5307) Concurrent offset commit on same partition may result in inconsistent cache

2017-05-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020225#comment-16020225
 ] 

Jason Gustafson commented on KAFKA-5307:


cc [~guozhang] [~apurva]

> Concurrent offset commit on same partition may result in inconsistent cache
> ---
>
> Key: KAFKA-5307
> URL: https://issues.apache.org/jira/browse/KAFKA-5307
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>
> Consider two concurrent offset commits: the first one at offset A, the second 
> at offset B. It is possible for both offsets to be appended to the log before 
> either of them had been successfully acknowledged. Currently we maintain in 
> the group metadata a collection of pending offset commits, but we only store 
> one offset for each partition. When the commit for A is received, we add an 
> entry for that partition with A. When the commit for B arrives, we would 
> overwrite that value with B. There are two cases where this results in 
> incorrect behavior.
> 1. After the callback for A is invoked, we materialize whatever offset is in 
> the pending offset collection unconditionally. In this case, we would 
> incorrectly materialize B even though it had not been successfully written. 
> Later B may fail and we have inconsistent state.
> 2. It could happen that the callback for B is invoked before A. If the result 
> was successful, no harm done, but if it failed, then we need to restore A 
> since it may still be completed successfully. If it does complete 
> successfully, then we'll have inconsistent state again.





[jira] [Created] (KAFKA-5307) Concurrent offset commit on same partition may result in inconsistent cache

2017-05-22 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-5307:
--

 Summary: Concurrent offset commit on same partition may result in 
inconsistent cache
 Key: KAFKA-5307
 URL: https://issues.apache.org/jira/browse/KAFKA-5307
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


Consider two concurrent offset commits: the first one at offset A, the second 
at offset B. It is possible for both offsets to be appended to the log before 
either of them had been successfully acknowledged. Currently we maintain in the 
group metadata a collection of pending offset commits, but we only store one 
offset for each partition. When the commit for A is received, we add an entry 
for that partition with A. When the commit for B arrives, we would overwrite 
that value with B. There are two cases where this results in incorrect behavior.

1. After the callback for A is invoked, we materialize whatever offset is in 
the pending offset collection unconditionally. In this case, we would 
incorrectly materialize B even though it had not been successfully written. 
Later B may fail and we have inconsistent state.
2. It could happen that the callback for B is invoked before A. If the result 
was successful, no harm done, but if it failed, then we need to restore A since 
it may still be completed successfully. If it does complete successfully, then 
we'll have inconsistent state again.
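
For illustration only (the real bookkeeping is in the broker's group metadata code, which is 
Scala), a sketch of the single-pending-entry cache described above, with comments marking 
where the two cases arise; all names here are hypothetical:

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class PendingOffsetCacheSketch {
    private final Map<TopicPartition, Long> pendingOffsets = new HashMap<>();
    private final Map<TopicPartition, Long> committedOffsets = new HashMap<>();

    synchronized void prepareCommit(TopicPartition tp, long offset) {
        // Commit A stores A here; a concurrent commit B overwrites it before A is acknowledged.
        pendingOffsets.put(tp, offset);
    }

    // Append callback for a commit that was acknowledged (or that failed).
    synchronized void completeCommit(TopicPartition tp, boolean appendSucceeded) {
        Long pending = pendingOffsets.remove(tp);
        if (appendSucceeded && pending != null) {
            // Case 1: A's callback unconditionally materializes whatever is pending,
            // which may be B even though B has not been successfully written yet.
            committedOffsets.put(tp, pending);
        }
        // Case 2: if B's callback fails first, A's pending entry is already gone and
        // cannot be restored even though A may still complete successfully.
    }
}
{code}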





[jira] [Commented] (KAFKA-5225) StreamsResetter doesn't allow custom Consumer properties

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020221#comment-16020221
 ] 

ASF GitHub Bot commented on KAFKA-5225:
---

GitHub user bharatviswa504 opened a pull request:

https://github.com/apache/kafka/pull/3117

KAFKA-5225:StreamsResetter doesn't allow custom Consumer properties

Added command-config option, as the client configuration is required for 
AdminClient and Embedded Consumer.

@mjsax @guozhangwang please review the changes.

@mjsax, a couple of questions from the previous PR:
1. Tests for a secure cluster: do you mean to add an integration test?
2. Should the --dry-run option print user configs or not? This is just a 
thought. (I did not get what you meant here.)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bharatviswa504/kafka KAFKA-5225

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3117.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3117


commit 3c6bc850ad9d0a5c3118965f7996e2f3dc7171dd
Author: Bharat Viswanadham 
Date:   2017-05-22T21:12:24Z

KAFKA-5225:StreamsResetter doesn't allow custom Consumer properties




> StreamsResetter doesn't allow custom Consumer properties
> 
>
> Key: KAFKA-5225
> URL: https://issues.apache.org/jira/browse/KAFKA-5225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 0.10.2.1
>Reporter: Dustin Cote
>Assignee: Bharat Viswanadham
>  Labels: needs-kip
>
> The StreamsResetter doesn't let the user pass in any configurations to the 
> embedded consumer. This is a problem in secured environments because you 
> can't configure the embedded consumer to talk to the cluster. The tool should 
> take an approach similar to `kafka.admin.ConsumerGroupCommand` which allows a 
> config file to be passed in the command line for such operations.
> cc [~mjsax]





[GitHub] kafka pull request #3117: KAFKA-5225:StreamsResetter doesn't allow custom Co...

2017-05-22 Thread bharatviswa504
GitHub user bharatviswa504 opened a pull request:

https://github.com/apache/kafka/pull/3117

KAFKA-5225:StreamsResetter doesn't allow custom Consumer properties

Added command-config option, as the client configuration is required for 
AdminClient and Embedded Consumer.

@mjsax @guozhangwang please review the changes.

@mjsax, a couple of questions from the previous PR:
1. Tests for a secure cluster: do you mean to add an integration test?
2. Should the --dry-run option print user configs or not? This is just a 
thought. (I did not get what you meant here.)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bharatviswa504/kafka KAFKA-5225

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3117.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3117


commit 3c6bc850ad9d0a5c3118965f7996e2f3dc7171dd
Author: Bharat Viswanadham 
Date:   2017-05-22T21:12:24Z

KAFKA-5225:StreamsResetter doesn't allow custom Consumer properties






[jira] [Updated] (KAFKA-5225) StreamsResetter doesn't allow custom Consumer properties

2017-05-22 Thread Bharat Viswanadham (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bharat Viswanadham updated KAFKA-5225:
--
Status: In Progress  (was: Patch Available)

> StreamsResetter doesn't allow custom Consumer properties
> 
>
> Key: KAFKA-5225
> URL: https://issues.apache.org/jira/browse/KAFKA-5225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 0.10.2.1
>Reporter: Dustin Cote
>Assignee: Bharat Viswanadham
>  Labels: needs-kip
>
> The StreamsResetter doesn't let the user pass in any configurations to the 
> embedded consumer. This is a problem in secured environments because you 
> can't configure the embedded consumer to talk to the cluster. The tool should 
> take an approach similar to `kafka.admin.ConsumerGroupCommand` which allows a 
> config file to be passed in the command line for such operations.
> cc [~mjsax]





[jira] [Commented] (KAFKA-5225) StreamsResetter doesn't allow custom Consumer properties

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020216#comment-16020216
 ] 

ASF GitHub Bot commented on KAFKA-5225:
---

Github user bharatviswa504 closed the pull request at:

https://github.com/apache/kafka/pull/3062


> StreamsResetter doesn't allow custom Consumer properties
> 
>
> Key: KAFKA-5225
> URL: https://issues.apache.org/jira/browse/KAFKA-5225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 0.10.2.1
>Reporter: Dustin Cote
>Assignee: Bharat Viswanadham
>  Labels: needs-kip
>
> The StreamsResetter doesn't let the user pass in any configurations to the 
> embedded consumer. This is a problem in secured environments because you 
> can't configure the embedded consumer to talk to the cluster. The tool should 
> take an approach similar to `kafka.admin.ConsumerGroupCommand` which allows a 
> config file to be passed in the command line for such operations.
> cc [~mjsax]





[GitHub] kafka pull request #3062: KAFKA-5225: StreamsResetter tool to allow custom c...

2017-05-22 Thread bharatviswa504
Github user bharatviswa504 closed the pull request at:

https://github.com/apache/kafka/pull/3062


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-5236) Regression in on-disk log size when using Snappy compression with 0.8.2 log message format

2017-05-22 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5236:
---
Labels: regression  (was: )

> Regression in on-disk log size when using Snappy compression with 0.8.2 log 
> message format
> --
>
> Key: KAFKA-5236
> URL: https://issues.apache.org/jira/browse/KAFKA-5236
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Nick Travers
>  Labels: regression
> Fix For: 0.11.0.0
>
>
> We recently upgraded our brokers in our production environments from 0.10.1.1 
> to 0.10.2.1 and we've noticed a sizable regression in the on-disk .log file 
> size. For some deployments the increase was as much as 50%.
> We run our brokers with the 0.8.2 log message format version. The majority of 
> our message volume comes from 0.10.x Java clients sending messages encoded 
> with the Snappy codec.
> Some initial testing only shows a regression between the two versions when 
> using Snappy compression with a log message format of 0.8.2.
> I also tested 0.10.x log message formats as well as Gzip compression. The log 
> sizes do not differ in this case, so the issue seems confined to 0.8.2 
> message format and Snappy compression.
> A git-bisect led me to this commit, which modified the server-side 
> implementation of `Record`:
> https://github.com/apache/kafka/commit/67f1e5b91bf073151ff57d5d656693e385726697
> Here's the PR, which has more context:
> https://github.com/apache/kafka/pull/2140
> Here is a link to the test I used to reproduce this issue:
> https://github.com/nicktrav/kafka/commit/68e8db4fa525e173651ac740edb270b0d90b8818
> cc: [~hachikuji] [~junrao] [~ijuma] [~guozhang] (tagged on original PR)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5236) Regression in on-disk log size when using Snappy compression with 0.8.2 log message format

2017-05-22 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5236:
---
Fix Version/s: 0.11.0.0

> Regression in on-disk log size when using Snappy compression with 0.8.2 log 
> message format
> --
>
> Key: KAFKA-5236
> URL: https://issues.apache.org/jira/browse/KAFKA-5236
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Nick Travers
> Fix For: 0.11.0.0
>
>
> We recently upgraded our brokers in our production environments from 0.10.1.1 
> to 0.10.2.1 and we've noticed a sizable regression in the on-disk .log file 
> size. For some deployments the increase was as much as 50%.
> We run our brokers with the 0.8.2 log message format version. The majority of 
> our message volume comes from 0.10.x Java clients sending messages encoded 
> with the Snappy codec.
> Some initial testing only shows a regression between the two versions when 
> using Snappy compression with a log message format of 0.8.2.
> I also tested 0.10.x log message formats as well as Gzip compression. The log 
> sizes do not differ in this case, so the issue seems confined to 0.8.2 
> message format and Snappy compression.
> A git-bisect led me to this commit, which modified the server-side 
> implementation of `Record`:
> https://github.com/apache/kafka/commit/67f1e5b91bf073151ff57d5d656693e385726697
> Here's the PR, which has more context:
> https://github.com/apache/kafka/pull/2140
> Here is a link to the test I used to reproduce this issue:
> https://github.com/nicktrav/kafka/commit/68e8db4fa525e173651ac740edb270b0d90b8818
> cc: [~hachikuji] [~junrao] [~ijuma] [~guozhang] (tagged on original PR)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-5275) Review and potentially tweak AdminClient API for the initial release (KIP-117)

2017-05-22 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017701#comment-16017701
 ] 

Randall Hauch edited comment on KAFKA-5275 at 5/22/17 8:55 PM:
---

[~cmccabe] wrote:

{quote}
We could add an option to {{CreateTopicsOptions}} that suppresses 
{{TopicExistsException}} in the results. I guess the question is, does your 
code want to at least log something when a topic already exists rather than 
getting created, since the configuration options will potentially be 
different...
{quote}

Kafka Connect does log which topics were created and which were found. That'd 
be trivial if {{CreateTopicResults}} had methods to return the names of the 
newly-created and existing topics -- perhaps something like:

{code:java}
KafkaFuture createdTopicNames() {...}
KafkaFuture existingTopicNames() {...}
{code}

[~xvrl] wrote:
{quote}
However, we should also define constants for every topic configuration option 
that exists.
{quote}

Yes, and ideally the NewTopic builder might even have some convenience methods 
that saved having to use some of them.


was (Author: rhauch):
[~colinmccabe] wrote:

{quote}
We could add an option to {{CreateTopicsOptions}} that suppresses 
{{TopicExistsException}} in the results. I guess the question is, does your 
code want to at least log something when a topic already exists rather than 
getting created, since the configuration options will potentially be 
different...
{quote}

Kafka Connect does log which topics were created and which were found. That'd 
be trivial if {{CreateTopicResults}} had methods to return the names of the 
newly-created and existing topics -- perhaps something like:

{code:java}
KafkaFuture createdTopicNames() {...}
KafkaFuture existingTopicNames() {...}
{code}

[~xvrl] wrote:
{quote}
However, we should also define constants for every topic configuration option 
that exists.
{quote}

Yes, and ideally the NewTopic builder might even have some convenience methods 
that saved having to use some of them.

> Review and potentially tweak AdminClient API for the initial release (KIP-117)
> --
>
> Key: KAFKA-5275
> URL: https://issues.apache.org/jira/browse/KAFKA-5275
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.11.0.0
>
>
> Once all the pieces are in, we should take a pass and ensure that the APIs 
> work well together and that they are consistent.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Kafka Connect: To much restarting with a SourceConnector with dynamic set of tasks

2017-05-22 Thread Randall Hauch
You're not doing anything wrong, but I suspect you're requesting task
reconfiguration more frequently than was originally envisioned, which means
the current implementation is not optimal for your case.

I'm not sure how much effort is required to implement this new behavior.
The logic for the standalone worker is pretty straightforward, but the
logic for the distributed worker is going to be much more involved. But we
also need to be careful about changing existing behavior, since it's not
hard to imagine connectors that might expect that all tasks be restarted
when there are any changes to the task configurations. If there's any
potential that this is the case, we'd have to be sure to keep the existing
behavior as the default but to somehow enable the new behavior if desired.

One possibility is to add an overloaded requestTaskReconfiguration(boolean
changedOnly) that specifies whether only changed tasks should be
reconfigured. This way the existing requestTaskReconfiguration() method
could be changed to call requestTaskReconfiguration(false), and then the
implementation has to deal with this.

But again, the bigger challenge is to implement this new behavior in the
DistributedHerder. OTOH, perhaps it's not as complicated as I might guess.
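
To make the idea concrete, here is a rough sketch; requestTaskReconfiguration(boolean)
does not exist in the Connect API today, so the interface and the diff helper below are
purely illustrative of the "restart only what changed" behavior.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch only: this overload is the proposal under discussion,
// not an existing Kafka Connect API.
interface DeltaAwareConnectorContext {

    // existing behaviour: every task is stopped and restarted
    default void requestTaskReconfiguration() {
        requestTaskReconfiguration(false);
    }

    // proposed: when changedOnly is true, the herder would compare old and new
    // task configs and leave unchanged tasks running
    void requestTaskReconfiguration(boolean changedOnly);
}

final class TaskConfigDiff {

    // A task can keep running untouched if its config map is identical across generations.
    static boolean unchanged(List<Map<String, String>> oldConfigs,
                             List<Map<String, String>> newConfigs,
                             int taskIndex) {
        return taskIndex < oldConfigs.size()
                && taskIndex < newConfigs.size()
                && Objects.equals(oldConfigs.get(taskIndex), newConfigs.get(taskIndex));
    }

    private TaskConfigDiff() {}
}
```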



On Tue, May 16, 2017 at 4:57 AM, Per Steffensen  wrote:

> Hi
>
> Kafka (Connect) 0.10.2.1
>
> I am writing my own SourceConnector. It will communicate with a remote
> server, and continuously calculate the set of tasks that have to be running.
> Each task also makes a connection to the remote server from which it will
> get its data to forward to Kafka.
>
> When the SourceConnector realizes that the set of tasks has to be
> modified, it makes sure the taskConfigs method will return config for the new
> complete set of tasks (likely including tasks that already existed before,
> probably some new tasks, and maybe some of the existing tasks will no
> longer be included). After that the SourceConnector calls
> context.requestTaskReconfiguration. This results in the current instance
> of my SourceConnector and all existing/running tasks gets stopped, a new
> instance of my SourceConnector gets created and all tasks (those that
> existed before and new ones) are started.
>
> It all works nicely, but because my SourceConnector and my SourceTasks have
> to (re)establish connections and (re)initialize the streaming of data, and
> because my set of tasks changes fairly often, and because it very very
> often contains tasks that were also in the set before the change, I end up
> having lots of stop/start of tasks that really just ought to continue
> running.
>
> Any plans on making this more delta-ish, so that when doing a
> requestTaskReconfiguration
> * Only tasks that were not already in the task-config-set before the
> requestTaskReconfiguration are started
> * Only tasks that were in the task-config-set before the
> requestTaskReconfiguration, but not in the set after, are stopped
> * Tasks that are in the task-config-set both before and after
> requestTaskReconfiguration, are just allowed to keep running, without
> restarting
> * Not so important: Do not create a new instance of the SourceConnector,
> just because it has a changed task-config-set
>
> Or am I doing something wrong in my SourceConnector? Are there a different
> way that I should maintain a dynamic set of tasks?
>
> Thanks!!!
>
> Regards, Per Steffensen
>
>


[jira] [Commented] (KAFKA-5275) Review and potentially tweak AdminClient API for the initial release (KIP-117)

2017-05-22 Thread Colin P. McCabe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020137#comment-16020137
 ] 

Colin P. McCabe commented on KAFKA-5275:


I can imagine clients handling some types of exceptions.  For example, an 
{{InvalidConfigurationException}} includes the name of the unknown 
configuration key, so you could strip that configuration key off and retry.  
Maybe you don't care about {{TopicExistsException}}, so you log that and ignore 
it.  If you get a subclass of {{RetriableException}}, you can retry at the 
application level (these are nearly always fallout from network issues which 
could be transitory.)  If you get {{AuthenticationException}}, you might want 
to display a nicer message to the user.  Of course, {{UnknownServerException}} 
really is unhandleable, but it also shouldn't happen that often.
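
To make the per-topic handling concrete, below is a minimal sketch against the createTopics()/values() shape currently on trunk; since the API is still being reviewed in this ticket, names may shift, and the broker address and topic settings are placeholders.

{code:java}
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicExistsException;

public class PerTopicErrorHandlingSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        AdminClient admin = AdminClient.create(props);
        try {
            CreateTopicsResult result = admin.createTopics(Arrays.asList(
                    new NewTopic("connect-offsets", 25, (short) 3),
                    new NewTopic("connect-configs", 1, (short) 3)));
            // One future per topic, so each error can be handled individually.
            for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
                try {
                    entry.getValue().get();
                    System.out.println("created " + entry.getKey());
                } catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof TopicExistsException) {
                        System.out.println("found existing " + entry.getKey());
                    } else if (cause instanceof RetriableException) {
                        System.out.println("transient failure for " + entry.getKey() + ", retry later");
                    } else {
                        throw new RuntimeException("failed to create " + entry.getKey(), cause);
                    }
                }
            }
        } finally {
            admin.close();
        }
    }
}
{code}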

> Review and potentially tweak AdminClient API for the initial release (KIP-117)
> --
>
> Key: KAFKA-5275
> URL: https://issues.apache.org/jira/browse/KAFKA-5275
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.11.0.0
>
>
> Once all the pieces are in, we should take a pass and ensure that the APIs 
> work well together and that they are consistent.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2017-05-22 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020100#comment-16020100
 ] 

Randall Hauch edited comment on KAFKA-3821 at 5/22/17 8:13 PM:
---

The problem with the connector directly using {{OffsetStorageWriter}} is that 
it cannot guarantee order relative to the source records that Kafka Connect is 
already processing. In my cases, the offset/partition should be updated as part 
of the sequence of normal source records, and that order must be maintained.

The best and simplest example is a connector that wants to record that it 
is still making progress in its source but, for whatever reason, is not 
producing any source records.

But imagine a case where the connector just recorded an offset via 
{{OffsetStorageWriter}} and then immediately produces a new {{SourceRecord}} 
with a new offset. This order is important, and it's really bad if the offset 
of the {{SourceRecord}} gets written before the connector's call. 

Of course, the opposite case is bad, too: imagine the connector producing 
{{SourceRecord}} that is enqueued and not immediately processed, but the 
connector progresses a bit and wants to record its new offset. If it did the 
latter by explicit writing to the {{OffsetStorageWriter}}, that might happen 
before the offset in the {{SourceRecord}} is captured.

Bottom line is that connectors need to be able to specify the order of 
{{SourceRecords}} and offset updates, and that likely means they all need to be 
sent through the same poll mechanism.


was (Author: rhauch):
The problem with the connector directly using {{OffsetStorageWriter}} is that 
it cannot guarantee order relative to the source records that Kafka Connect is 
already processing. In my cases, the offset/partition should be updated as part 
of the sequence of normal source records, and that order must be maintained.

The best and simplest example is a connector that still wants to record that it 
is still making progress in its source, but for whatever reason is not 
producing any source records.

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>  Labels: needs-kip
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Jenkins build is back to normal : kafka-trunk-jdk8 #1572

2017-05-22 Thread Apache Jenkins Server
See 




[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2017-05-22 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020100#comment-16020100
 ] 

Randall Hauch commented on KAFKA-3821:
--

The problem with the connector directly using {{OffsetStorageWriter}} is that 
it cannot guarantee order relative to the source records that Kafka Connect is 
already processing. In my cases, the offset/partition should be updated as part 
of the sequence of normal source records, and that order must be maintained.

The best and simplest example is a connector that wants to record that it 
is still making progress in its source but, for whatever reason, is not 
producing any source records.

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>  Labels: needs-kip
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-5275) Review and potentially tweak AdminClient API for the initial release (KIP-117)

2017-05-22 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020058#comment-16020058
 ] 

Randall Hauch edited comment on KAFKA-5275 at 5/22/17 7:48 PM:
---

[~cmccabe] wrote:
{quote}
Hmm. Methods like these, can't handle the case where there is an error other 
than TopicExistsException. They have to throw an exception, and then you get no 
information about any topic other than the one that had the exception. Can't 
you just iterate through the map and do per-topic error handling? It just seems 
like an easier and cleaner solution, unless there's something I'm missing.
{quote}
Yes, that's true that clients likely don't know how to handle any kind of 
exception, so I would assume that they just log the exceptions and continue or 
fail by rethrowing them. However, you're right that we wouldn't want to lose 
any of the exceptions, so if we don't provide an exception handler or can't 
aggregate them somehow into a single exception then we probably shouldn't offer 
the capability to create topics as I was suggesting.


was (Author: rhauch):
[~cmccabe] wrote:
{quote}
Hmm. Methods like these, can't handle the case where there is an error other 
than TopicExistsException. They have to throw an exception, and then you get no 
information about any topic other than the one that had the exception. Can't 
you just iterate through the map and do per-topic error handling? It just seems 
like an easier and cleaner solution, unless there's something I'm missing.
{quote}
Yes, that's true that clients likely don't know how to handle any kind of 
exception, so I would assume that they just log the exceptions and continue or 
fail by rethrowing them. However, you're right that we wouldn't want to lose 
any of the exceptions, so if we don't provide an exception handler or can't 
aggregate them somehow into a single exception then we probably shouldn't offer 
the capability to create topics this way.

> Review and potentially tweak AdminClient API for the initial release (KIP-117)
> --
>
> Key: KAFKA-5275
> URL: https://issues.apache.org/jira/browse/KAFKA-5275
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.11.0.0
>
>
> Once all the pieces are in, we should take a pass and ensure that the APIs 
> work well together and that they are consistent.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-5275) Review and potentially tweak AdminClient API for the initial release (KIP-117)

2017-05-22 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020058#comment-16020058
 ] 

Randall Hauch edited comment on KAFKA-5275 at 5/22/17 7:48 PM:
---

[~cmccabe] wrote:
{quote}
Hmm. Methods like these, can't handle the case where there is an error other 
than TopicExistsException. They have to throw an exception, and then you get no 
information about any topic other than the one that had the exception. Can't 
you just iterate through the map and do per-topic error handling? It just seems 
like an easier and cleaner solution, unless there's something I'm missing.
{quote}
Yes, that's true that clients likely don't know how to handle any kind of 
exception, so I would assume that they just log the exceptions and continue or 
fail by rethrowing them. However, you're right that we wouldn't want to lose 
any of the exceptions, so if we don't provide an exception handler or can't 
aggregate them somehow into a single exception then we probably shouldn't offer 
the capability to create topics this way.


was (Author: rhauch):
[~cmccabe] wrote:
{quote}
Hmm. Methods like these, can't handle the case where there is an error other 
than TopicExistsException. They have to throw an exception, and then you get no 
information about any topic other than the one that had the exception. Can't 
you just iterate through the map and do per-topic error handling? It just seems 
like an easier and cleaner solution, unless there's something I'm missing.
{quote}
Yes, that's true that consumers don't really know how to handle any kind of 
exception, so I would assume that they just log the exceptions and continue or 
fail by rethrowing them. However, you're right that we wouldn't want to lose 
any of the exceptions, so if we don't provide an exception handler or can't 
aggregate them somehow into a single exception then we probably shouldn't offer 
the capability to create topics this way.

> Review and potentially tweak AdminClient API for the initial release (KIP-117)
> --
>
> Key: KAFKA-5275
> URL: https://issues.apache.org/jira/browse/KAFKA-5275
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.11.0.0
>
>
> Once all the pieces are in, we should take a pass and ensure that the APIs 
> work well together and that they are consistent.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[DISCUSS] KIP-158: Kafka Connect should allow source connectors to set topic-specific settings for new topics

2017-05-22 Thread Randall Hauch
Hi, all.

We recently added the ability for Kafka Connect to create *internal* topics
using the new AdminClient, but it still would be great if Kafka Connect
could do this for new topics that result from source connector records.
I've outlined an approach to do this in "KIP-158 Kafka Connect should allow
source connectors to set topic-specific settings for new topics".

*https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics
*

Please take a look and provide feedback. Thanks!

Best regards,

Randall


[jira] [Commented] (KAFKA-5275) Review and potentially tweak AdminClient API for the initial release (KIP-117)

2017-05-22 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020058#comment-16020058
 ] 

Randall Hauch commented on KAFKA-5275:
--

[~cmccabe] wrote:
{quote}
Hmm. Methods like these, can't handle the case where there is an error other 
than TopicExistsException. They have to throw an exception, and then you get no 
information about any topic other than the one that had the exception. Can't 
you just iterate through the map and do per-topic error handling? It just seems 
like an easier and cleaner solution, unless there's something I'm missing.
{quote}
Yes, that's true that consumers don't really know how to handle any kind of 
exception, so I would assume that they just log the exceptions and continue or 
fail by rethrowing them. However, you're right that we wouldn't want to lose 
any of the exceptions, so if we don't provide an exception handler or can't 
aggregate them somehow into a single exception then we probably shouldn't offer 
the capability to create topics this way.

> Review and potentially tweak AdminClient API for the initial release (KIP-117)
> --
>
> Key: KAFKA-5275
> URL: https://issues.apache.org/jira/browse/KAFKA-5275
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.11.0.0
>
>
> Once all the pieces are in, we should take a pass and ensure that the APIs 
> work well together and that they are consistent.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5273) KafkaConsumer.committed() should get latest committed offsets from the server

2017-05-22 Thread Apurva Mehta (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5273 started by Apurva Mehta.
---
> KafkaConsumer.committed() should get latest committed offsets from the server
> -
>
> Key: KAFKA-5273
> URL: https://issues.apache.org/jira/browse/KAFKA-5273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> Currently, the `KafkaConsumer.committed(topicPartition)` will return the 
> current position of the consumer for that partition if the consumer has been 
> assigned the partition. Otherwise, it will look up the committed position from 
> the server. 
> With the new producer `sendOffsetsToTransaction` api, we get into a state 
> where we can commit the offsets for an assigned partition through the 
> producer. So the consumer doesn't update its cached view and subsequently 
> returns a stale committed offset for its assigned partition. 
> We should either update the consumer's cache when offsets are committed 
> through the producer, or drop the cache totally and always look up the server 
> to get the committed offset. This way the `committed` method will always 
> return the latest committed offset for any partition.
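
For illustration, a minimal sketch of the scenario above, assuming the 0.11 transactional producer API; the broker address, topic, and group names are placeholders.

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class StaleCommittedSketch {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("input", 0);

        Properties cc = new Properties();
        cc.put("bootstrap.servers", "localhost:9092");
        cc.put("group.id", "copy-group");
        cc.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        cc.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cc);
        consumer.assign(Collections.singletonList(tp));

        Properties pc = new Properties();
        pc.put("bootstrap.servers", "localhost:9092");
        pc.put("transactional.id", "copy-txn");
        pc.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        pc.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pc);
        producer.initTransactions();

        producer.beginTransaction();
        // offset 42 is committed through the producer, not through the consumer
        producer.sendOffsetsToTransaction(
                Collections.singletonMap(tp, new OffsetAndMetadata(42L)), "copy-group");
        producer.commitTransaction();

        // Before the change described in this ticket, this could return a cached value
        // for an assigned partition instead of the offset committed above.
        OffsetAndMetadata committed = consumer.committed(tp);
        System.out.println("committed = " + committed);

        producer.close();
        consumer.close();
    }
}
{code}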



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5128) TransactionCoordinator - Check inter broker protocol and message format and raise errors if incompatible

2017-05-22 Thread Apurva Mehta (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apurva Mehta updated KAFKA-5128:

Status: Patch Available  (was: Open)

> TransactionCoordinator - Check inter broker protocol and message format and 
> raise errors if incompatible
> 
>
> Key: KAFKA-5128
> URL: https://issues.apache.org/jira/browse/KAFKA-5128
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Blocker
>  Labels: exactly-once
>
> In {{TransactionStateManager}} and {{InterBrokerSendThread}} we need to check 
> if the message format supports transactions and error out if it doesn't



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5128) TransactionCoordinator - Check inter broker protocol and message format and raise errors if incompatible

2017-05-22 Thread Apurva Mehta (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apurva Mehta updated KAFKA-5128:

Labels: exactly-once  (was: )

> TransactionCoordinator - Check inter broker protocol and message format and 
> raise errors if incompatible
> 
>
> Key: KAFKA-5128
> URL: https://issues.apache.org/jira/browse/KAFKA-5128
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Blocker
>  Labels: exactly-once
>
> In {{TransactionStateManager}} and {{InterBrokerSendThread}} we need to check 
> if the message format supports transactions and error out if it doesn't



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5280) Protect concurrent access to the cached transaction status

2017-05-22 Thread Apurva Mehta (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apurva Mehta updated KAFKA-5280:

Status: Patch Available  (was: Open)

> Protect concurrent access to the cached transaction status
> --
>
> Key: KAFKA-5280
> URL: https://issues.apache.org/jira/browse/KAFKA-5280
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Apurva Mehta
>Assignee: Guozhang Wang
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5275) Review and potentially tweak AdminClient API for the initial release (KIP-117)

2017-05-22 Thread Colin P. McCabe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020039#comment-16020039
 ] 

Colin P. McCabe commented on KAFKA-5275:


bq. [~xvrl] wrote: To echo Randall's point about having a topic config builder, 
I completely agree. However, we should also define constants for every topic 
configuration option that exists. Currently the only way to get those is to 
pull in the kafka-server jar and access things like LogConfig.SegmentMsProp() 
directly

That's a very good point.  It would be much more user-friendly to provide 
constants.  We need this anyway for things like {{alterConfigs}} and 
{{describeConfigs}}.

Maybe we could do this by putting all the constants into {{AdminClientConf}}.  
Since the server jars depend on the client jars, but not vice versa, we would 
then have to change {{KafkaConfig.scala}} so that it defined all these 
constants in terms of the {{AdminClientConf}} versions.  That way, everything 
would stay in sync.
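
For illustration only, such a constants holder might look like the sketch below; the class and field names are assumptions rather than an existing API (the real home could be {{AdminClientConf}} or elsewhere, as discussed above), though the config key strings themselves are real topic-level configs.

{code:java}
// Hypothetical constants holder so client code does not have to reach into the
// server jar's LogConfig for topic config names. Names here are illustrative.
public final class TopicConfigNames {
    public static final String SEGMENT_MS = "segment.ms";
    public static final String RETENTION_MS = "retention.ms";
    public static final String CLEANUP_POLICY = "cleanup.policy";
    public static final String MIN_INSYNC_REPLICAS = "min.insync.replicas";

    private TopicConfigNames() {}
}
{code}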

bq. [~rhauch] wrote: Kafka Connect does log which topics were created and which 
were found. That'd be trivial if CreateTopicResults had methods to return the 
names of the newly-created and existing topics – perhaps something like: 
createdTopicNames, existingTopicNames

Hmm.  Methods like these can't handle the case where there is an error other 
than {{TopicExistsException}}.  They have to throw an exception, and then you 
get no information about any topic other than the one that had the exception.  
Can't you just iterate through the map and do per-topic error handling?  It 
just seems like an easier and cleaner solution, unless there's something I'm 
missing.

> Review and potentially tweak AdminClient API for the initial release (KIP-117)
> --
>
> Key: KAFKA-5275
> URL: https://issues.apache.org/jira/browse/KAFKA-5275
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.11.0.0
>
>
> Once all the pieces are in, we should take a pass and ensure that the APIs 
> work well together and that they are consistent.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-0.11.0-jdk7 #13

2017-05-22 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-5277; Sticky Assignor should not cache previous assignment (KIP-54

--
[...truncated 882.08 KB...]
kafka.admin.TopicCommandTest > testCreateIfNotExists PASSED

kafka.admin.TopicCommandTest > testCreateAlterTopicWithRackAware STARTED

kafka.admin.TopicCommandTest > testCreateAlterTopicWithRackAware PASSED

kafka.admin.TopicCommandTest > testTopicDeletion STARTED

kafka.admin.TopicCommandTest > testTopicDeletion PASSED

kafka.admin.TopicCommandTest > testConfigPreservationAcrossPartitionAlteration 
STARTED

kafka.admin.TopicCommandTest > testConfigPreservationAcrossPartitionAlteration 
PASSED

kafka.admin.TopicCommandTest > testAlterIfExists STARTED

kafka.admin.TopicCommandTest > testAlterIfExists PASSED

kafka.admin.TopicCommandTest > testDeleteIfExists STARTED

kafka.admin.TopicCommandTest > testDeleteIfExists PASSED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentWithOfflineReplicaHaltingProgress STARTED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentWithOfflineReplicaHaltingProgress PASSED

kafka.controller.ControllerIntegrationTest > 
testControllerEpochPersistsWhenAllBrokersDown STARTED

kafka.controller.ControllerIntegrationTest > 
testControllerEpochPersistsWhenAllBrokersDown PASSED

kafka.controller.ControllerIntegrationTest > 
testTopicCreationWithOfflineReplica STARTED

kafka.controller.ControllerIntegrationTest > 
testTopicCreationWithOfflineReplica PASSED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentResumesAfterReplicaComesOnline STARTED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentResumesAfterReplicaComesOnline PASSED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled STARTED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled PASSED

kafka.controller.ControllerIntegrationTest > 
testTopicPartitionExpansionWithOfflineReplica STARTED

kafka.controller.ControllerIntegrationTest > 
testTopicPartitionExpansionWithOfflineReplica PASSED

kafka.controller.ControllerIntegrationTest > 
testPreferredReplicaLeaderElectionWithOfflinePreferredReplica STARTED

kafka.controller.ControllerIntegrationTest > 
testPreferredReplicaLeaderElectionWithOfflinePreferredReplica PASSED

kafka.controller.ControllerIntegrationTest > 
testAutoPreferredReplicaLeaderElection STARTED

kafka.controller.ControllerIntegrationTest > 
testAutoPreferredReplicaLeaderElection PASSED

kafka.controller.ControllerIntegrationTest > testTopicCreation STARTED

kafka.controller.ControllerIntegrationTest > testTopicCreation PASSED

kafka.controller.ControllerIntegrationTest > testPartitionReassignment STARTED

kafka.controller.ControllerIntegrationTest > testPartitionReassignment PASSED

kafka.controller.ControllerIntegrationTest > testTopicPartitionExpansion STARTED

kafka.controller.ControllerIntegrationTest > testTopicPartitionExpansion PASSED

kafka.controller.ControllerIntegrationTest > 
testControllerMoveIncrementsControllerEpoch STARTED

kafka.controller.ControllerIntegrationTest > 
testControllerMoveIncrementsControllerEpoch PASSED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled STARTED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled PASSED

kafka.controller.ControllerIntegrationTest > testEmptyCluster STARTED

kafka.controller.ControllerIntegrationTest > testEmptyCluster PASSED

kafka.controller.ControllerIntegrationTest > testPreferredReplicaLeaderElection 
STARTED

kafka.controller.ControllerIntegrationTest > testPreferredReplicaLeaderElection 
PASSED

kafka.controller.ControllerFailoverTest > testMetadataUpdate STARTED

kafka.controller.ControllerFailoverTest > testMetadataUpdate SKIPPED

kafka.tools.ConsoleProducerTest > testParseKeyProp STARTED

kafka.tools.ConsoleProducerTest > testParseKeyProp PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer PASSED

kafka.tools.ConsoleProducerTest > testInvalidConfigs STARTED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED


Re: [DISCUSS] KIP 141 - ProducerRecordBuilder Interface

2017-05-22 Thread Andrew Coates
Thanks Mike, I was just about to do the same!

I think the benefit of the builder pattern, or the `with` pattern, is
greatly improved if you take interceptor implementations into account.
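
To make that concrete, here is a rough sketch of the kind of builder being discussed;
nothing like asBuilder() exists on ProducerRecord today, so the standalone class below
is purely illustrative of the shape quoted further down.

```java
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative only: a builder that copies every field of an existing record so an
// interceptor can override just the field it cares about.
final class ProducerRecordBuilder<K, V> {
    private final String topic;
    private final Integer partition;
    private Long timestamp;
    private final K key;
    private final V value;

    ProducerRecordBuilder(ProducerRecord<K, V> record) {
        this.topic = record.topic();
        this.partition = record.partition();
        this.timestamp = record.timestamp();
        this.key = record.key();
        this.value = record.value();
    }

    // Only the overridden field changes; everything else is carried over unchanged.
    ProducerRecordBuilder<K, V> setTimestamp(long timestamp) {
        this.timestamp = timestamp;
        return this;
    }

    ProducerRecord<K, V> build() {
        return new ProducerRecord<>(topic, partition, timestamp, key, value);
    }
}
```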

On Fri, 19 May 2017 at 08:29 Michael Pearce  wrote:

> And I just failed at a basic copy/paste exercise (bangs head on wall)
>
> Try again:
>
>
> > On 18 May 2017, at 18:29, Andrew Coates  wrote:
> >
> > Hi all,
> >
> > The `ProducerRecord` type has many optional fields and the list has
> grown over different revisions of Kafka. Kafka supports
> `ProducerInterceptor`s, which often need to construct new
> `ProducerRecord`s, based on those passed in, copying most fields from the
> old to the new record, e.g.:
> >
> > ```java
> >   public ProducerRecord onSend(ProducerRecord record) {
> >       return new ProducerRecord<>(record.topic(), record.partition(),
> >           getSpecificTimestampIWantToSet(), record.key(), record.value());
> >   }
> > ```
> >
> > If/when a new field is next added to the `ProducerRecord` all existing
> interceptor implementations will fail to copy across the new field,
> assuming backwards-compatible constructors exist that allow the old code
> to compile (which they tend to do). This makes the code brittle and leaves
> me with a bad taste in my mouth.
> >
> > Additionally, the set of `ProducerRecord` constructors is multiplying as
> new optional fields are being added and not all combinations are supported,
> though they may be valid.
> >
> > I was wondering what people's thoughts would be on introducing a builder
> pattern on the producer record?  If we did and a pre-initialised builder
> could be obtained from any existing record, then interceptors can just
> set/overwrite the fields they care about, without additional fields being
> lost, so the above code becomes:
> >
> > ```java
> >   public ProducerRecord onSend(ProducerRecord record) {
> >       return record.asBuilder()
> >           .setTimestamp(getSpecificTimestampIWantToSet())
> >           .build();
> >   }
> > ```
> >
> > This has the benefit of less and clearer interceptor code, and the
> code will pass along new fields, added in a newer version, without
> modification. (Though interceptor authors can still make the choice to use
> a constructor instead, dropping new fields - but now they’d have a choice).
> >
> > If people like this idea then I can create a Jira and a PR. (Would a KIP
> be required also?). If people don’t, I’ll move along quietly…
> >
> > Thanks,
> >
> > Andy
> >
> >
>
> On 19/05/2017, 08:27, "Michael Pearce"  wrote:
>
> Just copying in from the mail list, another use case / +1 for the
> builder pattern, so its kept with the KIP thread.
>
>
>
> On 18/05/2017, 22:06, "Andrew Coates" 
> wrote:
>
> Thanks Mike
> On Thu, 18 May 2017 at 21:33, Michael André Pearce <
> michael.andre.pea...@me.com> wrote:
>
> >Hi Andrew,
> >
> >There is already a kip discussion exactly around this if you look
> for KIP
> >141 discuss thread.
> >
> >Cheers
> >Mike
> >
> >Sent from my iPhone
> >
> >> On 18 May 2017, at 18:29, Andrew Coates 
> wrote:
> >>
> >> Hi all,
> >>
> >> The `ProducerRecord` type has many optional fields and the list
> has
> >grown over different revisions of Kafka. Kafka supports
> >`ProducerInterceptor`s, which often need to construct new
> >`ProducerRecord`s, based on those passed in, copying most fields
> from the
> >old to the new record, e.g.:
> >>
> >> ```java
> >>   public ProducerRecord onSend(ProducerRecord record)
> {
> >>   return new ProducerRecord<>(record.topic(),
> record.partition(),
> >getSpecificTimestampIWantToSet(), record.key(), record.value())
> >>   }
> >> ```
> >>
> >> If/when a new field is next added to the `ProducerRecord` all
> existing
> >interceptor implementations will fail to copy across the new
> field,
> >assuming a backwards compatible constructors exist that allow the
> old code
> >to compile, (which the tend to do). This makes the code
>
>
> On 08/05/2017, 18:26, "Colin McCabe"  wrote:
>
> Hadoop had a very similar issue in two places: in the constructor
> of
> MiniDFSCluster, and with the FileSystem#create API.  In both cases,
> people kept adding more and more function overloads until the APIs
> got
> very ugly and hard to understand.  This is especially the case
> when some
> of the parameters were just ints or other basic types.  Do you
> need the
> Path, FsPermission, boolean, int, short, long, Progressable,
> InetSocketAddress[] overload or the Path, FsPermission,
> 

Re: [VOTE] KIP-117: Add a public AdminClient API for Kafka admin operations

2017-05-22 Thread Colin McCabe
Oops, I just realized if we do a call with topics=*, we don't need to
make a follow-up call.

:)

The question still holds, though: is it worth sacrificing some
scalability when talking to older brokers, to get saner semantics?

cheers,
Colin


On Mon, May 22, 2017, at 11:41, Colin McCabe wrote:
> I definitely agree that auto topic creation is unexpected and confusing
> (even with the JavaDoc note in the API).  The proposed solution of
> adding a flag to MetadataRequest seems pretty simple and reasonable. 
> +1.
> 
> As you noted, though, we don't have a way to do this for the 0.10.x
> releases.  It seems a bit harsh to have such different behavior there. 
> Is there a way that we can fudge this a bit so that it mostly works? 
> For example, when communicating with 0.10.x brokers, describeTopics
> could do a MetadataRequest(topics=*) to filter out non-existent topics.
> 
> This would obviously have a bit of a time-of-check, time-of-use race
> condition since we're making two calls.  And also a scalability problem
> since we're using topics=*.  Is it worth it to make the behavior saner
> on older brokers?  Or should we add a JavaDoc note and move on?
> 
> best,
> Colin
> 
> 
> On Fri, May 19, 2017, at 05:46, Ismael Juma wrote:
> > Hi all,
> > 
> > Feedback from people who tried the AdminClient is that auto topic
> > creation
> > during describe is unexpected and confusing. This is consistent with the
> > reaction of most people when they learn that MetadataRequest can cause
> > topics to be created. We had assumed that we'd tackle this issue for all
> > the clients as part of deprecation of server-side auto topic creation in
> > favour of client-side auto-topic creation.
> > 
> > However, it would be better to do the right thing for the AdminClient
> > from
> > the start. Users will be less confused and we won't have to deal with
> > compatibility concerns. Jason suggested a simple solution: make it
> > possible
> > to disallow auto topic creation when sending the metadata request. The
> > AdminClient would take advantage of this now (i.e. 0.11.0.0) while the
> > producer and consumer would retain the existing behaviour. In a
> > subsequent
> > release, we'll work out the details of how to move away from server-side
> > auto topic creation for the producer and consumer (taking into account
> > the
> > compatibility impact).
> > 
> > Because of the protocol change, this solution would only help in cases
> > where the AdminClient is describing topics from a 0.11.0.0 or newer
> > broker.
> > 
> > I submitted a PR for this and it's small and straightforward:
> > 
> > https://github.com/apache/kafka/pull/3098
> > 
> > Thoughts?
> > 
> > Ismael
> > 
> > On Sat, Mar 25, 2017 at 1:25 AM, Colin McCabe  wrote:
> > 
> > > With binding +1 votes from Gwen Shapira, Sriram Subramanian, and Grant
> > > Henke, and a non-binding vote from Dong Lin, the vote passes.  There
> > > were no +0 or -1 votes.  As mentioned earlier, the interface will be
> > > unstable at first and we will continue to evolve it.
> > >
> > > thanks,
> > > Colin McCabe
> > >
> > >
> > > On Wed, Mar 22, 2017, at 10:21, Colin McCabe wrote:
> > > > On Fri, Mar 17, 2017, at 10:50, Jun Rao wrote:
> > > > > Hi, Colin,
> > > > >
> > > > > Thanks for the KIP. Looks good overall. A few comments below.
> > > > >
> > > > > 1. Sometimes we return
> > > > > CompletableFuture>
> > > > > and some other times we return
> > > > > Map
> > > > > , which doesn't seem consistent. Is that intentional?
> > > >
> > > > Yes, this is intentional.  We got feedback from some people that they
> > > > wanted a single future that would fail if anything failed.  Other people
> > > > wanted to be able to detect failures on individual elements of a batch.
> > > > This API lets us have both (you just choose which future you want to
> > > > wait on).
> > > >
> > > > >
> > > > > 2. We support batching in CreateTopic/DeleteTopic/ListTopic, but not
> > > in
> > > > > DescribeTopic. Should we add batching in DescribeTopic to make it
> > > > > consistent?
> > > >
> > > > Good idea.  Let's add batching to DescribeTopic(s).
> > > >
> > > > > Also, both ListTopic and DescribeTopic seem to return
> > > > > TopicDescription. Could we just consolidate the two by just keeping
> > > > > DescribeTopic?
> > > >
> > > > Sorry, that was a typo.  ListTopics is supposed to return TopicListing,
> > > > which tells you only the name of the topic and whether it is internal.
> > > > The idea is that later we will add another RPC which allows us to fetch
> > > > just this information, and not the other topic fields. That way, we can
> > > > be more efficient.  The idea is that ListTopics is like readdir()/ls and
> > > > DescribeTopics is like stat().  Getting detailed information about
> > > > 1,000s of topics could be quite a resource hog for cluster management
> > > > systems in a big cluster.
> > > >
> > > > >
> > > > > 3. 

Re: [VOTE] KIP-117: Add a public AdminClient API for Kafka admin operations

2017-05-22 Thread Colin McCabe
I definitely agree that auto topic creation is unexpected and confusing
(even with the JavaDoc note in the API).  The proposed solution of
adding a flag to MetadataRequest seems pretty simple and reasonable. 
+1.

As you noted, though, we don't have a way to do this for the 0.10.x
releases.  It seems a bit harsh to have such different behavior there. 
Is there a way that we can fudge this a bit so that it mostly works? 
For example, when communicating with 0.10.x brokers, describeTopics
could do a MetadataRequest(topics=*) to filter out non-existent topics.

This would obviously have a bit of a time-of-check, time-of-use race
condition since we're making two calls.  And also a scalability problem
since we're using topics=*.  Is it worth it to make the behavior saner
on older brokers?  Or should we add a JavaDoc note and move on?

best,
Colin


On Fri, May 19, 2017, at 05:46, Ismael Juma wrote:
> Hi all,
> 
> Feedback from people who tried the AdminClient is that auto topic
> creation
> during describe is unexpected and confusing. This is consistent with the
> reaction of most people when they learn that MetadataRequest can cause
> topics to be created. We had assumed that we'd tackle this issue for all
> the clients as part of deprecation of server-side auto topic creation in
> favour of client-side auto-topic creation.
> 
> However, it would be better to do the right thing for the AdminClient
> from
> the start. Users will be less confused and we won't have to deal with
> compatibility concerns. Jason suggested a simple solution: make it
> possible
> to disallow auto topic creation when sending the metadata request. The
> AdminClient would take advantage of this now (i.e. 0.11.0.0) while the
> producer and consumer would retain the existing behaviour. In a
> subsequent
> release, we'll work out the details of how to move away from server-side
> auto topic creation for the producer and consumer (taking into account
> the
> compatibility impact).
> 
> Because of the protocol change, this solution would only help in cases
> where the AdminClient is describing topics from a 0.11.0.0 or newer
> broker.
> 
> I submitted a PR for this and it's small and straightforward:
> 
> https://github.com/apache/kafka/pull/3098
> 
> Thoughts?
> 
> Ismael
> 
> On Sat, Mar 25, 2017 at 1:25 AM, Colin McCabe  wrote:
> 
> > With binding +1 votes from Gwen Shapira, Sriram Subramanian, and Grant
> > Henke, and a non-binding vote from Dong Lin, the vote passes.  There
> > were no +0 or -1 votes.  As mentioned earlier, the interface will be
> > unstable at first and we will continue to evolve it.
> >
> > thanks,
> > Colin McCabe
> >
> >
> > On Wed, Mar 22, 2017, at 10:21, Colin McCabe wrote:
> > > On Fri, Mar 17, 2017, at 10:50, Jun Rao wrote:
> > > > Hi, Colin,
> > > >
> > > > Thanks for the KIP. Looks good overall. A few comments below.
> > > >
> > > > 1. Sometimes we return
> > > > CompletableFuture>
> > > > and some other times we return
> > > > Map
> > > > , which doesn't seem consistent. Is that intentional?
> > >
> > > Yes, this is intentional.  We got feedback from some people that they
> > > wanted a single future that would fail if anything failed.  Other people
> > > wanted to be able to detect failures on individual elements of a batch.
> > > This API lets us have both (you just choose which future you want to
> > > wait on).
> > >
> > > >
> > > > 2. We support batching in CreateTopic/DeleteTopic/ListTopic, but not
> > in
> > > > DescribeTopic. Should we add batching in DescribeTopic to make it
> > > > consistent?
> > >
> > > Good idea.  Let's add batching to DescribeTopic(s).
> > >
> > > > Also, both ListTopic and DescribeTopic seem to return
> > > > TopicDescription. Could we just consolidate the two by just keeping
> > > > DescribeTopic?
> > >
> > > Sorry, that was a typo.  ListTopics is supposed to return TopicListing,
> > > which tells you only the name of the topic and whether it is internal.
> > > The idea is that later we will add another RPC which allows us to fetch
> > > just this information, and not the other topic fields. That way, we can
> > > be more efficient.  The idea is that ListTopics is like readdir()/ls and
> > > DescribeTopics is like stat().  Getting detailed information about
> > > 1,000s of topics could be quite a resource hog for cluster management
> > > systems in a big cluster.
> > >
> > > >
> > > > 3. listNodes: At the request protocol level, we can get things like
> > > > clusterId and controller broker id. Both are useful info from an admin
> > > > perspective, but are not exposed through the api. Perhaps we can
> > > > generalize
> > > > listNodes to sth like describeCluster so that we can return those
> > > > additional info as well?
> > >
> > > Yeah, let's change listNodes -> describeCluster.
> > >
> > > >
> > > > 4. Configurations: To support security, we will need to include all
> > > > properties 

[jira] [Resolved] (KAFKA-5277) Sticky Assignor should not cache the calculated assignment (KIP-54 follow-up)

2017-05-22 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-5277.

   Resolution: Fixed
Fix Version/s: 0.11.0.0

Issue resolved by pull request 3092
[https://github.com/apache/kafka/pull/3092]

> Sticky Assignor should not cache the calculated assignment (KIP-54 follow-up)
> -
>
> Key: KAFKA-5277
> URL: https://issues.apache.org/jira/browse/KAFKA-5277
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> As a follow-up to KIP-54, remove the dependency of Sticky Assignor to 
> previously calculated assignment. This dependency is not required because 
> each consumer participating in the rebalance now notifies the group leader of 
> their assignment prior to rebalance. So the leader can compile the previous 
> assignment of the whole group from this information. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5277) Sticky Assignor should not cache the calculated assignment (KIP-54 follow-up)

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019926#comment-16019926
 ] 

ASF GitHub Bot commented on KAFKA-5277:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3092


> Sticky Assignor should not cache the calculated assignment (KIP-54 follow-up)
> -
>
> Key: KAFKA-5277
> URL: https://issues.apache.org/jira/browse/KAFKA-5277
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As a follow-up to KIP-54, remove the dependency of Sticky Assignor to 
> previously calculated assignment. This dependency is not required because 
> each consumer participating in the rebalance now notifies the group leader of 
> their assignment prior to rebalance. So the leader can compile the previous 
> assignment of the whole group from this information. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #3092: KAFKA-5277: Sticky Assignor should not cache previ...

2017-05-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3092


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Work started] (KAFKA-4935) Consider disabling record level CRC checks for message format V2

2017-05-22 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4935 started by Jason Gustafson.
--
> Consider disabling record level CRC checks for message format V2
> 
>
> Key: KAFKA-4935
> URL: https://issues.apache.org/jira/browse/KAFKA-4935
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apurva Mehta
>Assignee: Jason Gustafson
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> With the new message format proposed in KIP-98, the record level CRC has been 
> moved to the the batch header.
> Because we expose the record-level CRC in `RecordMetadata` and 
> `ConsumerRecord`, we currently compute it eagerly based on the key, value and 
> timestamp even though these methods are rarely used. Ideally, we'd deprecate 
> the relevant methods in `RecordMetadata` and `ConsumerRecord` while making 
> the CRC computation lazy. This seems pretty hard to achieve in the Producer 
> without increasing memory retention, but it may be possible to do in the 
> Consumer.
> An alternative option is to return the batch CRC from the relevant methods.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5191) Autogenerate Consumer Fetcher metrics

2017-05-22 Thread James Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Cheng updated KAFKA-5191:
---
Reviewer: Jason Gustafson  (was: Ismael Juma)

Changing reviewer to [~hachikuji], since he was the original reporter of the 
parent JIRA.

I'm hoping to get this into 0.11.

> Autogenerate Consumer Fetcher metrics
> -
>
> Key: KAFKA-5191
> URL: https://issues.apache.org/jira/browse/KAFKA-5191
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
> Attachments: docs_now_include_partition_level_metrics.png, 
> generated_fetcher_docs.png, generated_fetcher_docs_with_alternate_css.png, 
> generated_fetcher_docs_with_css.png
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments

2017-05-22 Thread Kyle Ambroff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff updated KAFKA-5297:

Attachment: flame graph of broker during shut down.png

> Broker can take a long time to shut down if there are many active log segments
> --
>
> Key: KAFKA-5297
> URL: https://issues.apache.org/jira/browse/KAFKA-5297
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff
>Priority: Minor
> Attachments: flame graph of broker during shut down.png, 
> LogSegmentBenchmark.java, shutdown-flame-graph.png
>
>
> After the changes for KIP-33 were merged, we started noticing that our 
> cluster restart times were quite a bit longer. In some cases it was taking 
> four times as long as expected to do a rolling restart of every broker in the 
> cluster. This meant that doing a deploy to one of our Kafka clusters went 
> from taking about 3 hours to more than 12 hours!
> We looked into this and we have some data from a couple of runs with a 
> sampling profiler. It turns out that it isn't unusual for us to have a broker 
> sit in kafka.log.Log#close for up to 30 minutes if it has been running for 
> several weeks. There are just so many active log segments that it just takes 
> a long time to truncate all of the indexes.
> I've attached a flame graph that was generated from 10 minutes of stack 
> samples collected during shutdown of a broker that took about 30 minutes 
> total to shut down cleanly.
> * About 60% of the time was spent in kafka.log.AbstractIndex#resize, where 
> every index and timeindex file is truncated to the size of the number of 
> entries in that index.
> * Another big chunk of time is spent reading the last entry from the index, 
> which is used to make any final updates to the timeindex file. This is 
> something that can be cached. For a broker that's been running for a long 
> time the bulk of these indexes are not likely to be in the page cache 
> anymore. We cache the largestTimestamp and offsetOfLargestTimestamp in 
> LogSegment, so we could add a cache for this as well.
> Looking at these changes and considering KIP-33, it isn't surprising that the 
> broker shutdown time has increased so dramatically. The extra index plus the 
> extra reads have increased the amount of work performed by 
> kafka.log.Log#close by about 4x (in terms of system calls and potential page 
> faults). Breaking down what this function does:
> # Read the max timestamp from the timeindex. Could lead to a disk seek.
> # Read the max offset from the index. Could lead to a disk seek.
> # Append the timestamp and offset of the most recently written message to the 
> timeindex if it hasn't been written there for some reason.
> # Truncate the index file
> ## Get the position in the index of the last entry written
> ## If on Windows then unmap and close the index
> ## reopen
> ## truncate to the number of entries * entry size. (ftruncate() system call)
> ## mmap()
> ## Set the position back to where it was before the original. Leads to 
> lseek() system call.
> ## Close the newly reopened and mapped index
> # Same thing as #4 but for the timeindex.
> ## Get the position in the timeindex of the last entry written
> ## If on Windows then unmap and close the timeindex
> ## reopen
> ## truncate to the number of entries * entry size. (ftruncate() system call)
> ## mmap()
> ## Set the position back to where it was before the original. Leads to 
> lseek() system call.
> ## Close the newly reopened and mapped timeindex
> # Finalize the log segment
> ## Invoke java.nio.channels.FileChannel#force, which leads to a fsync() for 
> that log segment.
> ## Truncate the log segment if it doesn't have enough messages written to 
> fill up the whole thing. Potentially leads to a ftruncate() system call.
> ## Set the position to the end of the segment after truncation. Leads to a 
> lseek() system call.
> ## Close and unmap the channel.
> Looking into the current implementation of kafka.log.AbstractIndex#resize, 
> it appears to do quite a bit of extra work to avoid keeping an instance of 
> RandomAccessFile around. It has to reopen the file, truncate, mmap(), 
> potentially perform an additional disk seek, all before immediately closing 
> the file.
> You wouldn't think this would amount to much, but I put together a benchmark 
> using jmh to measure the difference between the current code and a new 
> implementation that didn't have to recreate the page mapping during resize(), 
> and the difference is pretty dramatic.
> {noformat}
> Result "currentImplementation":
>   2063.386 ±(99.9%) 81.758 ops/s [Average]
>   (min, avg, max) = (1685.574, 2063.386, 2338.945), stdev = 182.863
>   CI (99.9%): [1981.628, 2145.144] (assumes normal distribution)
> Result "optimizedImplementation":
> 
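
To make the comparison concrete, a stripped-down sketch (not the actual 
AbstractIndex code, and ignoring the Windows unmap special case) of an index 
that keeps its RandomAccessFile open for its whole lifetime, so that resize() 
is just an ftruncate() plus a remap instead of a reopen/truncate/mmap/close 
cycle:

{noformat}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical sketch: keep the file handle open so resize() avoids the reopen.
class OpenHandleIndex {
    private final RandomAccessFile raf;
    private MappedByteBuffer mmap;

    OpenHandleIndex(File file, int maxBytes) throws IOException {
        raf = new RandomAccessFile(file, "rw");
        raf.setLength(maxBytes);
        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, maxBytes);
    }

    // Truncate (or grow) the file and remap, preserving the current position.
    synchronized void resize(int newSizeBytes) throws IOException {
        int position = mmap.position();
        raf.setLength(newSizeBytes); // ftruncate(), no reopen needed
        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newSizeBytes);
        mmap.position(Math.min(position, newSizeBytes));
    }

    synchronized void close() throws IOException {
        raf.close();
    }
}
{noformat}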

Re: Re: [DISCUSS] KIP-148: Add a connect timeout for client

2017-05-22 Thread Simon Souter
The following tickets are probably relevant to this KIP:

https://issues.apache.org/jira/browse/KAFKA-3457
https://issues.apache.org/jira/browse/KAFKA-1894
https://issues.apache.org/jira/browse/KAFKA-3834

On 22 May 2017 at 16:30, Rajini Sivaram  wrote:

> Ismael,
>
> Yes, agree. My concern was that a connection can be shutdown uncleanly at
> any time. If a client is in the middle of a request, then it times out
> after min(request.timeout.ms, tcp-timeout). If we add another config
> option
> connect.timeout.ms, then we will sometimes wait for min(connect.timeout.ms
> ,
> tcp-timeout) and sometimes for min(request.timeout.ms, tcp-timeout),
> depending
> on connection state. One config option feels neater to me.
>
> On Mon, May 22, 2017 at 11:21 AM, Ismael Juma  wrote:
>
> > Rajini,
> >
> > For this to have the desired effect, we'd probably need to lower the
> > default request.timeout.ms for the consumer and fix the underlying
> reason
> > why it is a little over 5 minutes at the moment.
> >
> > Ismael
> >
> > On Mon, May 22, 2017 at 4:15 PM, Rajini Sivaram  >
> > wrote:
> >
> > > Hi David,
> > >
> > > Sorry, what I meant was: Can you reuse the existing configuration
> option
> > > request.timeout.ms, instead of adding a new config and add the
> behaviour
> > > that you have proposed in the KIP for the connection phase using this
> > > timeout? I think the timeout for connection is useful. I am not sure we
> > > need another configuration option to implement it.
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > >
> > > On Mon, May 22, 2017 at 11:06 AM, 东方甲乙 <254479...@qq.com> wrote:
> > >
> > > > Hi Rajini.
> > > >
> > > > When a kafka node's machine is shut down or the network is closed, the
> > connecting
> > > > phase could not use the request.timeout.ms, because the client
> haven't
> > > > send a req yet.   And no response for the nio, the selector will not
> > > close
> > > > the connect, so it will not choose other good node to get the
> metadata.
> > > >
> > > >
> > > > Thanks
> > > > David
> > > >
> > > > -- Original Message --
> > > > *From:* "Rajini Sivaram" ;
> > > > *Sent:* Monday, May 22, 2017 20:17
> > > > *To:* "dev" ;
> > > > *Subject:* Re: [DISCUSS] KIP-148: Add a connect timeout for client
> > > >
> > > >
> > > > Hi David,
> > > >
> > > > Is there a reason why you wouldn't want to use request.timeout.ms as
> > the
> > > > timeout parameter for connections? Then you would use the same
> timeout
> > > for
> > > > connected and connecting phases when shutdown is unclean. You could
> > still
> > > > use the timeout to ensure that next metadata request is sent to
> another
> > > > node.
> > > >
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> > > > On Sun, May 21, 2017 at 9:51 AM, 东方甲乙 <254479...@qq.com> wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > >
> > > > > Thanks for the clarify. For the clarify 2, I think the key thing is
> > not
> > > > > users control how much time in maximum to wait for inside code, but
> > is
> > > > the
> > > > > network client can be aware of the connecting can't be finished and
> > > try a
> > > > > good node. In the producer.sender even the selector.poll can
> timeout,
> > > but
> > > > > the next time is also not close the previous connecting and try
> > another
> > > > > good node.
> > > > >
> > > > >
> > > > > In our test env, QA shut down one of the leader nodes, the producer
> > send
> > > > the
> > > > > request will timeout and close the node's connection then request
> the
> > > > > metadata.  But sometimes the request node is also the shutdown
> node.
> > > > When
> > > > > connecting the shutting down node to get the metadata, it is in the
> > > > > connecting phase, network client mark the connecting node's state
> to
> > > > > CONNECTING, but if the node is shutdown,  the socket can't be aware
> > of
> > > > the
> > > > > connecting is broken. Though the selector.poll has timeout
> parameter,
> > > but
> > > > > it will not close the connection, so the next
> > > > > time in the "networkclient.maybeUpdate" it will check if
> > > > > isAnyNodeConnecting, then will not connect to any good node the get
> > the
> > > > > metadata.  It needs about several minutes to become
> > > > > aware that the connection attempt has timed out and to try another node.
> > > > >
> > > > >
> > > > > So I want to add a connect.timeout parameter,  the selector can
> find
> > > the
> > > > > connecting is timeout and close the connection.  It seems the
> > currently
> > > > the
> > > > > timeout value passed in `selector.poll()`
> > > > > seems can not do this.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > David
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > -- Original Message --
> > > > > From: "Guozhang Wang";;
> > > > > Sent: Tuesday, May 16, 2017 1:51 AM
> > > > > To: "dev@kafka.apache.org";
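
For illustration, a bare-bones sketch (made-up names, not the actual 
NetworkClient/Selector code) of the behaviour being proposed in the KIP: 
record when each connection attempt starts, and close any attempt that has 
been in the CONNECTING state longer than a configured connect timeout so the 
client can move on to another node.

{noformat}
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: expire connection attempts that have been in the
// CONNECTING state for longer than connectTimeoutMs.
class ConnectAttemptTracker {
    private final long connectTimeoutMs;
    private final Map<String, Long> connectingSince = new HashMap<>(); // nodeId -> start time

    ConnectAttemptTracker(long connectTimeoutMs) {
        this.connectTimeoutMs = connectTimeoutMs;
    }

    void connectionStarted(String nodeId, long nowMs) {
        connectingSince.put(nodeId, nowMs);
    }

    void connectionEstablished(String nodeId) {
        connectingSince.remove(nodeId);
    }

    // Called from the poll loop: close timed-out attempts so another node can be tried.
    void maybeExpire(long nowMs, Consumer<String> closeConnection) {
        Iterator<Map.Entry<String, Long>> it = connectingSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > connectTimeoutMs) {
                closeConnection.accept(entry.getKey()); // give up on this node
                it.remove();
            }
        }
    }
}
{noformat}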

[jira] [Commented] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments

2017-05-22 Thread Kyle Ambroff (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019848#comment-16019848
 ] 

Kyle Ambroff commented on KAFKA-5297:
-

I'm still unsure about this approach, but I wanted to post it early to see if 
anyone had any thoughts. I'm working through some of the test cases that have 
failed to see whether it will work at all.

> Broker can take a long time to shut down if there are many active log segments
> --
>
> Key: KAFKA-5297
> URL: https://issues.apache.org/jira/browse/KAFKA-5297
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff
>Priority: Minor
> Attachments: LogSegmentBenchmark.java, shutdown-flame-graph.png
>
>
> After the changes for KIP-33 were merged, we started noticing that our 
> cluster restart times were quite a bit longer. In some cases it was taking 
> four times as long as expected to do a rolling restart of every broker in the 
> cluster. This meant that doing a deploy to one of our Kafka clusters went 
> from taking about 3 hours to more than 12 hours!
> We looked into this and we have some data from a couple of runs with a 
> sampling profiler. It turns out that it isn't unusual for us to have a broker 
> sit in kafka.log.Log#close for up to 30 minutes if it has been running for 
> several weeks. There are just so many active log segments that it just takes 
> a long time to truncate all of the indexes.
> I've attached a flame graph that was generated from 10 minutes of stack 
> samples collected during shutdown of a broker that took about 30 minutes 
> total to shut down cleanly.
> * About 60% of the time was spent in kafka.log.AbstractIndex#resize, where 
> every index and timeindex file is truncated to the size of the number of 
> entries in that index.
> * Another big chunk of time is spent reading the last entry from the index, 
> which is used to make any final updates to the timeindex file. This is 
> something that can be cached. For a broker that's been running for a long 
> time the bulk of these indexes are not likely to be in the page cache 
> anymore. We cache the largestTimestamp and offsetOfLargestTimestamp in 
> LogSegment, so we could add a cache for this as well.
> Looking at these changes and considering KIP-33, it isn't surprising that the 
> broker shutdown time has increased so dramatically. The extra index plus the 
> extra reads have increased the amount of work performed by 
> kafka.log.Log#close by about 4x (in terms of system calls and potential page 
> faults). Breaking down what this function does:
> # Read the max timestamp from the timeindex. Could lead to a disk seek.
> # Read the max offset from the index. Could lead to a disk seek.
> # Append the timestamp and offset of the most recently written message to the 
> timeindex if it hasn't been written there for some reason.
> # Truncate the index file
> ## Get the position in the index of the last entry written
> ## If on Windows then unmap and close the index
> ## reopen
> ## truncate to the number of entries * entry size. (ftruncate() system call)
> ## mmap()
> ## Set the position back to where it was before the original. Leads to 
> lseek() system call.
> ## Close the newly reopened and mapped index
> # Same thing as #4 but for the timeindex.
> ## Get the position in the timeindex of the last entry written
> ## If on Windows then unmap and close the timeindex
> ## reopen
> ## truncate to the number of entries * entry size. (ftruncate() system call)
> ## mmap()
> ## Set the position back to where it was before the original. Leads to 
> lseek() system call.
> ## Close the newly reopened and mapped timeindex
> # Finalize the log segment
> ## Invoke java.nio.channels.FileChannel#force, which leads to a fsync() for 
> that log segment.
> ## Truncate the log segment if it doesn't have enough messages written to 
> fill up the whole thing. Potentially leads to a ftruncate() system call.
> ## Set the position to the end of the segment after truncation. Leads to a 
> lseek() system call.
> ## Close and unmap the channel.
> Looking into the current implementation of kafka.log.AbstractIndex#resize, 
> it appears to do quite a bit of extra work to avoid keeping an instance of 
> RandomAccessFile around. It has to reopen the file, truncate, mmap(), 
> potentially perform an additional disk seek, all before immediately closing 
> the file.
> You wouldn't think this would amount to much, but I put together a benchmark 
> using jmh to measure the difference between the current code and a new 
> implementation that didn't have to recreate the page mapping during resize(), 
> and the difference is pretty dramatic.
> {noformat}
> Result "currentImplementation":
>   2063.386 ±(99.9%) 81.758 ops/s [Average]
>   (min, avg, max) = (1685.574, 
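
For anyone who wants to reproduce this kind of comparison, a bare jmh 
skeleton along the lines described (illustrative only; this is not the 
attached LogSegmentBenchmark.java, and the 10 MB size is arbitrary):

{noformat}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;

@State(Scope.Thread)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public class IndexResizeBench {
    private static final long SIZE = 10 * 1024 * 1024;

    File file;
    RandomAccessFile keptOpen;

    @Setup
    public void setup() throws IOException {
        file = File.createTempFile("index", ".bench");
        keptOpen = new RandomAccessFile(file, "rw");
    }

    @TearDown
    public void tearDown() throws IOException {
        keptOpen.close();
        file.delete();
    }

    @Benchmark
    public void reopenOnResize() throws IOException {
        // Current style: reopen, truncate, remap and close on every resize.
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            raf.setLength(SIZE);
            raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, SIZE);
        }
    }

    @Benchmark
    public void keepFileOpen() throws IOException {
        // Optimized style: truncate and remap an already-open handle.
        keptOpen.setLength(SIZE);
        keptOpen.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, SIZE);
    }
}
{noformat}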

[jira] [Updated] (KAFKA-5218) New Short serializer, deserializer, serde

2017-05-22 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5218:
---
Component/s: streams

> New Short serializer, deserializer, serde
> -
>
> Key: KAFKA-5218
> URL: https://issues.apache.org/jira/browse/KAFKA-5218
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: Mario Molina
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> There is no Short serializer/deserializer in the current clients component.
> It could be useful when using Kafka-Connect to write data to databases with 
> SMALLINT fields (or similar) and avoiding conversions to int improving a bit 
> the performance in terms of memory and network.
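
For illustration, a sketch of what such a pair could look like against the 
existing org.apache.kafka.common.serialization interfaces (two bytes, 
big-endian, mirroring the existing IntegerSerializer):

{noformat}
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

// Sketch of a Short serializer/deserializer pair: two bytes, big-endian.
public class ShortSerializer implements Serializer<Short> {
    public void configure(Map<String, ?> configs, boolean isKey) { /* nothing to configure */ }

    public byte[] serialize(String topic, Short data) {
        if (data == null)
            return null;
        return new byte[] { (byte) (data >>> 8), data.byteValue() };
    }

    public void close() { /* nothing to close */ }
}

class ShortDeserializer implements Deserializer<Short> {
    public void configure(Map<String, ?> configs, boolean isKey) { }

    public Short deserialize(String topic, byte[] data) {
        if (data == null)
            return null;
        if (data.length != 2)
            throw new SerializationException("Size of data received by ShortDeserializer is not 2");
        return (short) (((data[0] & 0xFF) << 8) | (data[1] & 0xFF));
    }

    public void close() { }
}
{noformat}

A Serde could then be obtained with Serdes.serdeFrom(new ShortSerializer(), 
new ShortDeserializer()).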



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5218) New Short serializer, deserializer, serde

2017-05-22 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019846#comment-16019846
 ] 

Matthias J. Sax commented on KAFKA-5218:


[~guozhang] Can you add [~mmolimar] to the contributor list and assign this 
jira to him? Thx.

> New Short serializer, deserializer, serde
> -
>
> Key: KAFKA-5218
> URL: https://issues.apache.org/jira/browse/KAFKA-5218
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: Mario Molina
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> There is no Short serializer/deserializer in the current clients component.
> It could be useful when using Kafka-Connect to write data to databases with 
> SMALLINT fields (or similar) and avoiding conversions to int improving a bit 
> the performance in terms of memory and network.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5218) New Short serializer, deserializer, serde

2017-05-22 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5218:
---
Status: Patch Available  (was: Open)

> New Short serializer, deserializer, serde
> -
>
> Key: KAFKA-5218
> URL: https://issues.apache.org/jira/browse/KAFKA-5218
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.2.0, 0.10.1.1
>Reporter: Mario Molina
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> There is no Short serializer/deserializer in the current clients component.
> It could be useful when using Kafka-Connect to write data to databases with 
> SMALLINT fields (or similar) and avoiding conversions to int improving a bit 
> the performance in terms of memory and network.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5081) two versions of jackson-annotations-xxx.jar in distribution tgz

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019822#comment-16019822
 ] 

ASF GitHub Bot commented on KAFKA-5081:
---

GitHub user dejan2609 opened a pull request:

https://github.com/apache/kafka/pull/3116

KAFKA-5081: force version for 'jackson-annotations' in order to prevent 
redundant jars from being bundled into kafka distribution (and to align with 
other jackson libraries)

**JIRA ticket:** [KAFKA-5081 two versions of jackson-annotations-xxx.jar in 
distribution tgz](https://issues.apache.org/jira/browse/KAFKA-5081)

**Solutions:**
1. accept this merge request **_OR_**
2. upgrade jackson libraries to version **_2.9.x_** (currently available as 
a pre-release only)

**Related jackson issue:** [Add explicit \`jackson-annotations\` dependency 
version for 
\`jackson-databind\`](https://github.com/FasterXML/jackson-databind/issues/1545)

**Note:** the previous (equivalent) merge request #2900 ended up buried in a 
swarm of messages due to a flaky test, so I opted to close it and open this 
one.

@ijuma: FYI

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dejan2609/kafka KAFKA-5081

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3116.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3116


commit ab682e756af29cf84dcc01e63ddcb9c150786bec
Author: dejan2609 
Date:   2017-04-24T12:35:56Z

KAFKA-5081: force version for 'jackson-annotations' in order to prevent 
redundant jars from being bundled into kafka distribution (and to align with 
other jackson libraries)




> two versions of jackson-annotations-xxx.jar in distribution tgz
> ---
>
> Key: KAFKA-5081
> URL: https://issues.apache.org/jira/browse/KAFKA-5081
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Reporter: Edoardo Comar
>Priority: Minor
>
> git clone https://github.com/apache/kafka.git
> cd kafka
> gradle
> ./gradlew releaseTarGz
> then kafka/core/build/distributions/kafka-...-SNAPSHOT.tgz contains
> in the libs directory two versions of this jar
> jackson-annotations-2.8.0.jar
> jackson-annotations-2.8.5.jar



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #3116: KAFKA-5081: force version for 'jackson-annotations...

2017-05-22 Thread dejan2609
GitHub user dejan2609 opened a pull request:

https://github.com/apache/kafka/pull/3116

KAFKA-5081: force version for 'jackson-annotations' in order to prevent 
redundant jars from being bundled into kafka distribution (and to align with 
other jackson libraries)

**JIRA ticket:** [KAFKA-5081 two versions of jackson-annotations-xxx.jar in 
distribution tgz](https://issues.apache.org/jira/browse/KAFKA-5081)

**Solutions:**
1. accept this merge request **_OR_**
2. upgrade jackson libraries to version **_2.9.x_** (currently available as 
a pre-release only)

**Related jackson issue:** [Add explicit \`jackson-annotations\` dependency 
version for 
\`jackson-databind\`](https://github.com/FasterXML/jackson-databind/issues/1545)

**Note:** the previous (equivalent) merge request #2900 ended up buried in a 
swarm of messages due to a flaky test, so I opted to close it and open this 
one.

@ijuma: FYI

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dejan2609/kafka KAFKA-5081

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3116.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3116


commit ab682e756af29cf84dcc01e63ddcb9c150786bec
Author: dejan2609 
Date:   2017-04-24T12:35:56Z

KAFKA-5081: force version for 'jackson-annotations' in order to prevent 
redundant jars from being bundled into kafka distribution (and to align with 
other jackson libraries)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5081) two versions of jackson-annotations-xxx.jar in distribution tgz

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019814#comment-16019814
 ] 

ASF GitHub Bot commented on KAFKA-5081:
---

Github user dejan2609 closed the pull request at:

https://github.com/apache/kafka/pull/2900


> two versions of jackson-annotations-xxx.jar in distribution tgz
> ---
>
> Key: KAFKA-5081
> URL: https://issues.apache.org/jira/browse/KAFKA-5081
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Reporter: Edoardo Comar
>Priority: Minor
>
> git clone https://github.com/apache/kafka.git
> cd kafka
> gradle
> ./gradlew releaseTarGz
> then kafka/core/build/distributions/kafka-...-SNAPSHOT.tgz contains
> in the libs directory two versions of this jar
> jackson-annotations-2.8.0.jar
> jackson-annotations-2.8.5.jar



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2900: KAFKA-5081: force version for 'jackson-annotations...

2017-05-22 Thread dejan2609
Github user dejan2609 closed the pull request at:

https://github.com/apache/kafka/pull/2900


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Build failed in Jenkins: kafka-0.11.0-jdk7 #12

2017-05-22 Thread Apache Jenkins Server
See 


Changes:

[rajinisivaram] KAFKA-5303, KAFKA-5305: Improve logging when fetches fail in

--
[...truncated 881.82 KB...]
kafka.admin.TopicCommandTest > testCreateIfNotExists PASSED

kafka.admin.TopicCommandTest > testCreateAlterTopicWithRackAware STARTED

kafka.admin.TopicCommandTest > testCreateAlterTopicWithRackAware PASSED

kafka.admin.TopicCommandTest > testTopicDeletion STARTED

kafka.admin.TopicCommandTest > testTopicDeletion PASSED

kafka.admin.TopicCommandTest > testConfigPreservationAcrossPartitionAlteration 
STARTED

kafka.admin.TopicCommandTest > testConfigPreservationAcrossPartitionAlteration 
PASSED

kafka.admin.TopicCommandTest > testAlterIfExists STARTED

kafka.admin.TopicCommandTest > testAlterIfExists PASSED

kafka.admin.TopicCommandTest > testDeleteIfExists STARTED

kafka.admin.TopicCommandTest > testDeleteIfExists PASSED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentWithOfflineReplicaHaltingProgress STARTED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentWithOfflineReplicaHaltingProgress PASSED

kafka.controller.ControllerIntegrationTest > 
testControllerEpochPersistsWhenAllBrokersDown STARTED

kafka.controller.ControllerIntegrationTest > 
testControllerEpochPersistsWhenAllBrokersDown PASSED

kafka.controller.ControllerIntegrationTest > 
testTopicCreationWithOfflineReplica STARTED

kafka.controller.ControllerIntegrationTest > 
testTopicCreationWithOfflineReplica PASSED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentResumesAfterReplicaComesOnline STARTED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentResumesAfterReplicaComesOnline PASSED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled STARTED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled PASSED

kafka.controller.ControllerIntegrationTest > 
testTopicPartitionExpansionWithOfflineReplica STARTED

kafka.controller.ControllerIntegrationTest > 
testTopicPartitionExpansionWithOfflineReplica PASSED

kafka.controller.ControllerIntegrationTest > 
testPreferredReplicaLeaderElectionWithOfflinePreferredReplica STARTED

kafka.controller.ControllerIntegrationTest > 
testPreferredReplicaLeaderElectionWithOfflinePreferredReplica PASSED

kafka.controller.ControllerIntegrationTest > 
testAutoPreferredReplicaLeaderElection STARTED

kafka.controller.ControllerIntegrationTest > 
testAutoPreferredReplicaLeaderElection PASSED

kafka.controller.ControllerIntegrationTest > testTopicCreation STARTED

kafka.controller.ControllerIntegrationTest > testTopicCreation PASSED

kafka.controller.ControllerIntegrationTest > testPartitionReassignment STARTED

kafka.controller.ControllerIntegrationTest > testPartitionReassignment PASSED

kafka.controller.ControllerIntegrationTest > testTopicPartitionExpansion STARTED

kafka.controller.ControllerIntegrationTest > testTopicPartitionExpansion PASSED

kafka.controller.ControllerIntegrationTest > 
testControllerMoveIncrementsControllerEpoch STARTED

kafka.controller.ControllerIntegrationTest > 
testControllerMoveIncrementsControllerEpoch PASSED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled STARTED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled PASSED

kafka.controller.ControllerIntegrationTest > testEmptyCluster STARTED

kafka.controller.ControllerIntegrationTest > testEmptyCluster PASSED

kafka.controller.ControllerIntegrationTest > testPreferredReplicaLeaderElection 
STARTED

kafka.controller.ControllerIntegrationTest > testPreferredReplicaLeaderElection 
PASSED

kafka.controller.ControllerFailoverTest > testMetadataUpdate STARTED

kafka.controller.ControllerFailoverTest > testMetadataUpdate SKIPPED

kafka.tools.ConsoleProducerTest > testParseKeyProp STARTED

kafka.tools.ConsoleProducerTest > testParseKeyProp PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer PASSED

kafka.tools.ConsoleProducerTest > testInvalidConfigs STARTED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED

kafka.tools.ConsoleConsumerTest > 

Build failed in Jenkins: kafka-trunk-jdk8 #1571

2017-05-22 Thread Apache Jenkins Server
See 


Changes:

[rajinisivaram] KAFKA-5303, KAFKA-5305: Improve logging when fetches fail in

--
[...truncated 1.39 MB...]
org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFinalMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFinalMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFinalMessage PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryWithoutPasswordConfiguration STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryWithoutPasswordConfiguration PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration PASSED

org.apache.kafka.common.security.kerberos.KerberosNameTest > testParse STARTED

org.apache.kafka.common.security.kerberos.KerberosNameTest > testParse PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithListenerNameOverride STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithListenerNameOverride PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingOptionValue 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingOptionValue PASSED

org.apache.kafka.common.security.JaasContextTest > testSingleOption STARTED

org.apache.kafka.common.security.JaasContextTest > testSingleOption PASSED

org.apache.kafka.common.security.JaasContextTest > 
testNumericOptionWithoutQuotes STARTED

org.apache.kafka.common.security.JaasContextTest > 
testNumericOptionWithoutQuotes PASSED

org.apache.kafka.common.security.JaasContextTest > testConfigNoOptions STARTED

org.apache.kafka.common.security.JaasContextTest > testConfigNoOptions PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithWrongListenerName STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithWrongListenerName PASSED

org.apache.kafka.common.security.JaasContextTest > testNumericOptionWithQuotes 
STARTED

org.apache.kafka.common.security.JaasContextTest > testNumericOptionWithQuotes 
PASSED

org.apache.kafka.common.security.JaasContextTest > testQuotedOptionValue STARTED

org.apache.kafka.common.security.JaasContextTest > testQuotedOptionValue PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingLoginModule 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingLoginModule PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingSemicolon STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingSemicolon PASSED

org.apache.kafka.common.security.JaasContextTest > testMultipleOptions STARTED

org.apache.kafka.common.security.JaasContextTest > testMultipleOptions PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForClientWithListenerName STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForClientWithListenerName PASSED

org.apache.kafka.common.security.JaasContextTest > testMultipleLoginModules 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMultipleLoginModules 
PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingControlFlag 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingControlFlag PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithListenerNameAndFallback STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithListenerNameAndFallback 

Re: Re: [DISCUSS] KIP-148: Add a connect timeout for client

2017-05-22 Thread Guozhang Wang
Could you remind me what issue with the consumer caused us to set the
default value of connect.timeout to 5 min?


Guozhang

On Mon, May 22, 2017 at 8:35 AM, Ismael Juma  wrote:

> Yes, I agree that one config would be better. Just a bit more work to
> achieve the desired effect for the Consumer.
>
> Ismael
>
> On Mon, May 22, 2017 at 4:30 PM, Rajini Sivaram 
> wrote:
>
> > Ismael,
> >
> > Yes, agree. My concern was that a connection can be shutdown uncleanly at
> > any time. If a client is in the middle of a request, then it times out
> > after min(request.timeout.ms, tcp-timeout). If we add another config
> > option
> > connect.timeout.ms, then we will sometimes wait for min(
> connect.timeout.ms
> > ,
> > tcp-timeout) and sometimes for min(request.timeout.ms, tcp-timeout),
> > depending
> > on connection state. One config option feels neater to me.
> >
> > On Mon, May 22, 2017 at 11:21 AM, Ismael Juma  wrote:
> >
> > > Rajini,
> > >
> > > For this to have the desired effect, we'd probably need to lower the
> > > default request.timeout.ms for the consumer and fix the underlying
> > reason
> > > why it is a little over 5 minutes at the moment.
> > >
> > > Ismael
> > >
> > > On Mon, May 22, 2017 at 4:15 PM, Rajini Sivaram <
> rajinisiva...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hi David,
> > > >
> > > > Sorry, what I meant was: Can you reuse the existing configuration
> > option
> > > > request.timeout.ms, instead of adding a new config and add the
> > behaviour
> > > > that you have proposed in the KIP for the connection phase using this
> > > > timeout? I think the timeout for connection is useful. I am not sure
> we
> > > > need another configuration option to implement it.
> > > >
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> > > >
> > > > On Mon, May 22, 2017 at 11:06 AM, 东方甲乙 <254479...@qq.com> wrote:
> > > >
> > > > > Hi Rajini.
> > > > >
> > > > > When a kafka node's machine is shut down or the network is closed, the
> > > connecting
> > > > > phase could not use the request.timeout.ms, because the client
> > haven't
> > > > > send a req yet.   And no response for the nio, the selector will
> not
> > > > close
> > > > > the connect, so it will not choose other good node to get the
> > metadata.
> > > > >
> > > > >
> > > > > Thanks
> > > > > David
> > > > >
> > > > > -- Original Message --
> > > > > *From:* "Rajini Sivaram" ;
> > > > > *Sent:* Monday, May 22, 2017 20:17
> > > > > *To:* "dev" ;
> > > > > *Subject:* Re: [DISCUSS] KIP-148: Add a connect timeout for client
> > > > >
> > > > >
> > > > > Hi David,
> > > > >
> > > > > Is there a reason why you wouldn't want to use request.timeout.ms
> as
> > > the
> > > > > timeout parameter for connections? Then you would use the same
> > timeout
> > > > for
> > > > > connected and connecting phases when shutdown is unclean. You could
> > > still
> > > > > use the timeout to ensure that next metadata request is sent to
> > another
> > > > > node.
> > > > >
> > > > > Regards,
> > > > >
> > > > > Rajini
> > > > >
> > > > > On Sun, May 21, 2017 at 9:51 AM, 东方甲乙 <254479...@qq.com> wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > >
> > > > > > Thanks for the clarify. For the clarify 2, I think the key thing
> is
> > > not
> > > > > > users control how much time in maximum to wait for inside code,
> but
> > > is
> > > > > the
> > > > > > network client can be aware of the connecting can't be finished
> and
> > > > try a
> > > > > > good node. In the producer.sender even the selector.poll can
> > timeout,
> > > > but
> > > > > > the next time is also not close the previous connecting and try
> > > another
> > > > > > good node.
> > > > > >
> > > > > >
> > > > > > In our test env, QA shut down one of the leader nodes, the producer
> > > send
> > > > > the
> > > > > > request will timeout and close the node's connection then request
> > the
> > > > > > metadata.  But sometimes the request node is also the shutdown
> > node.
> > > > > When
> > > > > > connecting the shutting down node to get the metadata, it is in
> the
> > > > > > connecting phase, network client mark the connecting node's state
> > to
> > > > > > CONNECTING, but if the node is shutdown,  the socket can't be
> aware
> > > of
> > > > > the
> > > > > > connecting is broken. Though the selector.poll has timeout
> > parameter,
> > > > but
> > > > > > it will not close the connection, so the next
> > > > > > time in the "networkclient.maybeUpdate" it will check if
> > > > > > isAnyNodeConnecting, then will not connect to any good node the
> get
> > > the
> > > > > > metadata.  It needs about several minutes to become
> > > > > > aware that the connection attempt has timed out and to try another node.
> > > > > >
> > > > > >
> > > > > > So I want to add a connect.timeout parameter,  the selector can
> > find
> > > > the
> > > > > > connecting is timeout and close the connection.  It seems 

[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2017-05-22 Thread Brandon Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019726#comment-16019726
 ] 

Brandon Bradley commented on KAFKA-1955:


Ok, I am wrong. There are two instances where the current implementation reuses 
buffers.

> Explore disk-based buffering in new Kafka Producer
> --
>
> Key: KAFKA-1955
> URL: https://issues.apache.org/jira/browse/KAFKA-1955
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 0.8.2.0
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1955.patch, 
> KAFKA-1955-RABASED-TO-8th-AUG-2015.patch
>
>
> There are two approaches to using Kafka for capturing event data that has no 
> other "source of truth store":
> 1. Just write to Kafka and try hard to keep the Kafka cluster up as you would 
> a database.
> 2. Write to some kind of local disk store and copy from that to Kafka.
> The cons of the second approach are the following:
> 1. You end up depending on disks on all the producer machines. If you have 
> 10,000 producers, that is 10k places state is kept. These tend to fail a lot.
> 2. You can get data arbitrarily delayed
> 3. You still don't tolerate hard outages since there is no replication in the 
> producer tier
> 4. This tends to make problems with duplicates more common in certain failure 
> scenarios.
> There is one big pro, though: you don't have to keep Kafka running all the 
> time.
> So far we have done nothing in Kafka to help support approach (2), but people 
> have built a lot of buffering things. It's not clear that this is necessarily 
> bad.
> However implementing this in the new Kafka producer might actually be quite 
> easy. Here is an idea for how to do it. Implementation of this idea is 
> probably pretty easy but it would require some pretty thorough testing to see 
> if it was a success.
> The new producer maintains a pool of ByteBuffer instances which it attempts 
> to recycle and uses to buffer and send messages. When unsent data is queuing 
> waiting to be sent to the cluster it is hanging out in this pool.
> One approach to implementing a disk-backed buffer would be to slightly 
> generalize this so that the buffer pool has the option to use a mmap'd file 
> backend for its ByteBuffers. When the BufferPool was created with a 
> totalMemory setting of 1GB it would preallocate a 1GB sparse file and memory 
> map it, then chop the file into batchSize MappedByteBuffer pieces and 
> populate its buffer with those.
> Everything else would work normally except now all the buffered data would be 
> disk backed and in cases where there was significant backlog these would 
> start to fill up and page out.
> We currently allow messages larger than batchSize and to handle these we do a 
> one-off allocation of the necessary size. We would have to disallow this when 
> running in mmap mode. However since the disk buffer will be really big this 
> should not be a significant limitation as the batch size can be pretty big.
> We would want to ensure that the pooling always gives out the most recently 
> used ByteBuffer (I think it does). This way under normal operation where 
> requests are processed quickly a given buffer would be reused many times 
> before any physical disk write activity occurred.
> Note that although this lets the producer buffer very large amounts of data 
> the buffer isn't really fault-tolerant, since the ordering in the file isn't 
> known so there is no easy way to recover the producer's buffer in a failure. 
> So the scope of this feature would just be to provide a bigger buffer for 
> short outages or latency spikes in the Kafka cluster during which you would 
> hope you don't also experience failures in your producer processes.
> To complete the feature we would need to:
> a. Get some unit tests that would cover disk-backed usage
> b. Do some manual performance testing of this usage and understand the impact 
> on throughput.
> c. Do some manual testing of failure cases (i.e. if the broker goes down for 
> 30 seconds we should be able to keep taking writes) and observe how well the 
> producer handles the catch up time when it has a large backlog to get rid of.
> d. Add a new configuration for the producer to enable this, something like 
> use.file.buffers=true/false.
> e. Add documentation that covers these new options.
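
To make the mmap idea concrete, a rough sketch (hypothetical, not the actual 
BufferPool code) of pre-allocating a sparse file and carving it into 
batchSize-sized MappedByteBuffer slices that the pool hands out LIFO, so the 
most recently used buffer is reused first:

{noformat}
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: a file-backed pool of fixed-size MappedByteBuffer slices.
class FileBackedBufferPool {
    private final Deque<MappedByteBuffer> free = new ArrayDeque<>(); // LIFO

    FileBackedBufferPool(String path, long totalMemory, int batchSize) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(path, "rw")) {
            raf.setLength(totalMemory); // sparse pre-allocation
            FileChannel channel = raf.getChannel();
            for (long offset = 0; offset + batchSize <= totalMemory; offset += batchSize)
                free.push(channel.map(FileChannel.MapMode.READ_WRITE, offset, batchSize));
        }
    }

    synchronized MappedByteBuffer allocate() {
        MappedByteBuffer buffer = free.poll(); // null if exhausted; caller must block or fail
        if (buffer != null)
            buffer.clear();
        return buffer;
    }

    synchronized void release(MappedByteBuffer buffer) {
        free.push(buffer); // hand back the hottest buffer first
    }
}
{noformat}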



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Re: [DISCUSS] KIP-148: Add a connect timeout for client

2017-05-22 Thread Ismael Juma
Yes, I agree that one config would be better. Just a bit more work to
achieve the desired effect for the Consumer.

Ismael

On Mon, May 22, 2017 at 4:30 PM, Rajini Sivaram 
wrote:

> Ismael,
>
> Yes, agree. My concern was that a connection can be shutdown uncleanly at
> any time. If a client is in the middle of a request, then it times out
> after min(request.timeout.ms, tcp-timeout). If we add another config
> option
> connect.timeout.ms, then we will sometimes wait for min(connect.timeout.ms
> ,
> tcp-timeout) and sometimes for min(request.timeout.ms, tcp-timeout),
> depending
> on connection state. One config option feels neater to me.
>
> On Mon, May 22, 2017 at 11:21 AM, Ismael Juma  wrote:
>
> > Rajini,
> >
> > For this to have the desired effect, we'd probably need to lower the
> > default request.timeout.ms for the consumer and fix the underlying
> reason
> > why it is a little over 5 minutes at the moment.
> >
> > Ismael
> >
> > On Mon, May 22, 2017 at 4:15 PM, Rajini Sivaram  >
> > wrote:
> >
> > > Hi David,
> > >
> > > Sorry, what I meant was: Can you reuse the existing configuration
> option
> > > request.timeout.ms, instead of adding a new config and add the
> behaviour
> > > that you have proposed in the KIP for the connection phase using this
> > > timeout? I think the timeout for connection is useful. I am not sure we
> > > need another configuration option to implement it.
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > >
> > > On Mon, May 22, 2017 at 11:06 AM, 东方甲乙 <254479...@qq.com> wrote:
> > >
> > > > Hi Rajini.
> > > >
> > > > When a kafka node's machine is shut down or the network is closed, the
> > connecting
> > > > phase could not use the request.timeout.ms, because the client
> haven't
> > > > send a req yet.   And no response for the nio, the selector will not
> > > close
> > > > the connect, so it will not choose other good node to get the
> metadata.
> > > >
> > > >
> > > > Thanks
> > > > David
> > > >
> > > > -- Original Message --
> > > > *From:* "Rajini Sivaram" ;
> > > > *Sent:* Monday, May 22, 2017 20:17
> > > > *To:* "dev" ;
> > > > *Subject:* Re: [DISCUSS] KIP-148: Add a connect timeout for client
> > > >
> > > >
> > > > Hi David,
> > > >
> > > > Is there a reason why you wouldn't want to use request.timeout.ms as
> > the
> > > > timeout parameter for connections? Then you would use the same
> timeout
> > > for
> > > > connected and connecting phases when shutdown is unclean. You could
> > still
> > > > use the timeout to ensure that next metadata request is sent to
> another
> > > > node.
> > > >
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> > > > On Sun, May 21, 2017 at 9:51 AM, 东方甲乙 <254479...@qq.com> wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > >
> > > > > Thanks for the clarify. For the clarify 2, I think the key thing is
> > not
> > > > > users control how much time in maximum to wait for inside code, but
> > is
> > > > the
> > > > > network client can be aware of the connecting can't be finished and
> > > try a
> > > > > good node. In the producer.sender even the selector.poll can
> timeout,
> > > but
> > > > > the next time is also not close the previous connecting and try
> > another
> > > > > good node.
> > > > >
> > > > >
> > > > > In our test env, QA shut down one of the leader nodes, the producer
> > send
> > > > the
> > > > > request will timeout and close the node's connection then request
> the
> > > > > metadata.  But sometimes the request node is also the shutdown
> node.
> > > > When
> > > > > connecting the shutting down node to get the metadata, it is in the
> > > > > connecting phase, network client mark the connecting node's state
> to
> > > > > CONNECTING, but if the node is shutdown,  the socket can't be aware
> > of
> > > > the
> > > > > connecting is broken. Though the selector.poll has timeout
> parameter,
> > > but
> > > > > it will not close the connection, so the next
> > > > > time in the "networkclient.maybeUpdate" it will check if
> > > > > isAnyNodeConnecting, then will not connect to any good node the get
> > the
> > > > > metadata.  It needs about several minutes to become
> > > > > aware that the connection attempt has timed out and to try another node.
> > > > >
> > > > >
> > > > > So I want to add a connect.timeout parameter,  the selector can
> find
> > > the
> > > > > connecting is timeout and close the connection.  It seems the
> > currently
> > > > the
> > > > > timeout value passed in `selector.poll()`
> > > > > seems can not do this.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > David
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > -- Original Message --
> > > > > From: "Guozhang Wang";;
> > > > > Sent: Tuesday, May 16, 2017 1:51 AM
> > > > > To: "dev@kafka.apache.org";
> > > > >
> > > > > Subject: Re: [DISCUSS] KIP-148: Add a connect timeout for 

Re: Re: [DISCUSS] KIP-148: Add a connect timeout for client

2017-05-22 Thread Rajini Sivaram
Ismael,

Yes, agree. My concern was that a connection can be shutdown uncleanly at
any time. If a client is in the middle of a request, then it times out
after min(request.timeout.ms, tcp-timeout). If we add another config option
connect.timeout.ms, then we will sometimes wait for min(connect.timeout.ms,
tcp-timeout) and sometimes for min(request.timeout.ms, tcp-timeout), depending
on connection state. One config option feels neater to me.

On Mon, May 22, 2017 at 11:21 AM, Ismael Juma  wrote:

> Rajini,
>
> For this to have the desired effect, we'd probably need to lower the
> default request.timeout.ms for the consumer and fix the underlying reason
> why it is a little over 5 minutes at the moment.
>
> Ismael
>
> On Mon, May 22, 2017 at 4:15 PM, Rajini Sivaram 
> wrote:
>
> > Hi David,
> >
> > Sorry, what I meant was: Can you reuse the existing configuration option
> > request.timeout.ms, instead of adding a new config and add the behaviour
> > that you have proposed in the KIP for the connection phase using this
> > timeout? I think the timeout for connection is useful. I am not sure we
> > need another configuration option to implement it.
> >
> > Regards,
> >
> > Rajini
> >
> >
> > On Mon, May 22, 2017 at 11:06 AM, 东方甲乙 <254479...@qq.com> wrote:
> >
> > > Hi Rajini.
> > >
> > > When a kafka node's machine is shut down or the network is closed, the
> connecting
> > > phase could not use the request.timeout.ms, because the client haven't
> > > send a req yet.   And no response for the nio, the selector will not
> > close
> > > the connect, so it will not choose other good node to get the metadata.
> > >
> > >
> > > Thanks
> > > David
> > >
> > > -- Original Message --
> > > *From:* "Rajini Sivaram" ;
> > > *Sent:* Monday, May 22, 2017 20:17
> > > *To:* "dev" ;
> > > *Subject:* Re: [DISCUSS] KIP-148: Add a connect timeout for client
> > >
> > >
> > > Hi David,
> > >
> > > Is there a reason why you wouldn't want to use request.timeout.ms as
> the
> > > timeout parameter for connections? Then you would use the same timeout
> > for
> > > connected and connecting phases when shutdown is unclean. You could
> still
> > > use the timeout to ensure that next metadata request is sent to another
> > > node.
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > > On Sun, May 21, 2017 at 9:51 AM, 东方甲乙 <254479...@qq.com> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > >
> > > > Thanks for the clarify. For the clarify 2, I think the key thing is
> not
> > > > users control how much time in maximum to wait for inside code, but
> is
> > > the
> > > > network client can be aware of the connecting can't be finished and
> > try a
> > > > good node. In the producer.sender even the selector.poll can timeout,
> > but
> > > > the next time is also not close the previous connecting and try
> another
> > > > good node.
> > > >
> > > >
> > > > In our test env, QA shut down one of the leader nodes, the producer
> send
> > > the
> > > > request will timeout and close the node's connection then request the
> > > > metadata.  But sometimes the request node is also the shutdown node.
> > > When
> > > > connecting the shutting down node to get the metadata, it is in the
> > > > connecting phase, network client mark the connecting node's state to
> > > > CONNECTING, but if the node is shutdown,  the socket can't be aware
> of
> > > the
> > > > connecting is broken. Though the selector.poll has timeout parameter,
> > but
> > > > it will not close the connection, so the next
> > > > time in the "networkclient.maybeUpdate" it will check if
> > > > isAnyNodeConnecting, then will not connect to any good node the get
> the
> > > > metadata.  It needs about several minutes to become
> > > > aware that the connection attempt has timed out and to try another node.
> > > >
> > > >
> > > > So I want to add a connect.timeout parameter,  the selector can find
> > the
> > > > connecting is timeout and close the connection.  It seems the
> currently
> > > the
> > > > timeout value passed in `selector.poll()`
> > > > seems can not do this.
> > > >
> > > >
> > > > Thanks,
> > > > David
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > -- Original Message --
> > > > From: "Guozhang Wang";;
> > > > Sent: Tuesday, May 16, 2017 1:51 AM
> > > > To: "dev@kafka.apache.org";
> > > >
> > > > Subject: Re: [DISCUSS] KIP-148: Add a connect timeout for client
> > > >
> > > >
> > > >
> > > > Hi David,
> > > >
> > > > I may be a bit confused before, just clarifying a few things:
> > > >
> > > > 1. As you mentioned, a client will always try to first establish the
> > > > connection with a broker node before it tries to send any request to
> > it.
> > > > And after connection is established, it will either continuously send
> > > many
> > > > requests (e.g. produce) for just a single request (e.g. metadata) to
> > the
> > > > broker, so these two phases 

[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2017-05-22 Thread Brandon Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019697#comment-16019697
 ] 

Brandon Bradley commented on KAFKA-1955:


The current `BufferPool` implementation does not actually reuse any buffers 
(directly). It bounds the buffer space for the pool and tracks how much space 
has been allocated from the heap for the pool. It may even be possible _not_ to 
use a free list in this implementation, but that is not the issue here.

I believe a malloc/free implementation over `MappedByteBuffer` will be the best 
choice. This will allow the producer buffers to use a file like a piece of 
memory at the cost of maintaining a more complex free list.

> Explore disk-based buffering in new Kafka Producer
> --
>
> Key: KAFKA-1955
> URL: https://issues.apache.org/jira/browse/KAFKA-1955
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 0.8.2.0
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1955.patch, 
> KAFKA-1955-RABASED-TO-8th-AUG-2015.patch
>
>
> There are two approaches to using Kafka for capturing event data that has no 
> other "source of truth store":
> 1. Just write to Kafka and try hard to keep the Kafka cluster up as you would 
> a database.
> 2. Write to some kind of local disk store and copy from that to Kafka.
> The cons of the second approach are the following:
> 1. You end up depending on disks on all the producer machines. If you have 
> 10,000 producers, that is 10k places state is kept. These tend to fail a lot.
> 2. You can get data arbitrarily delayed
> 3. You still don't tolerate hard outages since there is no replication in the 
> producer tier
> 4. This tends to make problems with duplicates more common in certain failure 
> scenarios.
> There is one big pro, though: you don't have to keep Kafka running all the 
> time.
> So far we have done nothing in Kafka to help support approach (2), but people 
> have built a lot of buffering things. It's not clear that this is necessarily 
> bad.
> However implementing this in the new Kafka producer might actually be quite 
> easy. Here is an idea for how to do it. Implementation of this idea is 
> probably pretty easy but it would require some pretty thorough testing to see 
> if it was a success.
> The new producer maintains a pool of ByteBuffer instances which it attempts 
> to recycle and uses to buffer and send messages. When unsent data is queuing 
> waiting to be sent to the cluster it is hanging out in this pool.
> One approach to implementing a disk-backed buffer would be to slightly 
> generalize this so that the buffer pool has the option to use a mmap'd file 
> backend for its ByteBuffers. When the BufferPool was created with a 
> totalMemory setting of 1GB it would preallocate a 1GB sparse file and memory 
> map it, then chop the file into batchSize MappedByteBuffer pieces and 
> populate its buffer with those.
> Everything else would work normally except now all the buffered data would be 
> disk backed and in cases where there was significant backlog these would 
> start to fill up and page out.
> We currently allow messages larger than batchSize and to handle these we do a 
> one-off allocation of the necessary size. We would have to disallow this when 
> running in mmap mode. However since the disk buffer will be really big this 
> should not be a significant limitation as the batch size can be pretty big.
> We would want to ensure that the pooling always gives out the most recently 
> used ByteBuffer (I think it does). This way under normal operation where 
> requests are processed quickly a given buffer would be reused many times 
> before any physical disk write activity occurred.
> Note that although this lets the producer buffer very large amounts of data 
> the buffer isn't really fault-tolerant, since the ordering in the file isn't 
> known so there is no easy way to recover the producer's buffer in a failure. 
> So the scope of this feature would just be to provide a bigger buffer for 
> short outages or latency spikes in the Kafka cluster during which you would 
> hope you don't also experience failures in your producer processes.
> To complete the feature we would need to:
> a. Get some unit tests that would cover disk-backed usage
> b. Do some manual performance testing of this usage and understand the impact 
> on throughput.
> c. Do some manual testing of failure cases (i.e. if the broker goes down for 
> 30 seconds we should be able to keep taking writes) and observe how well the 
> producer handles the catch up time when it has a large backlog to get rid of.
> d. Add a new configuration for the producer to enable this, something like 
> use.file.buffers=true/false.
> e. Add documentation that covers these new 
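
Following up on the malloc/free idea above, a toy sketch (hypothetical; 
coalescing of adjacent free regions and the offset bookkeeping for free() are 
omitted) of a first-fit free list over a single mapped region:

{noformat}
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: first-fit allocation of variable-sized slices from one
// mapped region, tracked as a free list of (offset -> length) entries.
class MappedAllocator {
    private final MappedByteBuffer backing;
    private final TreeMap<Integer, Integer> freeList = new TreeMap<>();

    MappedAllocator(MappedByteBuffer backing) {
        this.backing = backing;
        freeList.put(0, backing.capacity()); // everything starts out free
    }

    // Returns a slice of the mapped region, or null if no free region is big enough.
    synchronized ByteBuffer malloc(int size) {
        for (Map.Entry<Integer, Integer> entry : freeList.entrySet()) {
            int offset = entry.getKey();
            int length = entry.getValue();
            if (length >= size) {
                freeList.remove(offset);
                if (length > size)
                    freeList.put(offset + size, length - size); // keep the remainder free
                ByteBuffer dup = backing.duplicate();
                dup.position(offset);
                dup.limit(offset + size);
                return dup.slice();
            }
        }
        return null;
    }

    // A real implementation would coalesce adjacent regions here.
    synchronized void free(int offset, int size) {
        freeList.put(offset, size);
    }
}
{noformat}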

Re: Re: [DISCUSS] KIP-148: Add a connect timeout for client

2017-05-22 Thread Ismael Juma
Rajini,

For this to have the desired effect, we'd probably need to lower the
default request.timeout.ms for the consumer and fix the underlying reason
why it is a little over 5 minutes at the moment.

Ismael

On Mon, May 22, 2017 at 4:15 PM, Rajini Sivaram 
wrote:

> Hi David,
>
> Sorry, what I meant was: Can you reuse the existing configuration option
> request.timeout.ms, instead of adding a new config, and add the behaviour
> that you have proposed in the KIP for the connection phase using this
> timeout? I think the timeout for connection is useful. I am not sure we
> need another configuration option to implement it.
>
> Regards,
>
> Rajini
>
>
> On Mon, May 22, 2017 at 11:06 AM, 东方甲乙 <254479...@qq.com> wrote:
>
> > Hi Rajini.
> >
> > When the Kafka node's machine is shut down or the network is lost, the
> > connecting phase cannot use request.timeout.ms, because the client hasn't
> > sent a request yet. And since NIO gets no response, the selector will not
> > close the connection, so it will not choose another good node to get the metadata.
> >
> >
> > Thanks
> > David
> >
> > -- 原始邮件 --
> > *发件人:* "Rajini Sivaram" ;
> > *发送时间:* 2017年5月22日(星期一) 20:17
> > *收件人:* "dev" ;
> > *主题:* Re: [DISCUSS] KIP-148: Add a connect timeout for client
> >
> >
> > Hi David,
> >
> > Is there a reason why you wouldn't want to use request.timeout.ms as the
> > timeout parameter for connections? Then you would use the same timeout
> for
> > connected and connecting phases when shutdown is unclean. You could still
> > use the timeout to ensure that next metadata request is sent to another
> > node.
> >
> > Regards,
> >
> > Rajini
> >
> > On Sun, May 21, 2017 at 9:51 AM, 东方甲乙 <254479...@qq.com> wrote:
> >
> > > Hi Guozhang,
> > >
> > >
> > > Thanks for the clarification. Regarding clarification 2, I think the key
> > > thing is not that users can control how much time in maximum to wait inside
> > > the code, but that the network client can detect that the connection attempt
> > > cannot be finished and try a good node. In the producer sender, even though
> > > selector.poll can time out, the next iteration still does not close the
> > > previous connection attempt and try another good node.
> > >
> > >
> > > In our test env, QA shut down one of the leader nodes; the producer's request
> > > timed out, so it closed that node's connection and then requested the
> > > metadata. But sometimes the node chosen for the metadata request is also the
> > > shut-down node. While connecting to the shut-down node to get the metadata,
> > > the client is in the connecting phase: the network client marks the node's
> > > state as CONNECTING, but if the node is down, the socket cannot detect that
> > > the connection attempt is broken. Although selector.poll has a timeout
> > > parameter, it will not close the connection, so the next time
> > > "networkclient.maybeUpdate" runs it checks isAnyNodeConnecting and then will
> > > not connect to any good node to get the metadata. It takes several minutes to
> > > detect that the connection attempt has timed out and try another node.
> > >
> > >
> > > So I want to add a connect.timeout parameter, so that the selector can detect
> > > that the connection attempt has timed out and close the connection. The
> > > timeout value currently passed in `selector.poll()` cannot do this.
> > >
> > >
> > > Thanks,
> > > David
> > >
> > >
> > >
> > >
> > >
> > >
> > > -- 原始邮件 --
> > > 发件人: "Guozhang Wang";;
> > > 发送时间: 2017年5月16日(星期二) 凌晨1:51
> > > 收件人: "dev@kafka.apache.org";
> > >
> > > 主题: Re: [DISCUSS] KIP-148: Add a connect timeout for client
> > >
> > >
> > >
> > > Hi David,
> > >
> > > I may be a bit confused before, just clarifying a few things:
> > >
> > > 1. As you mentioned, a client will always try to first establish the
> > > connection with a broker node before it tries to send any request to
> it.
> > > And after connection is established, it will either continuously send
> > many
> > > requests (e.g. produce) for just a single request (e.g. metadata) to
> the
> > > broker, so these two phases are indeed different.
> > >
> > > 2. In the connected phase, connections.max.idle.ms is used to
> > > auto-disconnect the socket if no requests has been sent / received
> during
> > > that period of time; in the connecting phase, we always try to create
> the
> > > socket via "socketChannel.connect" in a non-blocking call, and then
> > checks
> > > if the connection has been established, but all the callers of this
> > > function (in either producer or consumer) has a timeout parameter as in
> > > `selector.poll()`, and the timeout parameter is set either by
> > calculations
> > > based on metadata.expiration.time and backoff for producer#sender, or
> by
> > > directly passed values from consumer#poll(timeout), so although there
> is
> > no
> > > directly config controlling that, 
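
For readers following this thread, here is a rough, self-contained Java sketch of the connect-timeout behaviour David is proposing, built directly on java.nio rather than on Kafka's own Selector/NetworkClient (whose internals differ); the config name connect.timeout.ms and the class are hypothetical. The idea is simply to remember when each non-blocking connect started and close attempts that have sat in the connecting state too long, so the caller can move on to another node.

{code}
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrative sketch only; kafka-clients' Selector/NetworkClient work differently in detail.
public class ConnectTimeoutExample {
    private final Selector selector = Selector.open();
    private final Map<SocketChannel, Long> connectStart = new HashMap<>();
    private final long connectTimeoutMs;  // hypothetical connect.timeout.ms

    public ConnectTimeoutExample(long connectTimeoutMs) throws Exception {
        this.connectTimeoutMs = connectTimeoutMs;
    }

    public void connect(String host, int port) throws Exception {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_CONNECT);
        channel.connect(new InetSocketAddress(host, port));   // non-blocking connect
        connectStart.put(channel, System.currentTimeMillis());
    }

    public void poll(long timeoutMs) throws Exception {
        selector.select(timeoutMs);
        // ... handle connectable/readable/writable keys here (finishConnect, reads, writes) ...

        // Close connection attempts that have been in the connecting state too long,
        // so the caller can pick another node instead of waiting for the OS-level timeout.
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<SocketChannel, Long>> it = connectStart.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<SocketChannel, Long> entry = it.next();
            SocketChannel channel = entry.getKey();
            if (channel.isConnected()) {
                it.remove();                        // handshake finished, stop tracking
            } else if (now - entry.getValue() > connectTimeoutMs) {
                channel.close();                    // give up on this node
                it.remove();
            }
        }
    }
}
{code}

Without such a check, a connect to a dead host can stay in the CONNECTING state until the OS-level TCP timeout fires, which matches the several-minute delay described earlier in the thread.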

Re: Re: [DISCUSS] KIP-148: Add a connect timeout for client

2017-05-22 Thread Rajini Sivaram
Hi David,

Sorry, what I meant was: Can you reuse the existing configuration option
request.timeout.ms, instead of adding a new config, and add the behaviour
that you have proposed in the KIP for the connection phase using this
timeout? I think the timeout for connection is useful. I am not sure we
need another configuration option to implement it.

Regards,

Rajini


On Mon, May 22, 2017 at 11:06 AM, 东方甲乙 <254479...@qq.com> wrote:

> Hi Rajini.
>
> When the Kafka node's machine is shut down or the network is lost, the
> connecting phase cannot use request.timeout.ms, because the client hasn't
> sent a request yet. And since NIO gets no response, the selector will not close
> the connection, so it will not choose another good node to get the metadata.
>
>
> Thanks
> David
>
> -- 原始邮件 --
> *发件人:* "Rajini Sivaram" ;
> *发送时间:* 2017年5月22日(星期一) 20:17
> *收件人:* "dev" ;
> *主题:* Re: [DISCUSS] KIP-148: Add a connect timeout for client
>
>
> Hi David,
>
> Is there a reason why you wouldn't want to use request.timeout.ms as the
> timeout parameter for connections? Then you would use the same timeout for
> connected and connecting phases when shutdown is unclean. You could still
> use the timeout to ensure that next metadata request is sent to another
> node.
>
> Regards,
>
> Rajini
>
> On Sun, May 21, 2017 at 9:51 AM, 东方甲乙 <254479...@qq.com> wrote:
>
> > Hi Guozhang,
> >
> >
> > Thanks for the clarification. Regarding clarification 2, I think the key
> > thing is not that users can control how much time in maximum to wait inside
> > the code, but that the network client can detect that the connection attempt
> > cannot be finished and try a good node. In the producer sender, even though
> > selector.poll can time out, the next iteration still does not close the
> > previous connection attempt and try another good node.
> >
> >
> > In our test env, QA shut down one of the leader nodes; the producer's request
> > timed out, so it closed that node's connection and then requested the
> > metadata. But sometimes the node chosen for the metadata request is also the
> > shut-down node. While connecting to the shut-down node to get the metadata,
> > the client is in the connecting phase: the network client marks the node's
> > state as CONNECTING, but if the node is down, the socket cannot detect that
> > the connection attempt is broken. Although selector.poll has a timeout
> > parameter, it will not close the connection, so the next time
> > "networkclient.maybeUpdate" runs it checks isAnyNodeConnecting and then will
> > not connect to any good node to get the metadata. It takes several minutes to
> > detect that the connection attempt has timed out and try another node.
> >
> >
> > So I want to add a connect.timeout parameter, so that the selector can detect
> > that the connection attempt has timed out and close the connection. The
> > timeout value currently passed in `selector.poll()` cannot do this.
> >
> >
> > Thanks,
> > David
> >
> >
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人: "Guozhang Wang";;
> > 发送时间: 2017年5月16日(星期二) 凌晨1:51
> > 收件人: "dev@kafka.apache.org";
> >
> > 主题: Re: [DISCUSS] KIP-148: Add a connect timeout for client
> >
> >
> >
> > Hi David,
> >
> > I may be a bit confused before, just clarifying a few things:
> >
> > 1. As you mentioned, a client will always try to first establish the
> > connection with a broker node before it tries to send any request to it.
> > And after connection is established, it will either continuously send
> many
> > requests (e.g. produce) for just a single request (e.g. metadata) to the
> > broker, so these two phases are indeed different.
> >
> > 2. In the connected phase, connections.max.idle.ms is used to
> > auto-disconnect the socket if no requests has been sent / received during
> > that period of time; in the connecting phase, we always try to create the
> > socket via "socketChannel.connect" in a non-blocking call, and then
> checks
> > if the connection has been established, but all the callers of this
> > function (in either producer or consumer) has a timeout parameter as in
> > `selector.poll()`, and the timeout parameter is set either by
> calculations
> > based on metadata.expiration.time and backoff for producer#sender, or by
> > directly passed values from consumer#poll(timeout), so although there is
> no
> > directly config controlling that, users can still control how much time
> in
> > maximum to wait for inside code.
> >
> > I originally thought your scenarios is more on the connected phase, but
> now
> > I feel you are talking about the connecting phase. For that case, I still
> > feel currently the timeout value passed in `selector.poll()` which is
> > controllable from user code should be sufficient?
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Sun, May 14, 2017 at 2:37 AM, 东方甲乙 <254479...@qq.com> wrote:
> >
> > > Hi Guozhang,
> > >
> > >
> > > Sorry for the delay, thanks for the 

[jira] [Assigned] (KAFKA-5305) Missing logging information in ReplicaFetcher

2017-05-22 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram reassigned KAFKA-5305:
-

Assignee: Ismael Juma

> Missing logging information in ReplicaFetcher
> -
>
> Key: KAFKA-5305
> URL: https://issues.apache.org/jira/browse/KAFKA-5305
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Andrey
>Assignee: Ismael Juma
> Fix For: 0.11.0.0, 0.11.1.0
>
>
> Currently we see a lot of messages like this:
> {code}[2017-05-22 12:13:18,707] WARN [ReplicaFetcherThread-0-1], Error in 
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@300065c5 
> (kafka.server.ReplicaFetcherThread)
> java.net.SocketTimeoutException: Failed to connect within 3 ms
>   at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:249)
>   at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
>   at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> {code}
> However it's not clear what host is not available. 
> Expected:
> * Failed to connect within 3 ms to host:port
> Note:
> * KAFKA-5303 might help with details
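
For illustration only, the kind of message the issue asks for could be built along these lines; the helper and its names are hypothetical, not the code from the actual fix:

{code}
import java.net.SocketTimeoutException;

// Hypothetical helper with invented names; the real change lives in the Scala ReplicaFetcherThread.
final class FetchErrors {
    static SocketTimeoutException connectTimeout(String host, int port, long timeoutMs) {
        // Include the broker address so the log line identifies which host was unreachable.
        return new SocketTimeoutException(
                String.format("Failed to connect within %d ms to %s:%d", timeoutMs, host, port));
    }
}
{code}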





[jira] [Resolved] (KAFKA-5305) Missing logging information in ReplicaFetcher

2017-05-22 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-5305.
---
   Resolution: Fixed
Fix Version/s: 0.11.1.0
   0.11.0.0

Issue resolved by pull request 3115
[https://github.com/apache/kafka/pull/3115]

> Missing logging information in ReplicaFetcher
> -
>
> Key: KAFKA-5305
> URL: https://issues.apache.org/jira/browse/KAFKA-5305
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Andrey
>Assignee: Ismael Juma
> Fix For: 0.11.0.0, 0.11.1.0
>
>
> Currently we see a lot of messages like this:
> {code}[2017-05-22 12:13:18,707] WARN [ReplicaFetcherThread-0-1], Error in 
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@300065c5 
> (kafka.server.ReplicaFetcherThread)
> java.net.SocketTimeoutException: Failed to connect within 3 ms
>   at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:249)
>   at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
>   at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> {code}
> However it's not clear what host is not available. 
> Expected:
> * Failed to connect within 3 ms to host:port
> Note:
> * KAFKA-5303 might help with details





[jira] [Assigned] (KAFKA-5303) FetchRequest doesn't implement toString

2017-05-22 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram reassigned KAFKA-5303:
-

Assignee: Ismael Juma

> FetchRequest doesn't implement toString
> ---
>
> Key: KAFKA-5303
> URL: https://issues.apache.org/jira/browse/KAFKA-5303
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Andrey
>Assignee: Ismael Juma
> Fix For: 0.11.0.0, 0.11.1.0
>
>
> Which causes meaningless log entries such as:
> {code}
> WARN [ReplicaFetcherThread-0-1], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@6189fab0 
> (kafka.server.ReplicaFetcherThread)
> {code}
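
As background on why the log line is meaningless, here is a small Java illustration of the missing toString; the real FetchRequest here is a Scala class inside ReplicaFetcherThread, so this is only an analogy, with invented field names:

{code}
// Illustration only: the real kafka.server.ReplicaFetcherThread$FetchRequest is Scala.
public class FetchRequestExample {
    static class FetchRequest {
        final int maxWait;
        final int minBytes;

        FetchRequest(int maxWait, int minBytes) {
            this.maxWait = maxWait;
            this.minBytes = minBytes;
        }

        // Without this override, logging the object prints ClassName@hexHashCode,
        // which is what shows up as "...$FetchRequest@6189fab0" in the broker log.
        @Override
        public String toString() {
            return "FetchRequest(maxWait=" + maxWait + ", minBytes=" + minBytes + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(new FetchRequest(500, 1)); // FetchRequest(maxWait=500, minBytes=1)
    }
}
{code}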





[jira] [Commented] (KAFKA-5303) FetchRequest doesn't implement toString

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019681#comment-16019681
 ] 

ASF GitHub Bot commented on KAFKA-5303:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3115


> FetchRequest doesn't implement toString
> ---
>
> Key: KAFKA-5303
> URL: https://issues.apache.org/jira/browse/KAFKA-5303
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Andrey
> Fix For: 0.11.0.0, 0.11.1.0
>
>
> Which causes meaningless log entries such as:
> {code}
> WARN [ReplicaFetcherThread-0-1], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@6189fab0 
> (kafka.server.ReplicaFetcherThread)
> {code}





[jira] [Resolved] (KAFKA-5303) FetchRequest doesn't implement toString

2017-05-22 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-5303.
---
   Resolution: Fixed
Fix Version/s: 0.11.1.0
   0.11.0.0

Issue resolved by pull request 3115
[https://github.com/apache/kafka/pull/3115]

> FetchRequest doesn't implement toString
> ---
>
> Key: KAFKA-5303
> URL: https://issues.apache.org/jira/browse/KAFKA-5303
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Andrey
> Fix For: 0.11.0.0, 0.11.1.0
>
>
> Which causes meaningless log entries such as:
> {code}
> WARN [ReplicaFetcherThread-0-1], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@6189fab0 
> (kafka.server.ReplicaFetcherThread)
> {code}





[GitHub] kafka pull request #3115: KAFKA-5303, KAFKA-5305: Improve logging when fetch...

2017-05-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3115




Re:Re: [DISCUSS] KIP-148: Add a connect timeout for client

2017-05-22 Thread 东方甲乙
Hi Rajini. 

When the Kafka node's machine is shut down or the network is lost, the connecting phase 
cannot use request.timeout.ms, because the client hasn't sent a request yet. And since 
NIO gets no response, the selector will not close the connection, so it will not choose 
another good node to get the metadata.

 
Thanks
David

-- 原始邮件 --
发件人: "Rajini Sivaram" ;
发送时间: 2017年5月22日(星期一) 20:17
收件人: "dev" ;
主题: Re: [DISCUSS] KIP-148: Add a connect timeout for client



Hi David,

Is there a reason why you wouldn't want to use request.timeout.ms as the
timeout parameter for connections? Then you would use the same timeout for
connected and connecting phases when shutdown is unclean. You could still
use the timeout to ensure that next metadata request is sent to another
node.

Regards,

Rajini

On Sun, May 21, 2017 at 9:51 AM, 东方甲乙 <254479...@qq.com> wrote:

> Hi Guozhang,
>
>
> Thanks for the clarification. Regarding clarification 2, I think the key
> thing is not that users can control how much time in maximum to wait inside
> the code, but that the network client can detect that the connection attempt
> cannot be finished and try a good node. In the producer sender, even though
> selector.poll can time out, the next iteration still does not close the
> previous connection attempt and try another good node.
>
>
> In our test env, QA shut down one of the leader nodes; the producer's request
> timed out, so it closed that node's connection and then requested the
> metadata. But sometimes the node chosen for the metadata request is also the
> shut-down node. While connecting to the shut-down node to get the metadata,
> the client is in the connecting phase: the network client marks the node's
> state as CONNECTING, but if the node is down, the socket cannot detect that
> the connection attempt is broken. Although selector.poll has a timeout
> parameter, it will not close the connection, so the next time
> "networkclient.maybeUpdate" runs it checks isAnyNodeConnecting and then will
> not connect to any good node to get the metadata. It takes several minutes to
> detect that the connection attempt has timed out and try another node.
>
>
> So I want to add a connect.timeout parameter, so that the selector can detect
> that the connection attempt has timed out and close the connection. The
> timeout value currently passed in `selector.poll()` cannot do this.
>
>
> Thanks,
> David
>
>
>
>
>
>
> -- 原始邮件 --
> 发件人: "Guozhang Wang";;
> 发送时间: 2017年5月16日(星期二) 凌晨1:51
> 收件人: "dev@kafka.apache.org";
>
> 主题: Re: [DISCUSS] KIP-148: Add a connect timeout for client
>
>
>
> Hi David,
>
> I may have been a bit confused before; just clarifying a few things:
>
> 1. As you mentioned, a client will always try to first establish the
> connection with a broker node before it tries to send any request to it.
> And after the connection is established, it will either continuously send many
> requests (e.g. produce) or just a single request (e.g. metadata) to the
> broker, so these two phases are indeed different.
>
> 2. In the connected phase, connections.max.idle.ms is used to
> auto-disconnect the socket if no requests have been sent / received during
> that period of time; in the connecting phase, we always try to create the
> socket via "socketChannel.connect" in a non-blocking call, and then check
> if the connection has been established. But all the callers of this
> function (in either producer or consumer) have a timeout parameter as in
> `selector.poll()`, and the timeout parameter is set either by calculations
> based on metadata.expiration.time and backoff for producer#sender, or by
> directly passed values from consumer#poll(timeout), so although there is no
> direct config controlling that, users can still control how much time in
> maximum to wait inside the code.
>
> I originally thought your scenario was more about the connected phase, but now
> I feel you are talking about the connecting phase. For that case, I still
> feel the timeout value currently passed in `selector.poll()`, which is
> controllable from user code, should be sufficient?
>
>
> Guozhang
>
>
>
>
> On Sun, May 14, 2017 at 2:37 AM, 东方甲乙 <254479...@qq.com> wrote:
>
> > Hi Guozhang,
> >
> >
> > Sorry for the delay, thanks for the question. They seem like two different
> > parameters to me:
> > connect.timeout.ms: only works for the connecting phase; after the connected
> > phase this parameter is not used.
> > connections.max.idle.ms: currently does not work in the connecting phase
> > (only when select returns readyKeys > 0 is the channel added to the expired
> > manager); after connecting, it checks whether the connection is still alive
> > after some time.
> >
> >
> > Even if we change connections.max.idle.ms to also cover the connecting
> > phase, we cannot set this parameter to a small value, such as
> > 5 seconds. Because the client may be busy sending messages to another node,
> > it would be disconnected in 5 seconds, so the default 

[jira] [Commented] (KAFKA-5298) MirrorMaker deadlocks with missing topics

2017-05-22 Thread Raymond Conn (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019596#comment-16019596
 ] 

Raymond Conn commented on KAFKA-5298:
-

Thanks, this definitely fixes part of it, since the producer can now recover if 
the topic gets created at the destination, and the logs will keep warning until 
the topic gets created. 

I don't think it fixes the deadlock at shutdown, however: if the topic never 
gets created, the mirrorMaker thread will never return from the producer send 
call (since {{max.block.ms}} is max long) and as a result can never count down 
the shutdown latch.

> MirrorMaker deadlocks with missing topics
> -
>
> Key: KAFKA-5298
> URL: https://issues.apache.org/jira/browse/KAFKA-5298
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, tools
>Affects Versions: 0.10.2.1
>Reporter: Raymond Conn
>
> When mirrorMaker mirrors a topic to destination brokers that have topic auto 
> create disabled and a topic doesn't exist on the destination brokers, the 
> producer in mirror maker logs the following 
> {code}
> Error while fetching metadata with correlation id 467 : 
> \{mirror-test2=UNKNOWN_TOPIC_OR_PARTITION\}
> Error while fetching metadata with correlation id 468 : 
> {mirror-test2=UNKNOWN_TOPIC_OR_PARTITION}
> {code}
> This log message is fine and expected. The problem is the log message stops 
> ~5 min later. At which point the logs look fine, but mirror maker is not 
> mirroring any of its topics. 
> What's worse is mirrorMaker is basically in an unrecoverable state once this 
> happens (the log statement stops). If you create the topic at the destination 
> mirrorMaker still won't mirror data until a restart. Attempts to restart 
> mirrorMaker (cleanly) fail because the process is more or less deadlocked in 
> its shutdown hook.
> Here is the reasoning:
> * MirrorMaker becomes unrecoverable after 5 minutes because of this loop in 
> the 
> [producer|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L543-L561]
> * The producer will keep waiting for metadata for the missing topic or until 
> the max timeout is reached. (max long in this case) 
> * after 5 minutes the producer stops making a metadata request for the topic 
> because that topic expires 
> [here|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L218]
>  
> * topic is never re-added for metadata requests since the only add is before 
> entering the loop 
> [here|(https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L528]
> So basically after 5 minutes all metadata requests moving forward are for no 
> topics since the topic expired. The mirrorMaker thread essentially gets stuck 
> waiting forever since there will never be a metadata request for the topic 
> the thread is waiting on
> All of this basically leads to a deadlock state in the shutdown hook. 
> * shutdown hook sends a shutdown to the mirrorMaker threads 
> * waits for threads to exit their loop by waitind on a 
> [latch|https://github.com/apache/kafka/blob/0.10.2/core/src/main/scala/kafka/tools/MirrorMaker.scala#L396]
> * latch is never counted down in 
> [produce|https://github.com/apache/kafka/blob/0.10.2/core/src/main/scala/kafka/tools/MirrorMaker.scala#L434]
>  
> * thread will never exit the loop to countdown the latch on line 462.
> This can be seen with a thread dump of the shutdown hook thread
> {code}
> Name: MirrorMakerShutdownHook
> State: WAITING on java.util.concurrent.CountDownLatch$Sync@3ffebeac
> Total blocked: 0  Total waited: 1
> Stack trace: 
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> kafka.tools.MirrorMaker$MirrorMakerThread.awaitShutdown(MirrorMaker.scala:498)
> kafka.tools.MirrorMaker$$anonfun$cleanShutdown$4.apply(MirrorMaker.scala:396)
> kafka.tools.MirrorMaker$$anonfun$cleanShutdown$4.apply(MirrorMaker.scala:396)
> scala.collection.Iterator$class.foreach(Iterator.scala:893)
> {code}
> The root of the issue more or less related to the issue documented here where 
> the producer can block waiting for metadata. 
> https://issues.apache.org/jira/browse/KAFKA-3450
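
A possible mitigation consistent with this analysis, until the metadata-expiry deadlock itself is fixed, is to give the MirrorMaker producer a finite max.block.ms so that send() eventually fails instead of blocking forever on missing topic metadata; whether the file passed via --producer.config can override MirrorMaker's internal default in your version is worth verifying. A Java sketch with example values only:

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

// Example values only; with MirrorMaker these keys would go in the properties file
// passed via --producer.config, assuming that version honors an override of max.block.ms.
public class BoundedBlockProducer {
    public static KafkaProducer<byte[], byte[]> create(String bootstrap) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000"); // fail sends after 60s instead of blocking forever
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<>(props);
    }
}
{code}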




[jira] [Comment Edited] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2017-05-22 Thread dhiraj prajapati (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019536#comment-16019536
 ] 

dhiraj prajapati edited comment on KAFKA-4477 at 5/22/17 1:45 PM:
--

Hi all,
We have a 3-node cluster on our production environment. We recently upgraded 
kafka from 0.9.0.1 to 0.10.1.0 and we are seeing a similar issue of 
intermittent disconnection. We never had this issue in 0.9.0.1. Below is the 
exception from broker's server.log:

Below is the exception stack trace:
[2017-05-15 09:33:55,398] WARN [ReplicaFetcherThread-0-2], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@7213d6d (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 2 was disconnected before the response was read
at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
at scala.Option.foreach(Option.scala:257)
at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:249)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)


Is this issue fixed in later versions? I am asking this because I saw a similar 
thread for version 10.2:
https://issues.apache.org/jira/browse/KAFKA-5153

Please assist.


was (Author: dhirajpraj):
Hi all,
We have a 3-node cluster on our production environment. We recently upgraded 
kafka from 0.9.0.1 to 0.10.1.0 and we are seeing a similar issue of 
intermittent disconnection. We never had this issue in 0.9.0.1. 

Is this issue fixed in later versions? I am asking this because I saw a similar 
thread for version 10.2:
https://issues.apache.org/jira/browse/KAFKA-5153

Please assist.

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce
>Assignee: Apurva Mehta
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.1.1
>
> Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, 
> 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001_ext.log, 
> issue_node_1001.log, issue_node_1002_ext.log, issue_node_1002.log, 
> issue_node_1003_ext.log, issue_node_1003.log, kafka.jstack, 
> server_1_72server.log, server_2_73_server.log, server_3_74Server.log, 
> state_change_controller.tar.gz
>
>
> We have encountered a critical issue that has re-occurred in different 
> physical environments. We haven't worked out what is going on. We do, though, 
> have a nasty workaround to keep the service alive. 
> We have not had this issue on clusters still running 0.9.0.1.
> We have noticed a node randomly shrinking the ISRs for the partitions it owns 
> down to itself; moments later we see other nodes having disconnects, 
> followed finally by app issues, where producing to these partitions is 
> blocked.
> It seems only restarting the Kafka instance's Java process resolves the 
> issues.
> We have had this occur multiple times and from all network and machine 
> monitoring the machine never left the network, or had any other glitches.
> Below are seen logs from the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: 

Build failed in Jenkins: kafka-0.11.0-jdk7 #11

2017-05-22 Thread Apache Jenkins Server
See 


Changes:

[rajinisivaram] KAFKA-5289: handleStopReplica should not send a second response

--
[...truncated 884.14 KB...]
kafka.admin.TopicCommandTest > testCreateIfNotExists PASSED

kafka.admin.TopicCommandTest > testCreateAlterTopicWithRackAware STARTED

kafka.admin.TopicCommandTest > testCreateAlterTopicWithRackAware PASSED

kafka.admin.TopicCommandTest > testTopicDeletion STARTED

kafka.admin.TopicCommandTest > testTopicDeletion PASSED

kafka.admin.TopicCommandTest > testConfigPreservationAcrossPartitionAlteration 
STARTED

kafka.admin.TopicCommandTest > testConfigPreservationAcrossPartitionAlteration 
PASSED

kafka.admin.TopicCommandTest > testAlterIfExists STARTED

kafka.admin.TopicCommandTest > testAlterIfExists PASSED

kafka.admin.TopicCommandTest > testDeleteIfExists STARTED

kafka.admin.TopicCommandTest > testDeleteIfExists PASSED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentWithOfflineReplicaHaltingProgress STARTED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentWithOfflineReplicaHaltingProgress PASSED

kafka.controller.ControllerIntegrationTest > 
testControllerEpochPersistsWhenAllBrokersDown STARTED

kafka.controller.ControllerIntegrationTest > 
testControllerEpochPersistsWhenAllBrokersDown PASSED

kafka.controller.ControllerIntegrationTest > 
testTopicCreationWithOfflineReplica STARTED

kafka.controller.ControllerIntegrationTest > 
testTopicCreationWithOfflineReplica PASSED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentResumesAfterReplicaComesOnline STARTED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentResumesAfterReplicaComesOnline PASSED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled STARTED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled PASSED

kafka.controller.ControllerIntegrationTest > 
testTopicPartitionExpansionWithOfflineReplica STARTED

kafka.controller.ControllerIntegrationTest > 
testTopicPartitionExpansionWithOfflineReplica PASSED

kafka.controller.ControllerIntegrationTest > 
testPreferredReplicaLeaderElectionWithOfflinePreferredReplica STARTED

kafka.controller.ControllerIntegrationTest > 
testPreferredReplicaLeaderElectionWithOfflinePreferredReplica PASSED

kafka.controller.ControllerIntegrationTest > 
testAutoPreferredReplicaLeaderElection STARTED

kafka.controller.ControllerIntegrationTest > 
testAutoPreferredReplicaLeaderElection PASSED

kafka.controller.ControllerIntegrationTest > testTopicCreation STARTED

kafka.controller.ControllerIntegrationTest > testTopicCreation PASSED

kafka.controller.ControllerIntegrationTest > testPartitionReassignment STARTED

kafka.controller.ControllerIntegrationTest > testPartitionReassignment PASSED

kafka.controller.ControllerIntegrationTest > testTopicPartitionExpansion STARTED

kafka.controller.ControllerIntegrationTest > testTopicPartitionExpansion PASSED

kafka.controller.ControllerIntegrationTest > 
testControllerMoveIncrementsControllerEpoch STARTED

kafka.controller.ControllerIntegrationTest > 
testControllerMoveIncrementsControllerEpoch PASSED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled STARTED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled PASSED

kafka.controller.ControllerIntegrationTest > testEmptyCluster STARTED

kafka.controller.ControllerIntegrationTest > testEmptyCluster PASSED

kafka.controller.ControllerIntegrationTest > testPreferredReplicaLeaderElection 
STARTED

kafka.controller.ControllerIntegrationTest > testPreferredReplicaLeaderElection 
PASSED

kafka.controller.ControllerFailoverTest > testMetadataUpdate STARTED

kafka.controller.ControllerFailoverTest > testMetadataUpdate SKIPPED

kafka.tools.ConsoleProducerTest > testParseKeyProp STARTED

kafka.tools.ConsoleProducerTest > testParseKeyProp PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer PASSED

kafka.tools.ConsoleProducerTest > testInvalidConfigs STARTED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED

kafka.tools.ConsoleConsumerTest 

[jira] [Created] (KAFKA-5306) Official init.d scripts

2017-05-22 Thread Shahar (JIRA)
Shahar created KAFKA-5306:
-

 Summary: Official init.d scripts
 Key: KAFKA-5306
 URL: https://issues.apache.org/jira/browse/KAFKA-5306
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.10.2.1
 Environment: Ubuntu 14.04
Reporter: Shahar
Priority: Minor


It would be great to have an officially supported init.d script for starting 
and stopping Kafka as a service.





[jira] [Commented] (KAFKA-5303) FetchRequest doesn't implement toString

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019581#comment-16019581
 ] 

ASF GitHub Bot commented on KAFKA-5303:
---

GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/3115

KAFKA-5303, KAFKA-5305: Improve logging when fetches fail in 
ReplicaFetcherThread



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka 
kafka-5305-missing-log-info-replica-fetcher

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3115.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3115


commit e6b61de4c895f01c19ddac836367c483960e1ed7
Author: Ismael Juma 
Date:   2017-05-22T13:30:52Z

KAFKA-5305: Improve logging when fetches fail in ReplicaFetcherThread




> FetchRequest doesn't implement toString
> ---
>
> Key: KAFKA-5303
> URL: https://issues.apache.org/jira/browse/KAFKA-5303
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Andrey
>
> Which causes meaningless log entries such as:
> {code}
> WARN [ReplicaFetcherThread-0-1], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@6189fab0 
> (kafka.server.ReplicaFetcherThread)
> {code}





[GitHub] kafka pull request #3115: KAFKA-5303, KAFKA-5305: Improve logging when fetch...

2017-05-22 Thread ijuma
GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/3115

KAFKA-5303, KAFKA-5305: Improve logging when fetches fail in 
ReplicaFetcherThread



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka 
kafka-5305-missing-log-info-replica-fetcher

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3115.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3115


commit e6b61de4c895f01c19ddac836367c483960e1ed7
Author: Ismael Juma 
Date:   2017-05-22T13:30:52Z

KAFKA-5305: Improve logging when fetches fail in ReplicaFetcherThread






[jira] [Assigned] (KAFKA-5263) kakfa-clients consume 100% CPU with manual partition assignment when network connection is lost

2017-05-22 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram reassigned KAFKA-5263:
-

Assignee: Rajini Sivaram

> kakfa-clients consume 100% CPU with manual partition assignment when network 
> connection is lost
> ---
>
> Key: KAFKA-5263
> URL: https://issues.apache.org/jira/browse/KAFKA-5263
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0, 0.10.2.1
>Reporter: Konstantin Smirnov
>Assignee: Rajini Sivaram
> Attachments: cpu_consuming.log, cpu_consuming_profile.csv
>
>
> Noticed that loss of the connection to a Kafka broker leads kafka-clients to 
> consume 100% CPU. The bug only appears when manual partition assignment is 
> used. It has appeared since version 0.10.1.0. The bug is quite similar to 
> KAFKA-1642.
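
For context, "manual partition assignment" means using assign() rather than subscribe(). A minimal Java sketch of such a consumer is below; the broker address and topic name are placeholders, and it only illustrates the setup under which the busy loop was reported, not a fix:

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Placeholder broker/topic names; illustrates manual assignment, not the bug fix.
public class ManualAssignConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment: no group coordination, the client polls the assigned partition directly.
            consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
            while (true) {
                // With the reported bug, this loop can spin at 100% CPU once the broker connection is lost.
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100L);
                for (ConsumerRecord<byte[], byte[]> record : records)
                    System.out.println(record.offset());
            }
        }
    }
}
{code}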





[jira] [Commented] (KAFKA-5153) KAFKA Cluster : 0.10.2.0 : Servers Getting disconnected : Service Impacting

2017-05-22 Thread dhiraj prajapati (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019542#comment-16019542
 ] 

dhiraj prajapati commented on KAFKA-5153:
-

Hi all,
We have a 3-node cluster on our production environment. We recently upgraded 
kafka from 0.9.0.1 to 0.10.1.0 and we are seeing a similar issue of 
intermittent disconnection. We never had this issue in 0.9.0.1.

Below is the exception stack trace:

[2017-05-15 09:33:55,398] WARN [ReplicaFetcherThread-0-2], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@7213d6d (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 2 was disconnected before the response was read
at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
at scala.Option.foreach(Option.scala:257)
at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

Is there a fix for this issue in any of the kafka 10 versions?

> KAFKA Cluster : 0.10.2.0 : Servers Getting disconnected : Service Impacting
> ---
>
> Key: KAFKA-5153
> URL: https://issues.apache.org/jira/browse/KAFKA-5153
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
> Environment: RHEL 6
> Java Version  1.8.0_91-b14
>Reporter: Arpan
>Priority: Critical
> Attachments: server_1_72server.log, server_2_73_server.log, 
> server_3_74Server.log, server.properties, ThreadDump_1493564142.dump, 
> ThreadDump_1493564177.dump, ThreadDump_1493564249.dump
>
>
> Hi Team, 
> I was earlier referring to issue KAFKA-4477 because the problem I am facing 
> is similar. I tried to search for the same reference in the release docs as well 
> but did not find anything in 0.10.1.1 or 0.10.2.0. I am currently using 
> 2.11_0.10.2.0.
> I have a 3-node cluster for Kafka and a ZooKeeper cluster as well, on the same 
> set of servers, in cluster mode. We have around 240 GB of data going through 
> Kafka every day. What we are observing is a server disconnecting from the 
> cluster and the ISR shrinking, and it starts impacting service.
> I have also observed the file descriptor count increasing a bit: in normal 
> circumstances we have not observed an FD count of more than 500, but when the 
> issue started we were observing it in the range of 650-700 on all 3 servers. 
> Attaching thread dumps of all 3 servers from when we started facing the issue 
> recently.
> The issue vanishes once you bounce the nodes, and the setup does not run more 
> than 5 days without this issue recurring. Attaching server logs as well.
> Kindly let me know if you need any additional information. Attaching 
> server.properties as well for one of the servers (it's similar on all 3 
> servers).





[jira] [Comment Edited] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2017-05-22 Thread dhiraj prajapati (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019536#comment-16019536
 ] 

dhiraj prajapati edited comment on KAFKA-4477 at 5/22/17 12:49 PM:
---

Hi all,
We have a 3-node cluster on our production environment. We recently upgraded 
kafka from 0.9.0.1 to 0.10.1.0 and we are seeing a similar issue of 
intermittent disconnection. We never had this issue in 0.9.0.1. 

Is this issue fixed in later versions? I am asking this because I saw a similar 
thread for version 10.2:
https://issues.apache.org/jira/browse/KAFKA-5153

Please assist.


was (Author: dhirajpraj):
Hi all,
We have a 3-node cluster on our production environment. We recenctly upgraded 
kafka from 0.9.0.1 to 0.10.1.0 and we are seeing a similar issue of 
intermittent disconnection. We never had this issue in 0.9.0.1. 

Is this issue fixed in later versions? I am asking this because I saw a similar 
thread for version 10.2:
https://issues.apache.org/jira/browse/KAFKA-5153

Please assist.

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce
>Assignee: Apurva Mehta
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.1.1
>
> Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, 
> 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001_ext.log, 
> issue_node_1001.log, issue_node_1002_ext.log, issue_node_1002.log, 
> issue_node_1003_ext.log, issue_node_1003.log, kafka.jstack, 
> server_1_72server.log, server_2_73_server.log, server_3_74Server.log, 
> state_change_controller.tar.gz
>
>
> We have encountered a critical issue that has re-occurred in different 
> physical environments. We haven't worked out what is going on. We do, though, 
> have a nasty workaround to keep the service alive. 
> We have not had this issue on clusters still running 0.9.0.1.
> We have noticed a node randomly shrinking the ISRs for the partitions it owns 
> down to itself; moments later we see other nodes having disconnects, 
> followed finally by app issues, where producing to these partitions is 
> blocked.
> It seems only restarting the Kafka instance's Java process resolves the 
> issues.
> We have had this occur multiple times and from all network and machine 
> monitoring the machine never left the network, or had any other glitches.
> Below are seen logs from the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see on the sick machine an increasing 
> amount of close_waits and file descriptors.
> As a work around to keep service we are currently putting in an automated 
> process that tails and regex's for: and where new_partitions hit just itself 
> we restart the node. 
> "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P.+) to (?P.+) 
> \(kafka.cluster.Partition\)"





[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2017-05-22 Thread dhiraj prajapati (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019536#comment-16019536
 ] 

dhiraj prajapati commented on KAFKA-4477:
-

Hi all,
We have a 3-node cluster on our production environment. We recenctly upgraded 
kafka from 0.9.0.1 to 0.10.1.0 and we are seeing a similar issue of 
intermittent disconnection. We never had this issue in 0.9.0.1. 

Is this issue fixed in later versions? I am asking this because I saw a similar 
thread for version 10.2:
https://issues.apache.org/jira/browse/KAFKA-5153

Please assist.

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce
>Assignee: Apurva Mehta
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.1.1
>
> Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, 
> 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001_ext.log, 
> issue_node_1001.log, issue_node_1002_ext.log, issue_node_1002.log, 
> issue_node_1003_ext.log, issue_node_1003.log, kafka.jstack, 
> server_1_72server.log, server_2_73_server.log, server_3_74Server.log, 
> state_change_controller.tar.gz
>
>
> We have encountered a critical issue that has re-occurred in different 
> physical environments. We haven't worked out what is going on. We do, though, 
> have a nasty workaround to keep the service alive. 
> We have not had this issue on clusters still running 0.9.0.1.
> We have noticed a node randomly shrinking the ISRs for the partitions it owns 
> down to itself; moments later we see other nodes having disconnects, 
> followed finally by app issues, where producing to these partitions is 
> blocked.
> It seems only restarting the Kafka instance's Java process resolves the 
> issues.
> We have had this occur multiple times and from all network and machine 
> monitoring the machine never left the network, or had any other glitches.
> Below are seen logs from the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see on the sick machine an increasing 
> amount of close_waits and file descriptors.
> As a work around to keep service we are currently putting in an automated 
> process that tails and regex's for: and where new_partitions hit just itself 
> we restart the node. 
> "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P.+) to (?P.+) 
> \(kafka.cluster.Partition\)"





[jira] [Commented] (KAFKA-5289) One StopReplicaRequest will caused two Responses

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019532#comment-16019532
 ] 

ASF GitHub Bot commented on KAFKA-5289:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3096


> One StopReplicaRequest will caused two Responses
> 
>
> Key: KAFKA-5289
> URL: https://issues.apache.org/jira/browse/KAFKA-5289
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: xuzq
>Assignee: Ismael Juma
>Priority: Critical
> Fix For: 0.11.0.0, 0.11.1.0
>
> Attachments: handleStopReplicaRequest.png, KAFKA-5289.patch
>
>
> After discussing this with my friend markTC, we found a bug:
> one StopReplicaRequest will cause two Responses.
> See core/src/main/scala/kafka/server/KafkaApi.class, lines 175 and 176.
> When an exception is raised at 
> 'replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()', 
> two responses will also be returned:
> one at line 175, 'requestChannel.sendResponse(new 
> RequestChannel.Response(request, new ResponseSend(request.connectionId, 
> responseHeader, response)))', and another at line 111, 
> 'requestChannel.sendResponse(new Response(request, new 
> ResponseSend(request.connectionId, respHeader, response)))'.
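
To make the reported failure mode concrete, here is a generic Java sketch of the double-response pattern described above. It is not the actual KafkaApis code, just an illustration of how an exception thrown after the first response can trigger a second response for the same request in a catch-all handler:

{code}
// Generic illustration of the double-response pattern described in this issue;
// not the actual KafkaApis.scala code.
public class DoubleResponseExample {
    interface Channel { void sendResponse(String requestId, String body); }

    static void handleStopReplica(Channel channel, String requestId) {
        try {
            channel.sendResponse(requestId, "stop-replica-response"); // first response (line ~175 in the report)
            shutdownIdleFetcherThreads();                             // if this throws...
        } catch (RuntimeException e) {
            // ...the generic error handler sends a second response for the same request (line ~111 in the report).
            channel.sendResponse(requestId, "error-response");
        }
    }

    static void shutdownIdleFetcherThreads() {
        throw new RuntimeException("simulated failure after the response was already sent");
    }

    public static void main(String[] args) {
        handleStopReplica((id, body) -> System.out.println(id + " -> " + body), "req-1");
        // Prints two responses for the single request "req-1".
    }
}
{code}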





[GitHub] kafka pull request #3096: KAFKA-5289: handleStopReplica should not send a se...

2017-05-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3096



