[jira] [Commented] (KAFKA-5678) When the broker graceful shutdown occurs, the producer side sends timeout.

2017-08-02 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111891#comment-16111891
 ] 

Jiangjie Qin commented on KAFKA-5678:
-

[~cuiyang] There might be a few things to consider here. The leader broker will 
return the response if the follower drops out of the ISR; by default, that can 
take up to {{replica.lag.time.max.ms}}, so typically the request timeout should 
not be less than that. Also, if you configure retries on the producer side to 
be non-zero, the producer will retry on request timeout, so users should not 
see this exception.

That said, a slow response can also happen when a follower broker is down, 
because it takes some time for the leader to kick the follower out of the ISR 
so it can make progress. But if it is only a leader migration without a broker 
going down, I think the issue is unlikely to happen.
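
To make the above concrete, here is a minimal producer-configuration sketch 
(broker address, timeout and retry values are illustrative assumptions, not 
recommendations):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // acks=-1 (all): wait for the full ISR, as in the report below.
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        // Keep the request timeout at or above the broker's replica.lag.time.max.ms,
        // so a follower falling out of the ISR does not expire the request first.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        // Non-zero retries so a timed-out request is retried instead of
        // surfacing a timeout to the caller.
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.close();
    }
}
{code}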

> When the broker graceful shutdown occurs, the producer side sends timeout.
> --
>
> Key: KAFKA-5678
> URL: https://issues.apache.org/jira/browse/KAFKA-5678
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0
>Reporter: tuyang
>
> Test environment is as follows.
> 1. Kafka version: 0.9.0.1
> 2. Cluster with 3 brokers, with broker ids A, B, C.
> 3. Topic with 6 partitions and 2 replicas, with 2 leader partitions on each 
> broker.
> We can reproduce the problem as follows.
> 1. We send messages as quickly as possible with acks=-1.
> 2. Suppose partition p0's leader is on broker A and we gracefully shut down 
> broker A, but we send a message to p0 before the leader is re-elected. The 
> message can be appended to the leader replica successfully, but if the 
> follower replica does not catch up quickly enough, the shutting-down broker 
> will create a DelayedProduce for this request and wait for it to complete 
> until request.timeout.ms.
> 3. Because of the controlled-shutdown request from broker A, the p0 partition 
> leader will be re-elected and the replica on broker A will become a follower 
> before the shutdown completes. The DelayedProduce will then not be triggered 
> to complete until it expires.
> 4. If broker A takes too long to shut down, the producer only gets a response 
> after request.timeout.ms, which increases producer send latency when we 
> restart brokers one by one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5560) LogManager should be able to create new logs based on free disk space

2017-08-02 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx updated KAFKA-5560:
--
Labels: needs-kip  (was: kips)

> LogManager should be able to create new logs based on free disk space
> -
>
> Key: KAFKA-5560
> URL: https://issues.apache.org/jira/browse/KAFKA-5560
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>  Labels: needs-kip
>
> Currently, the log manager chooses a directory configured in `log.dirs` by 
> counting the partitions in each directory and then choosing the one with the 
> fewest partitions. But in some real production scenarios where the data 
> volumes of partitions are not even, some disks become nearly full while 
> others have a lot of free space, which leads to a poor data distribution.
> We should offer users a new strategy that has the log manager honor the 
> actual free disk space and choose the directory with the most free space. 
> Maybe a new broker configuration parameter is needed, `log.directory.strategy` 
> for instance. A new KIP has been created to track this issue: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-178%3A+Size-based+log+directory+selection+strategy
> Does it make sense?
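
A minimal sketch of the proposed size-based selection (directory paths are 
placeholders; the real change would live in LogManager and likely weigh other 
factors too):

{code:java}
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class LogDirSelector {
    // Pick the log.dirs entry with the most usable space, as proposed above.
    public static File mostFreeSpace(List<File> logDirs) {
        return logDirs.stream()
            .max(Comparator.comparingLong(File::getUsableSpace))
            .orElseThrow(() -> new IllegalArgumentException("log.dirs is empty"));
    }

    public static void main(String[] args) {
        List<File> dirs = Arrays.asList(new File("/data1/kafka-logs"),
                new File("/data2/kafka-logs"));
        System.out.println("Chosen dir: " + mostFreeSpace(dirs));
    }
}
{code}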



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5671) Add StreamsBuilder and deprecate KStreamBuilder

2017-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111821#comment-16111821
 ] 

ASF GitHub Bot commented on KAFKA-5671:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3603


> Add StreamsBuilder and deprecate KStreamBuilder
> ---
>
> Key: KAFKA-5671
> URL: https://issues.apache.org/jira/browse/KAFKA-5671
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5006) KeyValueStore.put may throw exception unrelated to the current put attempt

2017-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111764#comment-16111764
 ] 

ASF GitHub Bot commented on KAFKA-5006:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3534


> KeyValueStore.put may throw exception unrelated to the current put attempt
> --
>
> Key: KAFKA-5006
> URL: https://issues.apache.org/jira/browse/KAFKA-5006
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0, 0.10.1.0, 0.10.2.0
>Reporter: Xavier Léauté
>Assignee: Guozhang Wang
>Priority: Blocker
>  Labels: user-experience
> Fix For: 1.0.0
>
>
> It is possible for {{KeyValueStore.put(K key, V value)}} to throw an 
> exception unrelated to the store in question. Due to [the way that 
> {{RecordCollector.send}} is currently 
> implemented|https://github.com/confluentinc/kafka/blob/3.2.x/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L76]
> the exception thrown would be for any previous record produced by the stream 
> task, possibly for a completely unrelated topic the same task is producing to.
> This can be very confusing for someone attempting to correctly handle 
> exceptions thrown by put(), as they would not be able to add any additional 
> debugging information to understand the operation that caused the problem. 
> Worse, such logging would likely confuse the user, since they might mislead 
> themselves into thinking the changelog record created by calling put() caused 
> the problem.
> Given that there is likely no way for the user to recover from an exception 
> thrown by an unrelated produce request, it is questionable whether we should 
> even try to raise the exception at this level. A short-term fix would be to 
> simply delegate this exception to the uncaught exception handler.
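
To illustrate the confusion (a hypothetical processor, not Kafka code): the 
exception caught around {{put()}} may actually stem from an earlier, unrelated 
produce request, so logging or wrapping it with the current key/value can 
mislead.

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical processor illustrating the problem described above.
public class UpsertProcessor extends AbstractProcessor<String, String> {
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, String>) context.getStateStore("my-store");
    }

    @Override
    public void process(String key, String value) {
        try {
            store.put(key, value);
        } catch (RuntimeException e) {
            // Misleading: 'e' may come from an earlier record produced by this task,
            // possibly to a completely different topic, not from this put() call.
            throw new RuntimeException("put failed for key " + key, e);
        }
    }
}
{code}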



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5663) LogDirFailureTest system test fails

2017-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111885#comment-16111885
 ] 

ASF GitHub Bot commented on KAFKA-5663:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3594


> LogDirFailureTest system test fails
> ---
>
> Key: KAFKA-5663
> URL: https://issues.apache.org/jira/browse/KAFKA-5663
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Dong Lin
>
> The recently added JBOD system test failed last night.
> {noformat}
> Producer failed to produce messages for 20s.
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/core/log_dir_failure_test.py",
>  line 166, in test_replication_with_disk_failure
> self.start_producer_and_consumer()
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 75, in start_producer_and_consumer
> self.producer_start_timeout_sec)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Producer failed to produce messages for 20s.
> {noformat}
> Complete logs here:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-07-26--001.1501074756--apache--trunk--91c207c/LogDirFailureTest/test_replication_with_disk_failure/bounce_broker=False.security_protocol=PLAINTEXT.broker_type=follower/48.tgz



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-02 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111779#comment-16111779
 ] 

Guozhang Wang commented on KAFKA-5684:
--

1. Regarding using `peek` to implement `print`: yes, we have been trying to do 
so in https://issues.apache.org/jira/browse/KAFKA-4772, but we have not yet 
collapsed {{KStreamPeek}} and {{KStreamPrint}} into the same class, mainly 
because we were treating serdes unnecessarily specially.

2. As Paolo mentioned, since we know at construction time whether the mapper is 
provided or not, we can just wrap the serdes into the default mapper ONCE in 
the {{init}} call with a single condition check; by doing this we can get rid 
of the per-call {{instanceof}} condition check.
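
A minimal sketch of point 2, assuming the print variant where serdes are 
supplied and keys/values arrive as raw bytes (class and method names here are 
illustrative, not the actual Streams code):

{code:java}
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.kstream.KeyValueMapper;

public class PrintMappers {
    // Decided once (e.g. in init()): either the user-supplied mapper, or a default
    // mapper that wraps the serdes. process() then has no per-record instanceof check.
    public static <K, V> KeyValueMapper<K, V, String> chooseMapper(
            KeyValueMapper<K, V, String> userMapper,
            Deserializer<K> keyDeserializer,
            Deserializer<V> valueDeserializer,
            String topic) {
        if (userMapper != null) {
            return userMapper;
        }
        return (key, value) -> {
            // With serdes supplied, keys and values are expected to be raw bytes.
            K k = keyDeserializer.deserialize(topic, (byte[]) key);
            V v = valueDeserializer.deserialize(topic, (byte[]) value);
            return k + ", " + v;
        };
    }
}
{code}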

> KStreamPrintProcessor as customized KStreamPeekProcessor
> 
>
> Key: KAFKA-5684
> URL: https://issues.apache.org/jira/browse/KAFKA-5684
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Minor
>
> Hi,
> the {{KStreamPrintProcessor}} is implemented from scratch (from 
> {{AbstractProcessor}}), and the same goes for the related supplier.
> It looks to me like it's just a special {{KStreamPeekProcessor}} with 
> forwardDownStream set to false, plus the possibility to specify the Serdes 
> instances to use when keys/values are bytes.
> At the same time, used by the {{print()}} method, it provides a fast way to 
> print data flowing through the pipeline (whereas with just {{peek()}} you 
> need to write the code yourself).
> I think it could be useful to refactor {{KStreamPrintProcessor}} so that it 
> derives from {{KStreamPeekProcessor}} and customizes its behavior.
> Thanks,
> Paolo.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5663) LogDirFailureTest system test fails

2017-08-02 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-5663.

   Resolution: Fixed
Fix Version/s: 1.0.0

> LogDirFailureTest system test fails
> ---
>
> Key: KAFKA-5663
> URL: https://issues.apache.org/jira/browse/KAFKA-5663
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Dong Lin
> Fix For: 1.0.0
>
>
> The recently added JBOD system test failed last night.
> {noformat}
> Producer failed to produce messages for 20s.
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/core/log_dir_failure_test.py",
>  line 166, in test_replication_with_disk_failure
> self.start_producer_and_consumer()
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 75, in start_producer_and_consumer
> self.producer_start_timeout_sec)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Producer failed to produce messages for 20s.
> {noformat}
> Complete logs here:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-07-26--001.1501074756--apache--trunk--91c207c/LogDirFailureTest/test_replication_with_disk_failure/bounce_broker=False.security_protocol=PLAINTEXT.broker_type=follower/48.tgz



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-2244) Document Kafka metrics configuration properties

2017-08-02 Thread Pranav Maniar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110426#comment-16110426
 ] 

Pranav Maniar commented on KAFKA-2244:
--

[~granthenke], [~sslavic] can I take this, then?

> Document Kafka metrics configuration properties
> ---
>
> Key: KAFKA-2244
> URL: https://issues.apache.org/jira/browse/KAFKA-2244
> Project: Kafka
>  Issue Type: Task
>  Components: config, website
>Affects Versions: 0.8.2.1
>Reporter: Stevo Slavic
>Assignee: Grant Henke
>  Labels: newbie
>
> Please have two configuration properties used in 
> kafka.metrics.KafkaMetricsConfig, namely "kafka.metrics.reporters" and 
> "kafka.metrics.polling.interval.secs", documented on 
> http://kafka.apache.org/documentation.html#configuration
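
For reference, a hedged example of what the documented entries could look like 
in a broker {{server.properties}} (the reporter class name is a placeholder):

{noformat}
# Comma-separated list of classes implementing kafka.metrics.KafkaMetricsReporter
kafka.metrics.reporters=com.example.metrics.MyMetricsReporter
# Polling interval for the metrics reporters, in seconds
kafka.metrics.polling.interval.secs=10
{noformat}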



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-02 Thread james chien (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110481#comment-16110481
 ] 

james chien commented on KAFKA-5684:


[~guozhang] I agree with your point; we only need to check that K/V are byte[] 
when the mapper is not provided (which means we do not need to execute 
{{KStreamPrint#maybeDeserialize}} otherwise).

> KStreamPrintProcessor as customized KStreamPeekProcessor
> 
>
> Key: KAFKA-5684
> URL: https://issues.apache.org/jira/browse/KAFKA-5684
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Minor
>
> Hi,
> the {{KStreamPrintProcessor}} is implemented from scratch (from 
> {{AbstractProcessor}}), and the same goes for the related supplier.
> It looks to me like it's just a special {{KStreamPeekProcessor}} with 
> forwardDownStream set to false, plus the possibility to specify the Serdes 
> instances to use when keys/values are bytes.
> At the same time, used by the {{print()}} method, it provides a fast way to 
> print data flowing through the pipeline (whereas with just {{peek()}} you 
> need to write the code yourself).
> I think it could be useful to refactor {{KStreamPrintProcessor}} so that it 
> derives from {{KStreamPeekProcessor}} and customizes its behavior.
> Thanks,
> Paolo.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5694) Add ChangeReplicaDirRequest and DescribeReplicaDirRequest (KIP-113)

2017-08-02 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-5694:
---

 Summary: Add ChangeReplicaDirRequest and DescribeReplicaDirRequest 
(KIP-113)
 Key: KAFKA-5694
 URL: https://issues.apache.org/jira/browse/KAFKA-5694
 Project: Kafka
  Issue Type: New Feature
Reporter: Dong Lin
Assignee: Dong Lin






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-02 Thread Paolo Patierno (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110636#comment-16110636
 ] 

Paolo Patierno commented on KAFKA-5684:
---

So it seems to me that the serdes parameters aren't related to the processor 
node anymore (the {{KStreamPrintProcessor}} in this case) but to the mapper.
Referring to the two cases above from the {{print}} method's point of view:

a) the mapper is just provided to the {{PrintForeachAction}} and the serdes are 
ignored
b) we have to build a mapper which uses the provided serdes (or the default 
ones) and then pass it to {{PrintForeachAction}}. Today there is a 
{{defaultKeyValueMapper}}, but it doesn't fit well in this case because it's 
not aware of serdes

At the same time, if the serdes parameters are not related to the processor 
node, the {{KStreamPrintProcessor}} is really like the {{KStreamPeekProcessor}} 
with "forwardDownStream = false".
The only difference is the {{close}} method, which is needed to flush the 
{{PrintForeachAction}}.
In any case I think {{KStreamPrintProcessor}} can be refactored to derive 
from {{KStreamPeekProcessor}}.

What do you think [~guozhang] [~james.c]?
I'm starting to develop a proposal PR for that.

Thanks,
Paolo.
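
A rough sketch of the refactoring described above (the class names are 
illustrative stand-ins for KStreamPeekProcessor/PrintForeachAction, not the 
actual implementation): the print processor reuses the peek behaviour with 
forwardDownStream = false and only adds a {{close()}} that flushes.

{code:java}
import java.io.PrintWriter;
import org.apache.kafka.streams.kstream.ForeachAction;

// Hypothetical stand-ins for the processors discussed above.
class PeekLikeProcessor<K, V> {
    protected final ForeachAction<K, V> action;
    protected final boolean forwardDownStream;

    PeekLikeProcessor(ForeachAction<K, V> action, boolean forwardDownStream) {
        this.action = action;
        this.forwardDownStream = forwardDownStream;
    }

    public void process(K key, V value) {
        action.apply(key, value);
        // A real processor would call context().forward(key, value) here
        // when forwardDownStream is true; printing never forwards.
    }

    public void close() { }
}

class PrintLikeProcessor<K, V> extends PeekLikeProcessor<K, V> {
    private final PrintWriter writer;

    PrintLikeProcessor(ForeachAction<K, V> printAction, PrintWriter writer) {
        super(printAction, false);   // never forward downstream when printing
        this.writer = writer;
    }

    @Override
    public void close() {
        writer.flush();              // the only extra behaviour print needs
    }
}
{code}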

> KStreamPrintProcessor as customized KStreamPeekProcessor
> 
>
> Key: KAFKA-5684
> URL: https://issues.apache.org/jira/browse/KAFKA-5684
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Minor
>
> Hi,
> the {{KStreamPrintProcessor}} is implemented from scratch (from the 
> {{AbstractProcessor}}) and the same for the related supplier.
> It looks to me that it's just a special {{KStreamPeekProcessor}} with 
> forwardDownStream to false and that allows the possibility to specify Serdes 
> instances used if key/values are bytes.
> At same time used by a {{print()}} method it provides a fast way to print 
> data flowing through the pipeline (while using just {{peek()}} you need to 
> write the code).
> I think that it could be useful to refactoring the {{KStreamPrintProcessor}} 
> as derived from the {{KStreamPeekProcessor}} customizing its behavior.
> Thanks,
> Paolo.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5358) Consumer perf tool should count rebalance time separately

2017-08-02 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110630#comment-16110630
 ] 

huxihx commented on KAFKA-5358:
---

[~hachikuji] What is the status of this KIP? Has it been approved?

> Consumer perf tool should count rebalance time separately
> -
>
> Key: KAFKA-5358
> URL: https://issues.apache.org/jira/browse/KAFKA-5358
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: huxihx
>
> It would be helpful to measure rebalance time separately in the performance 
> tool so that throughput between different versions can be compared more 
> easily in spite of improvements such as 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance.
>  At the moment, running the perf tool on 0.11.0 or trunk for a short amount 
> of time will present a severely skewed picture since the overall time will be 
> dominated by the join group delay.
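
A hedged sketch of one way to track rebalance time separately (not the actual 
perf-tool patch; the listener and field names are made up):

{code:java}
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class RebalanceTimer implements ConsumerRebalanceListener {
    private final AtomicLong rebalanceMs = new AtomicLong();
    // Start the clock at creation time so the initial join-group delay is counted too.
    private volatile long waitingSinceMs = System.currentTimeMillis();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        waitingSinceMs = System.currentTimeMillis();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Time spent between revocation (or startup) and assignment is rebalance time.
        rebalanceMs.addAndGet(System.currentTimeMillis() - waitingSinceMs);
    }

    public long totalRebalanceMs() {
        return rebalanceMs.get();
    }
}
{code}

The tool could then pass an instance to {{consumer.subscribe(topics, listener)}} 
and report throughput over the elapsed time minus {{totalRebalanceMs()}}.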



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5152) Kafka Streams keeps restoring state after shutdown is initiated during startup

2017-08-02 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy reassigned KAFKA-5152:
-

Assignee: Damian Guy  (was: Matthias J. Sax)

> Kafka Streams keeps restoring state after shutdown is initiated during startup
> --
>
> Key: KAFKA-5152
> URL: https://issues.apache.org/jira/browse/KAFKA-5152
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Xavier Léauté
>Assignee: Damian Guy
>Priority: Blocker
> Fix For: 0.10.2.2, 0.11.0.1
>
>
> If a streams shutdown is initiated during state restore (e.g. because an 
> uncaught exception is thrown), streams will not shut down until all stores 
> have first finished restoring.
> As the restore progresses, stream threads appear to be taken out of service 
> as part of the shutdown sequence, causing rebalancing of tasks. This 
> compounds the problem by slowing the restore down even further, since the 
> remaining threads now also have to restore the reassigned tasks before they 
> can shut down.
> A more severe issue is that if a new rebalance is triggered near the end of 
> the waitingSync phase (e.g. because a new member joins the group, or some 
> members time out on the SyncGroup response), then some consumer clients of 
> the group may already proceed with {{onPartitionsAssigned}} and block trying 
> to grab the file dir lock that has not yet been released by other clients, 
> while the clients holding the lock keep re-sending {{JoinGroup}} requests. 
> The rebalance cannot complete, because the clients blocked on the file dir 
> lock will not be kicked out of the group as their heartbeat threads keep 
> sending heartbeat requests. Hence this is a deadlock caused by not releasing 
> the file dir locks during task suspension.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5459) Support kafka-console-producer.sh messages as whole file

2017-08-02 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110657#comment-16110657
 ] 

huxihx commented on KAFKA-5459:
---

[~tombentley] How do we specify the key if the entire file content is the 
message value?

> Support kafka-console-producer.sh messages as whole file
> 
>
> Key: KAFKA-5459
> URL: https://issues.apache.org/jira/browse/KAFKA-5459
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 0.10.2.1
>Reporter: Tom Bentley
>Priority: Trivial
>
> {{kafka-console-producer.sh}} treats each line read as a separate message. 
> This can be controlled using the {{--line-reader}} option and the 
> corresponding {{MessageReader}} trait. It would be useful to have built-in 
> support for sending the whole input stream/file as the message. 
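
A hedged sketch of the desired behaviour with a plain producer (topic, file 
path and broker address are placeholders); built-in support would presumably be 
a {{MessageReader}} implementation doing the same thing:

{code:java}
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WholeFileProducer {
    public static void main(String[] args) throws Exception {
        // Entire file becomes a single message value (must fit within max.request.size).
        byte[] payload = Files.readAllBytes(Paths.get("/tmp/message.bin"));
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", payload)).get();
        }
    }
}
{code}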



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5641) Metadata request should always be allowed to send regardless of the value of max.in.flight.requests.per.connection

2017-08-02 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110568#comment-16110568
 ] 

huxihx commented on KAFKA-5641:
---

Closed this JIRA since this improvement does not have much positive impact, 
based on the comments from [~ijuma] and [~becket_qin].

> Metadata request should always be allowed to send regardless of the value of 
> max.in.flight.requests.per.connection
> --
>
> Key: KAFKA-5641
> URL: https://issues.apache.org/jira/browse/KAFKA-5641
> Project: Kafka
>  Issue Type: Improvement
>  Components: network, producer 
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>
> A Metadata request might not be able to be sent when 
> `max.in.flight.requests.per.connection` is set to 1 and there is already an 
> in-flight request in the same node's queue, as shown below:
> {code:title=NetworkClient.java|borderStyle=solid}
> private long maybeUpdate(long now, Node node) {
>     String nodeConnectionId = node.idString();
>     if (canSendRequest(nodeConnectionId)) {
>         ..
> }
> {code}
> However, setting `max.in.flight.requests.per.connection` to 1 really means 
> no reordering of the produced records; Metadata requests should have nothing 
> to do with this config. We don't have to check the in-flight request queue 
> size when sending a Metadata request.
> [~ijuma] Does it make any sense? If yes, I will work on it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5641) Metadata request should always be allowed to send regardless of the value of max.in.flight.requests.per.connection

2017-08-02 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-5641.
---
Resolution: Not A Problem

> Metadata request should always be allowed to send regardless of the value of 
> max.in.flight.requests.per.connection
> --
>
> Key: KAFKA-5641
> URL: https://issues.apache.org/jira/browse/KAFKA-5641
> Project: Kafka
>  Issue Type: Improvement
>  Components: network, producer 
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>
> A Metadata request might not be able to be sent when 
> `max.in.flight.requests.per.connection` is set to 1 and there is already an 
> in-flight request in the same node's queue, as shown below:
> {code:title=NetworkClient.java|borderStyle=solid}
> private long maybeUpdate(long now, Node node) {
>     String nodeConnectionId = node.idString();
>     if (canSendRequest(nodeConnectionId)) {
>         ..
> }
> {code}
> However, setting `max.in.flight.requests.per.connection` to 1 really means 
> no reordering of the produced records; Metadata requests should have nothing 
> to do with this config. We don't have to check the in-flight request queue 
> size when sending a Metadata request.
> [~ijuma] Does it make any sense? If yes, I will work on it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-2360) The kafka-consumer-perf-test.sh script help information print useless parameters.

2017-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112108#comment-16112108
 ] 

ASF GitHub Bot commented on KAFKA-2360:
---

GitHub user huxihx opened a pull request:

https://github.com/apache/kafka/pull/3613

KAFKA-2360: Extract producer-specific configs out of the common PerfConfig

Separate `batch.size`, `message-size` and `compression-codec` from 
PerfConfig to a newly created ProducerPerfConfig in order to hide them from the 
ConsumerPerf tool.
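
A rough sketch of the split (option names taken from the tool's help output 
quoted further down; class names here are illustrative, not the actual patch):

{code:java}
import joptsimple.OptionParser;

// Options shared by every perf tool.
class PerfConfigSketch {
    final OptionParser parser = new OptionParser();

    PerfConfigSketch() {
        parser.accepts("messages", "The number of messages to send or consume.")
              .withRequiredArg().ofType(Long.class).defaultsTo(Long.MAX_VALUE);
        parser.accepts("date-format", "The date format to use for formatting the time field.")
              .withRequiredArg().ofType(String.class).defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
    }
}

// Producer-only options live here, so the consumer perf tool's --help no longer shows them.
class ProducerPerfConfigSketch extends PerfConfigSketch {
    ProducerPerfConfigSketch() {
        parser.accepts("batch-size", "Number of messages to write in a single batch.")
              .withRequiredArg().ofType(Integer.class).defaultsTo(200);
        parser.accepts("compression-codec", "Codec to compress messages with.")
              .withRequiredArg().ofType(Integer.class).defaultsTo(0);
    }
}
{code}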

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huxihx/kafka KAFKA-2360

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3613.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3613


commit c33b5d21cce8c48c95e6ea94833ceb69c74cd7e2
Author: huxihx 
Date:   2017-08-03T03:10:41Z

KAFKA-2360: Separate `batch.size`, `message-size` and `compression-code` 
from PerfConfig to a newly-created ProducerPerfConfig in order to hide them in 
ConsumerPerf tool.




> The kafka-consumer-perf-test.sh script help information print useless 
> parameters.
> -
>
> Key: KAFKA-2360
> URL: https://issues.apache.org/jira/browse/KAFKA-2360
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.2.1
> Environment: Linux
>Reporter: Bo Wang
>Assignee: jin xing
>Priority: Minor
>  Labels: newbie
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Run kafka-consumer-perf-test.sh --help to show the help information; 3 of the 
> listed parameters are useless:
> --batch-size and --batch-size --messages
> Those are producer parameters.
> bin]# ./kafka-consumer-perf-test.sh --help
> Missing required argument "[topic]"
> Option               Description
> ------               -----------
> --batch-size         Number of messages to write in a single batch.
>                      (default: 200)
> --compression-codec  supported codec: NoCompressionCodec as 0,
>                      GZIPCompressionCodec as 1, SnappyCompressionCodec as 2,
>                      LZ4CompressionCodec as 3 (default: 0)
> --date-format        The date format to use for formatting the time field.
>                      See java.text.SimpleDateFormat for options.
>                      (default: yyyy-MM-dd HH:mm:ss:SSS)
> --fetch-size         The amount of data to fetch in a single request.
>                      (default: 1048576)
> --messages           The number of messages to send or consume
>                      (default: 9223372036854775807)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-2360) The kafka-consumer-perf-test.sh script help information print useless parameters.

2017-08-02 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-2360:
-

Assignee: huxihx  (was: jin xing)

> The kafka-consumer-perf-test.sh script help information print useless 
> parameters.
> -
>
> Key: KAFKA-2360
> URL: https://issues.apache.org/jira/browse/KAFKA-2360
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.2.1
> Environment: Linux
>Reporter: Bo Wang
>Assignee: huxihx
>Priority: Minor
>  Labels: newbie
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Run kafka-consumer-perf-test.sh --help to show the help information; 3 of the 
> listed parameters are useless:
> --batch-size and --batch-size --messages
> Those are producer parameters.
> bin]# ./kafka-consumer-perf-test.sh --help
> Missing required argument "[topic]"
> Option               Description
> ------               -----------
> --batch-size         Number of messages to write in a single batch.
>                      (default: 200)
> --compression-codec  supported codec: NoCompressionCodec as 0,
>                      GZIPCompressionCodec as 1, SnappyCompressionCodec as 2,
>                      LZ4CompressionCodec as 3 (default: 0)
> --date-format        The date format to use for formatting the time field.
>                      See java.text.SimpleDateFormat for options.
>                      (default: yyyy-MM-dd HH:mm:ss:SSS)
> --fetch-size         The amount of data to fetch in a single request.
>                      (default: 1048576)
> --messages           The number of messages to send or consume
>                      (default: 9223372036854775807)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5678) When the broker graceful shutdown occurs, the producer side sends timeout.

2017-08-02 Thread cuiyang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112124#comment-16112124
 ] 

cuiyang edited comment on KAFKA-5678 at 8/3/17 3:35 AM:


[~becket_qin]
Hi Jiang, thank you for your patience, I have two confusions to your 
explanation:
  1 I think producer doesn't try on  request timeout:
RecordAccumulator::abortExpiredBatches()
-> batch.expirationDone(); 
-> this.done()
-> thunk.callback.onCompletion()
-> this.userCallback.onCompletion() //will remove the 
batch from queue and invoke the user callback without any retries.
 So this is why we so care about the request timeout rather than other 
errors, because retries doesn't work for it even we set retries to non-zero.
  2 Even if  we only make leader migration without a broker going down(assuming 
that we migrate all the leaders from Broker A to other Broker B, C,D),  we 
still need face the scenario which is described by [~Json Tu]:
bg."if there are some partition leader did not change in this request but 
not have enough replica, then it will not satisfy such code as below."
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()


was (Author: cuiyang):
[~becket_qin]
Hi Jiang, thank you for your patience, I have two confusions to your 
explanation:
  1 I think producer doesn't try on  request timeout:
RecordAccumulator::abortExpiredBatches()
-> batch.expirationDone(); 
-> this.done()
-> thunk.callback.onCompletion()
-> this.userCallback.onCompletion() //will remove the 
batch from queue and invoke the user callback without any retries.
 So this is why we so care about the request timeout rather than other 
errors, because retries doesn't work for it even we set retries to non-zero.
  2 Even if  we only make leader migration without a broker going down(assuming 
that we migrate all the leaders from Broker A to other Broker B, C,D),  we 
still need face the scenario which is described by [~Json Tu]:
"if there are some partition leader did not change in this request but not 
have enough replica, then it will not satisfy such code as below."
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()

> When the broker graceful shutdown occurs, the producer side sends timeout.
> --
>
> Key: KAFKA-5678
> URL: https://issues.apache.org/jira/browse/KAFKA-5678
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0
>Reporter: tuyang
>
> Test environment is as follows.
> 1. Kafka version: 0.9.0.1
> 2. Cluster with 3 brokers, with broker ids A, B, C.
> 3. Topic with 6 partitions and 2 replicas, with 2 leader partitions on each 
> broker.
> We can reproduce the problem as follows.
> 1. We send messages as quickly as possible with acks=-1.
> 2. Suppose partition p0's leader is on broker A and we gracefully shut down 
> broker A, but we send a message to p0 before the leader is re-elected. The 
> message can be appended to the leader replica successfully, but if the 
> follower replica does not catch up quickly enough, the shutting-down broker 
> will create a DelayedProduce for this request and wait for it to complete 
> until request.timeout.ms.
> 3. Because of the controlled-shutdown request from broker A, the p0 partition 
> leader will be re-elected and the replica on broker A will become a follower 
> before the shutdown completes. The DelayedProduce will then not be triggered 
> to complete until it expires.
> 4. If broker A takes too long to shut down, the producer only gets a response 
> after request.timeout.ms, which increases producer send latency when we 
> restart brokers one by one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5678) When the broker graceful shutdown occurs, the producer side sends timeout.

2017-08-02 Thread cuiyang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112124#comment-16112124
 ] 

cuiyang edited comment on KAFKA-5678 at 8/3/17 3:40 AM:


[~becket_qin]
Hi Jiang, thank you for your patience, I have two confusions to your 
explanation:
# I think producer doesn't try on  request timeout:
{code:java}
RecordAccumulator::abortExpiredBatches()
-> batch.expirationDone(); 
-> this.done()
-> thunk.callback.onCompletion()
-> this.userCallback.onCompletion() //will remove the 
batch from queue and invoke the user callback without any retries.
{code}
 So this is why we so care about the request timeout rather than other 
errors, because retries doesn't work for it even we set retries to non-zero.
 # Even if  we only make leader migration without a broker going down(assuming 
that we migrate all the leaders from Broker A to other Broker B, C,D),  we 
still need face the scenario which is described by [~Json Tu]:
bq. "if there are some partition leader did not change in this request but not 
have enough replica, then it will not satisfy such code as below."
{code:java}
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()
{code}


was (Author: cuiyang):
[~becket_qin]
Hi Jiang, thank you for your patience, I have two confusions to your 
explanation:
# 1 I think producer doesn't try on  request timeout:
{code:java}
RecordAccumulator::abortExpiredBatches()
-> batch.expirationDone(); 
-> this.done()
-> thunk.callback.onCompletion()
-> this.userCallback.onCompletion() //will remove the 
batch from queue and invoke the user callback without any retries.
{code}
 So this is why we so care about the request timeout rather than other 
errors, because retries doesn't work for it even we set retries to non-zero.
# 2 Even if  we only make leader migration without a broker going down(assuming 
that we migrate all the leaders from Broker A to other Broker B, C,D),  we 
still need face the scenario which is described by [~Json Tu]:
bq. "if there are some partition leader did not change in this request but not 
have enough replica, then it will not satisfy such code as below."
{code:java}
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()
{code}

> When the broker graceful shutdown occurs, the producer side sends timeout.
> --
>
> Key: KAFKA-5678
> URL: https://issues.apache.org/jira/browse/KAFKA-5678
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0
>Reporter: tuyang
>
> Test environment is as follows.
> 1. Kafka version: 0.9.0.1
> 2. Cluster with 3 brokers, with broker ids A, B, C.
> 3. Topic with 6 partitions and 2 replicas, with 2 leader partitions on each 
> broker.
> We can reproduce the problem as follows.
> 1. We send messages as quickly as possible with acks=-1.
> 2. Suppose partition p0's leader is on broker A and we gracefully shut down 
> broker A, but we send a message to p0 before the leader is re-elected. The 
> message can be appended to the leader replica successfully, but if the 
> follower replica does not catch up quickly enough, the shutting-down broker 
> will create a DelayedProduce for this request and wait for it to complete 
> until request.timeout.ms.
> 3. Because of the controlled-shutdown request from broker A, the p0 partition 
> leader will be re-elected and the replica on broker A will become a follower 
> before the shutdown completes. The DelayedProduce will then not be triggered 
> to complete until it expires.
> 4. If broker A takes too long to shut down, the producer only gets a response 
> after request.timeout.ms, which increases producer send latency when we 
> restart brokers one by one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5678) When the broker graceful shutdown occurs, the producer side sends timeout.

2017-08-02 Thread cuiyang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112124#comment-16112124
 ] 

cuiyang edited comment on KAFKA-5678 at 8/3/17 3:41 AM:


[~becket_qin]
Hi Jiang, thank you for your patience, but I have two confusions to your 
explanation:
# I think producer doesn't try on  request timeout:
{code:java}
RecordAccumulator::abortExpiredBatches()
-> batch.expirationDone(); 
-> this.done()
-> thunk.callback.onCompletion()
-> this.userCallback.onCompletion() //will remove the 
batch from queue and invoke the user callback without any retries.
{code}
 So this is why we so care about the request timeout rather than other 
errors, because retries doesn't work for it even we set retries to non-zero.

 # Even if  we only make leader migration without a broker going down(assuming 
that we migrate all the leaders from Broker A to other Broker B, C,D),  we 
still need to face the scenario which is described by [~Json Tu]:
bq. "if there are some partition leader did not change in this request but not 
have enough replica, then it will not satisfy such code as below."
{code:java}
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()
{code}


was (Author: cuiyang):
[~becket_qin]
Hi Jiang, thank you for your patience, but I have two confusions to your 
explanation:
# I think producer doesn't try on  request timeout:
{code:java}
RecordAccumulator::abortExpiredBatches()
-> batch.expirationDone(); 
-> this.done()
-> thunk.callback.onCompletion()
-> this.userCallback.onCompletion() //will remove the 
batch from queue and invoke the user callback without any retries.
{code}
 So this is why we so care about the request timeout rather than other 
errors, because retries doesn't work for it even we set retries to non-zero.
 # Even if  we only make leader migration without a broker going down(assuming 
that we migrate all the leaders from Broker A to other Broker B, C,D),  we 
still need face the scenario which is described by [~Json Tu]:
bq. "if there are some partition leader did not change in this request but not 
have enough replica, then it will not satisfy such code as below."
{code:java}
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()
{code}

> When the broker graceful shutdown occurs, the producer side sends timeout.
> --
>
> Key: KAFKA-5678
> URL: https://issues.apache.org/jira/browse/KAFKA-5678
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0
>Reporter: tuyang
>
> Test environment is as follows.
> 1. Kafka version: 0.9.0.1
> 2. Cluster with 3 brokers, with broker ids A, B, C.
> 3. Topic with 6 partitions and 2 replicas, with 2 leader partitions on each 
> broker.
> We can reproduce the problem as follows.
> 1. We send messages as quickly as possible with acks=-1.
> 2. Suppose partition p0's leader is on broker A and we gracefully shut down 
> broker A, but we send a message to p0 before the leader is re-elected. The 
> message can be appended to the leader replica successfully, but if the 
> follower replica does not catch up quickly enough, the shutting-down broker 
> will create a DelayedProduce for this request and wait for it to complete 
> until request.timeout.ms.
> 3. Because of the controlled-shutdown request from broker A, the p0 partition 
> leader will be re-elected and the replica on broker A will become a follower 
> before the shutdown completes. The DelayedProduce will then not be triggered 
> to complete until it expires.
> 4. If broker A takes too long to shut down, the producer only gets a response 
> after request.timeout.ms, which increases producer send latency when we 
> restart brokers one by one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5678) When the broker graceful shutdown occurs, the producer side sends timeout.

2017-08-02 Thread cuiyang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112124#comment-16112124
 ] 

cuiyang edited comment on KAFKA-5678 at 8/3/17 3:39 AM:


[~becket_qin]
Hi Jiang, thank you for your patience, I have two confusions to your 
explanation:
# 1 I think producer doesn't try on  request timeout:
{code:java}
RecordAccumulator::abortExpiredBatches()
-> batch.expirationDone(); 
-> this.done()
-> thunk.callback.onCompletion()
-> this.userCallback.onCompletion() //will remove the 
batch from queue and invoke the user callback without any retries.
{code}
 So this is why we so care about the request timeout rather than other 
errors, because retries doesn't work for it even we set retries to non-zero.
# 2 Even if  we only make leader migration without a broker going down(assuming 
that we migrate all the leaders from Broker A to other Broker B, C,D),  we 
still need face the scenario which is described by [~Json Tu]:
bq. "if there are some partition leader did not change in this request but not 
have enough replica, then it will not satisfy such code as below."
{code:java}
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()
{code}


was (Author: cuiyang):
[~becket_qin]
Hi Jiang, thank you for your patience, I have two confusions to your 
explanation:
  1 I think producer doesn't try on  request timeout:
{code:java}
RecordAccumulator::abortExpiredBatches()
-> batch.expirationDone(); 
-> this.done()
-> thunk.callback.onCompletion()
-> this.userCallback.onCompletion() //will remove the 
batch from queue and invoke the user callback without any retries.
{code}
 So this is why we so care about the request timeout rather than other 
errors, because retries doesn't work for it even we set retries to non-zero.
  2 Even if  we only make leader migration without a broker going down(assuming 
that we migrate all the leaders from Broker A to other Broker B, C,D),  we 
still need face the scenario which is described by [~Json Tu]:
bq. "if there are some partition leader did not change in this request but not 
have enough replica, then it will not satisfy such code as below."
{code:java}
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()
{code}

> When the broker graceful shutdown occurs, the producer side sends timeout.
> --
>
> Key: KAFKA-5678
> URL: https://issues.apache.org/jira/browse/KAFKA-5678
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0
>Reporter: tuyang
>
> Test environment is as follows.
> 1. Kafka version: 0.9.0.1
> 2. Cluster with 3 brokers, with broker ids A, B, C.
> 3. Topic with 6 partitions and 2 replicas, with 2 leader partitions on each 
> broker.
> We can reproduce the problem as follows.
> 1. We send messages as quickly as possible with acks=-1.
> 2. Suppose partition p0's leader is on broker A and we gracefully shut down 
> broker A, but we send a message to p0 before the leader is re-elected. The 
> message can be appended to the leader replica successfully, but if the 
> follower replica does not catch up quickly enough, the shutting-down broker 
> will create a DelayedProduce for this request and wait for it to complete 
> until request.timeout.ms.
> 3. Because of the controlled-shutdown request from broker A, the p0 partition 
> leader will be re-elected and the replica on broker A will become a follower 
> before the shutdown completes. The DelayedProduce will then not be triggered 
> to complete until it expires.
> 4. If broker A takes too long to shut down, the producer only gets a response 
> after request.timeout.ms, which increases producer send latency when we 
> restart brokers one by one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5678) When the broker graceful shutdown occurs, the producer side sends timeout.

2017-08-02 Thread cuiyang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112124#comment-16112124
 ] 

cuiyang edited comment on KAFKA-5678 at 8/3/17 3:40 AM:


[~becket_qin]
Hi Jiang, thank you for your patience, but I have two confusions to your 
explanation:
# I think producer doesn't try on  request timeout:
{code:java}
RecordAccumulator::abortExpiredBatches()
-> batch.expirationDone(); 
-> this.done()
-> thunk.callback.onCompletion()
-> this.userCallback.onCompletion() //will remove the 
batch from queue and invoke the user callback without any retries.
{code}
 So this is why we so care about the request timeout rather than other 
errors, because retries doesn't work for it even we set retries to non-zero.
 # Even if  we only make leader migration without a broker going down(assuming 
that we migrate all the leaders from Broker A to other Broker B, C,D),  we 
still need face the scenario which is described by [~Json Tu]:
bq. "if there are some partition leader did not change in this request but not 
have enough replica, then it will not satisfy such code as below."
{code:java}
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()
{code}


was (Author: cuiyang):
[~becket_qin]
Hi Jiang, thank you for your patience, I have two confusions to your 
explanation:
# I think producer doesn't try on  request timeout:
{code:java}
RecordAccumulator::abortExpiredBatches()
-> batch.expirationDone(); 
-> this.done()
-> thunk.callback.onCompletion()
-> this.userCallback.onCompletion() //will remove the 
batch from queue and invoke the user callback without any retries.
{code}
 So this is why we so care about the request timeout rather than other 
errors, because retries doesn't work for it even we set retries to non-zero.
 # Even if  we only make leader migration without a broker going down(assuming 
that we migrate all the leaders from Broker A to other Broker B, C,D),  we 
still need face the scenario which is described by [~Json Tu]:
bq. "if there are some partition leader did not change in this request but not 
have enough replica, then it will not satisfy such code as below."
{code:java}
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()
{code}

> When the broker graceful shutdown occurs, the producer side sends timeout.
> --
>
> Key: KAFKA-5678
> URL: https://issues.apache.org/jira/browse/KAFKA-5678
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0
>Reporter: tuyang
>
> Test environment is as follows.
> 1. Kafka version: 0.9.0.1
> 2. Cluster with 3 brokers, with broker ids A, B, C.
> 3. Topic with 6 partitions and 2 replicas, with 2 leader partitions on each 
> broker.
> We can reproduce the problem as follows.
> 1. We send messages as quickly as possible with acks=-1.
> 2. Suppose partition p0's leader is on broker A and we gracefully shut down 
> broker A, but we send a message to p0 before the leader is re-elected. The 
> message can be appended to the leader replica successfully, but if the 
> follower replica does not catch up quickly enough, the shutting-down broker 
> will create a DelayedProduce for this request and wait for it to complete 
> until request.timeout.ms.
> 3. Because of the controlled-shutdown request from broker A, the p0 partition 
> leader will be re-elected and the replica on broker A will become a follower 
> before the shutdown completes. The DelayedProduce will then not be triggered 
> to complete until it expires.
> 4. If broker A takes too long to shut down, the producer only gets a response 
> after request.timeout.ms, which increases producer send latency when we 
> restart brokers one by one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5621) The producer should retry expired batches when retries are enabled

2017-08-02 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112129#comment-16112129
 ] 

Apurva Mehta commented on KAFKA-5621:
-

[~sutambe]: one clarification. My proposal for `max.message.delivery.wait.ms` 
means just that: if a message has not been sent after this time, error it out. 
So if the `send` happened at time `T`, then the message would be expired if `T 
+ max.message.delivery.wait.ms` has elapsed and the message has still not been 
successfully acknowledged. In a sense, this would make setting `retries` a bit 
redundant unless you set `retries=0`. I think this is more intuitive than a 
`batch.expiry.ms`. For instance, would the clock for batch expiry be reset 
every time the batch is requeued after a failure?
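
In other words (a tiny sketch; {{max.message.delivery.wait.ms}} is only a 
proposed name at this point, not an existing config):

{code:java}
class DeliveryExpiry {
    // A record sent at sendTimeMs expires once the proposed max.message.delivery.wait.ms
    // has elapsed without a successful acknowledgement, however often it was retried.
    static boolean expired(long sendTimeMs, long nowMs, long maxDeliveryWaitMs, boolean acked) {
        return !acked && nowMs - sendTimeMs >= maxDeliveryWaitMs;
    }
}
{code}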

Also, [~ijuma]'s point is that `max.block.wait.ms` is the upper bound on how 
long the producer will be stuck trying to send a batch anyway once the buffers 
are full, so there would never be a truly infinite wait.

At this point I think a config like `max.message.delivery.wait.ms` is a decent 
way to address multiple use cases. As I said before, I think we should open 
that discussion and try to converge on the details on the mailing list.

[~sutambe] Are you planning on doing that?

> The producer should retry expired batches when retries are enabled
> --
>
> Key: KAFKA-5621
> URL: https://issues.apache.org/jira/browse/KAFKA-5621
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
> Fix For: 1.0.0
>
>
> Today, when a batch is expired in the accumulator, a {{TimeoutException}} is 
> raised to the user.
> It might be better for the producer to retry the expired batch up to the 
> configured number of retries instead. This is more intuitive from the user's 
> point of view.
> Further, the proposed behavior makes it easier for applications like mirror 
> maker to provide ordering guarantees even when batches expire. Today, they 
> would resend the expired batch and it would get added to the back of the 
> queue, causing the output ordering to differ from the input ordering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5678) When the broker graceful shutdown occurs, the producer side sends timeout.

2017-08-02 Thread cuiyang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112124#comment-16112124
 ] 

cuiyang edited comment on KAFKA-5678 at 8/3/17 3:43 AM:


[~becket_qin]
Hi Jiang, thank you for your patience, but I have two confusions to your 
explanation:
# I think producer doesn't try on  request timeout:
{code:java}
RecordAccumulator::abortExpiredBatches()
-> batch.expirationDone(); 
-> this.done()
-> thunk.callback.onCompletion()
-> this.userCallback.onCompletion() //will remove the 
batch from queue and invoke the user callback without any retries.
{code}
 So this is why we so care about the request timeout rather than other 
errors, because retries doesn't work for it even we set retries to non-zero.

# Even if  we only make leader migration without a broker going down(assuming 
that we migrate all the leaders from Broker A to other Broker B, C,D),  we 
still need to face the scenario which is described by [~Json Tu]:
bq. "if there are some partition leader did not change in this request but not 
have enough replica, then it will not satisfy such code as below."
{code:java}
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()
{code}
So I think this issue also exists in the scenario of merely leader migration, 
right?
Please let me know if I have any mistake, thanks.


was (Author: cuiyang):
[~becket_qin]
Hi Jiang, thank you for your patience, but I have two confusions to your 
explanation:
# I think producer doesn't try on  request timeout:
{code:java}
RecordAccumulator::abortExpiredBatches()
-> batch.expirationDone(); 
-> this.done()
-> thunk.callback.onCompletion()
-> this.userCallback.onCompletion() //will remove the 
batch from queue and invoke the user callback without any retries.
{code}
 So this is why we so care about the request timeout rather than other 
errors, because retries doesn't work for it even we set retries to non-zero.

 # Even if  we only make leader migration without a broker going down(assuming 
that we migrate all the leaders from Broker A to other Broker B, C,D),  we 
still need to face the scenario which is described by [~Json Tu]:
bq. "if there are some partition leader did not change in this request but not 
have enough replica, then it will not satisfy such code as below."
{code:java}
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()
{code}
So I think this issue also exists in the scenario of merely leader migration, 
right?
Please let me know if I have any mistake, thanks.

> When the broker graceful shutdown occurs, the producer side sends timeout.
> --
>
> Key: KAFKA-5678
> URL: https://issues.apache.org/jira/browse/KAFKA-5678
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0
>Reporter: tuyang
>
> Test environment is as follows.
> 1. Kafka version: 0.9.0.1
> 2. Cluster with 3 brokers, with broker ids A, B, C.
> 3. Topic with 6 partitions and 2 replicas, with 2 leader partitions on each 
> broker.
> We can reproduce the problem as follows.
> 1. We send messages as quickly as possible with acks=-1.
> 2. Suppose partition p0's leader is on broker A and we gracefully shut down 
> broker A, but we send a message to p0 before the leader is re-elected. The 
> message can be appended to the leader replica successfully, but if the 
> follower replica does not catch up quickly enough, the shutting-down broker 
> will create a DelayedProduce for this request and wait for it to complete 
> until request.timeout.ms.
> 3. Because of the controlled-shutdown request from broker A, the p0 partition 
> leader will be re-elected and the replica on broker A will become a follower 
> before the shutdown completes. The DelayedProduce will then not be triggered 
> to complete until it expires.
> 4. If broker A takes too long to shut down, the producer only gets a response 
> after request.timeout.ms, which increases producer send latency when we 
> restart brokers one by one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5695) Test DeleteRecordsRequest in AuthorizerIntegrationTest

2017-08-02 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-5695:
---

 Summary: Test DeleteRecordsRequest in AuthorizerIntegrationTest
 Key: KAFKA-5695
 URL: https://issues.apache.org/jira/browse/KAFKA-5695
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5695) Test DeleteRecordsRequest in AuthorizerIntegrationTest

2017-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112190#comment-16112190
 ] 

ASF GitHub Bot commented on KAFKA-5695:
---

GitHub user lindong28 opened a pull request:

https://github.com/apache/kafka/pull/3614

KAFKA-5695; Test DeleteRecordsRequest in AuthorizerIntegrationTest



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lindong28/kafka KAFKA-5695

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3614.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3614


commit 0abbf513b68e08613d14e6a1479fbe7dc7d372ec
Author: Dong Lin 
Date:   2017-08-03T04:54:52Z

KAFKA-5695; Test DeleteRecordsRequest in AuthorizerIntegrationTest




> Test DeleteRecordsRequest in AuthorizerIntegrationTest
> --
>
> Key: KAFKA-5695
> URL: https://issues.apache.org/jira/browse/KAFKA-5695
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-02 Thread Paolo Patierno (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110849#comment-16110849
 ] 

Paolo Patierno commented on KAFKA-5684:
---

Maybe I made a mistake here, because the default serdes are available only from 
the {{ProcessorContext}} (via the Streams configuration). The same holds for the 
topic, which is needed for deserialization. So in any case the checks for a) and 
b) are something that has to happen inside the processor node.

> KStreamPrintProcessor as customized KStreamPeekProcessor
> 
>
> Key: KAFKA-5684
> URL: https://issues.apache.org/jira/browse/KAFKA-5684
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Minor
>
> Hi,
> the {{KStreamPrintProcessor}} is implemented from scratch (from 
> {{AbstractProcessor}}), and the same goes for the related supplier.
> It looks to me like it is just a special {{KStreamPeekProcessor}} with 
> forwardDownStream set to false, plus the possibility to specify the Serdes 
> instances to use if keys/values are bytes.
> At the same time, used by a {{print()}} method, it provides a fast way to 
> print data flowing through the pipeline (whereas with just {{peek()}} you 
> need to write the code yourself).
> I think it could be useful to refactor {{KStreamPrintProcessor}} so that it 
> derives from {{KStreamPeekProcessor}}, customizing its behavior.
> Thanks,
> Paolo.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5096) Only log invalid user configs and overwrite with correct one

2017-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110866#comment-16110866
 ] 

ASF GitHub Bot commented on KAFKA-5096:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2990


> Only log invalid user configs and overwrite with correct one
> 
>
> Key: KAFKA-5096
> URL: https://issues.apache.org/jira/browse/KAFKA-5096
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Mariam John
>Priority: Minor
>  Labels: beginner, newbie
> Fix For: 1.0.0
>
>
> Streams does not allow users to overwrite some config parameters (e.g. 
> {{enable.auto.commit}}). Currently we throw an exception, but this is 
> actually not required, as Streams can just ignore/overwrite the user-provided 
> value.
> Thus, instead of throwing, we should just log a WARN message and overwrite 
> the config with the value that suits Streams. (At the moment it's only one 
> parameter, {{enable.auto.commit}}, but with exactly-once it's going to be 
> more, cf. KAFKA-4923.) Thus, the scope of this ticket depends on when it will 
> be implemented (i.e. before or after KAFKA-4923).
> This ticket should also include JavaDoc updates that explain which parameters 
> cannot be specified by the user.
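For illustration only, a rough sketch of the behaviour the ticket asks for; this 
is not the actual {{StreamsConfig}} code, and the class and method names are 
invented:
{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonConfigurableOverrideSketch {
    private static final Logger log = LoggerFactory.getLogger(NonConfigurableOverrideSketch.class);

    // Instead of throwing, warn about a user-supplied value Streams cannot honour
    // and overwrite it with the value Streams requires.
    static Map<String, Object> consumerConfigs(Map<String, Object> userConfigs) {
        Map<String, Object> configs = new HashMap<>(userConfigs);
        Object userValue = configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
        if (userValue != null && !"false".equals(userValue.toString())) {
            log.warn("Unexpected user-specified consumer config {}={}; overriding it to false "
                    + "because Streams handles commits itself",
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, userValue);
        }
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return configs;
    }
}
{code}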



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-02 Thread Paolo Patierno (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110849#comment-16110849
 ] 

Paolo Patierno edited comment on KAFKA-5684 at 8/2/17 2:17 PM:
---

Maybe I made a mistake here, because the default serdes are available only from 
the {{ProcessorContext}} (via the Streams configuration). The same holds for the 
topic, which is needed for deserialization. So in any case the checks for a) and 
b) have to happen inside the processor node at runtime, during the flow.
My first idea was to create a mapper (if one is not provided) with the right 
serdes (from parameters or defaults) upfront, pass it to the processor node at 
creation time, and do the deserialization inside that mapper (a rough sketch 
follows below).
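A rough sketch of that idea (the class name and details are invented, only meant 
to illustrate pulling the default serdes and the topic from the 
{{ProcessorContext}} at runtime):
{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Peek-style processor that prints and does not forward, falling back to the
// default serdes from the ProcessorContext when no explicit serdes were given.
public class PrintBytesProcessor extends AbstractProcessor<byte[], byte[]> {

    private Serde<?> keySerde;    // may stay null until init() if not supplied
    private Serde<?> valueSerde;

    public PrintBytesProcessor(Serde<?> keySerde, Serde<?> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // The default serdes (and the source topic needed for deserialization)
        // are only known here, at runtime, which is the constraint discussed above.
        if (keySerde == null) keySerde = context.keySerde();
        if (valueSerde == null) valueSerde = context.valueSerde();
    }

    @Override
    public void process(byte[] key, byte[] value) {
        Object k = keySerde.deserializer().deserialize(context().topic(), key);
        Object v = valueSerde.deserializer().deserialize(context().topic(), value);
        System.out.println("[" + context().topic() + "]: " + k + ", " + v);
        // intentionally no context().forward(...): print is a terminal operation
    }
}
{code}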






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5692) Refactor PreferredReplicaLeaderElectionCommand to use AdminClient

2017-08-02 Thread Tom Bentley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111088#comment-16111088
 ] 

Tom Bentley commented on KAFKA-5692:


This is now covered by 
[KIP-183|https://cwiki.apache.org/confluence/display/KAFKA/KIP-183+-+Change+PreferredReplicaLeaderElectionCommand+to+use+AdminClient]

> Refactor PreferredReplicaLeaderElectionCommand to use AdminClient
> -
>
> Key: KAFKA-5692
> URL: https://issues.apache.org/jira/browse/KAFKA-5692
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: needs-kip
>
> The PreferredReplicaLeaderElectionCommand currently uses a direct connection 
> to zookeeper. The zookeeper dependency should be deprecated and an AdminClient 
> API created to be used instead.
> This change will require a KIP.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5692) Refactor PreferredReplicaLeaderElectionCommand to use AdminClient

2017-08-02 Thread Tom Bentley (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley updated KAFKA-5692:
---
Description: 
The PreferredReplicaLeaderElectionCommand currently uses a direct connection to 
zookeeper. The zookeeper dependency should be deprecated and an AdminClient API 
created to be used instead. 

This change will require a KIP.

  was:
The PreferredReplicaLeaderElectionCommand currently uses a direction connection 
to zookeeper. The zookeeper dependency should be deprecated and an AdminClient 
API created to be used instead. 

This change will require a KIP.


> Refactor PreferredReplicaLeaderElectionCommand to use AdminClient
> -
>
> Key: KAFKA-5692
> URL: https://issues.apache.org/jira/browse/KAFKA-5692
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: needs-kip
>
> The PreferredReplicaLeaderElectionCommand currently uses a direct connection 
> to zookeeper. The zookeeper dependency should be deprecated and an 
> AdminClient API created to be used instead. 
> This change will require a KIP.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5674) max.connections.per.ip minimum value to be zero to allow IP address blocking

2017-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110957#comment-16110957
 ] 

ASF GitHub Bot commented on KAFKA-5674:
---

GitHub user viktorsomogyi opened a pull request:

https://github.com/apache/kafka/pull/3610

KAFKA-5674: max.connections.per.ip minimum should be 0



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viktorsomogyi/kafka KAFKA-5674

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3610.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3610


commit a8dee0b78367984dfda9ef125b0ca37c2e1c73dd
Author: Viktor Somogyi 
Date:   2017-08-02T12:22:45Z

KAFKA-5674: max.connections.per.ip minimum should be 0




> max.connections.per.ip minimum value to be zero to allow IP address blocking
> 
>
> Key: KAFKA-5674
> URL: https://issues.apache.org/jira/browse/KAFKA-5674
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.11.0.0
>Reporter: Tristan Stevens
>Assignee: Viktor Somogyi
>
> Currently the max.connections.per.ip (KAFKA-1512) config has a minimum value 
> of 1. However, as suggested in 
> https://issues.apache.org/jira/browse/KAFKA-1512?focusedCommentId=14051914, 
> allowing a minimum value of zero would enable IP-based filtering of inbound 
> connections (effectively prohibiting those IP addresses from connecting 
> altogether).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-2526) Console Producer / Consumer's serde config is not working

2017-08-02 Thread Aish Raj Dahal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111024#comment-16111024
 ] 

Aish Raj Dahal commented on KAFKA-2526:
---

[~ewencp] Apologies for the naivety, but I was thinking along the lines of 
throwing an error as well. Additionally, as mentioned in the original issue 
description and in one of your earlier comments, creating a KIP for cleaning up 
the configs in the console producer and the console consumer is what I was 
planning. As a user of the command line tools, having a simpler interface for 
passing the configuration parameters and getting the right error messages for 
unexpected behavior would surely make my day :)
Let me know how this sounds to you. Since I'm fairly new to the community, 
perhaps out of naivety I am underestimating the effort involved, particularly 
for the latter part of the task, i.e. getting the KIP approved and implemented. 
Suggestions/tips around this are welcome!

> Console Producer / Consumer's serde config is not working
> -
>
> Key: KAFKA-2526
> URL: https://issues.apache.org/jira/browse/KAFKA-2526
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
>  Labels: newbie
>
> Although in the console producer one can specify the key/value serializers, 
> they are actually not used, since 1) it always serializes the input string 
> via String.getBytes (hence always pre-assuming the string serializer) and 2) 
> they are actually only passed into the old producer. The same issues exist in 
> the console consumer.
> In addition, the configs in the console producer are messy: we have 1) some 
> config values exposed as cmd parameters, 2) some config values in 
> --producer-property and 3) some in --property.
> It would be great to clean the configs up in both the console producer and 
> consumer, and put them into a single --property parameter which could 
> possibly take a file for reading in property values as well, leaving only 
> --new-producer as the other command line parameter.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5693) TopicCreationPolicy and AlterConfigsPolicy overlap

2017-08-02 Thread Tom Bentley (JIRA)
Tom Bentley created KAFKA-5693:
--

 Summary: TopicCreationPolicy and AlterConfigsPolicy overlap
 Key: KAFKA-5693
 URL: https://issues.apache.org/jira/browse/KAFKA-5693
 Project: Kafka
  Issue Type: Bug
Reporter: Tom Bentley
Priority: Minor


The administrator of a cluster can configure a {{CreateTopicPolicy}}, which has 
access to the topic configs as well as other metadata to make its decision 
about whether a topic creation is allowed. Thus in theory the decision could be 
based on a combination of the replication factor and the topic configs, for 
example.

Separately there is an {{AlterConfigPolicy}}, which only has access to the 
configs (and can apply to configurable entities other than just topics).

There are potential issues with this. For example, although the 
{{CreateTopicPolicy}} is checked at creation time, it is not checked for any 
later alterations to the topic config. So policies which depend on both the 
topic configs and other topic metadata could be worked around by changing the 
configs after creation.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5693) TopicCreationPolicy and AlterConfigsPolicy overlap

2017-08-02 Thread Tom Bentley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111313#comment-16111313
 ] 

Tom Bentley commented on KAFKA-5693:


One (obvious) partial solution to this is that the {{CreateTopicPolicy}} should 
be applied not just at topic creation, but also to every topic modification, 
whether it is a change to the topic configs or something else. Unless, that is, 
there is a valid use case for configuring changed topics differently from 
freshly created topics?

By symmetry, unless it is OK for a no-op topic config change to be rejected by 
the {{AlterConfigPolicy}}, maybe the topic config supplied during topic creation 
should also be run through the {{AlterConfigPolicy}}. Again, maybe there are 
valid use cases for allowing a topic config at creation that would be disallowed 
in a later modification, but I can't think of any.
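To make the overlap concrete, here is a toy {{CreateTopicPolicy}} (the class and 
the rule are invented): it reasons about the replication factor together with a 
topic config, yet it is only consulted at creation time, so the config half of 
the rule can be undone later via an alter-configs request:
{code:java}
import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;

public class DurableTopicPolicy implements CreateTopicPolicy {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void validate(RequestMetadata request) throws PolicyViolationException {
        Short rf = request.replicationFactor();
        String minIsr = request.configs().get("min.insync.replicas");
        // Checked only at creation time: min.insync.replicas can be lowered
        // afterwards without this policy ever seeing the change.
        if (rf != null && rf >= 3 && (minIsr == null || Integer.parseInt(minIsr) < 2)) {
            throw new PolicyViolationException(
                "Topics with replication factor >= 3 must set min.insync.replicas >= 2");
        }
    }

    @Override
    public void close() { }
}
{code}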



> TopicCreationPolicy and AlterConfigsPolicy overlap
> --
>
> Key: KAFKA-5693
> URL: https://issues.apache.org/jira/browse/KAFKA-5693
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tom Bentley
>Priority: Minor
>
> The administrator of a cluster can configure a {{CreateTopicPolicy}}, which 
> has access to the topic configs as well as other metadata to make its 
> decision about whether a topic creation is allowed. Thus in theory the 
> decision could be based on a combination of the replication factor and 
> the topic configs, for example. 
> Separately there is an AlterConfigPolicy, which only has access to the 
> configs (and can apply to configurable entities other than just topics).
> There are potential issues with this. For example although the 
> CreateTopicPolicy is checked at creation time, it's not checked for any later 
> alterations to the topic config. So policies which depend on both the topic 
> configs and other topic metadata could be worked around by changing the 
> configs after creation.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3657) NewProducer NullPointerException on ProduceRequest

2017-08-02 Thread Shubham Rander (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111352#comment-16111352
 ] 

Shubham Rander commented on KAFKA-3657:
---

[~ijuma] We have seen the same issue intermittently with 0.9.x version.

Thanks

> NewProducer NullPointerException on ProduceRequest
> --
>
> Key: KAFKA-3657
> URL: https://issues.apache.org/jira/browse/KAFKA-3657
> Project: Kafka
>  Issue Type: Bug
>  Components: network, producer 
>Affects Versions: 0.8.2.1
> Environment: linux 3.2.0 debian7
>Reporter: Vamsi Subhash Achanta
>Assignee: Jun Rao
>
> The producer, upon send().get() on the future, appends the record batches to 
> the accumulator, and the Sender.java (a separate thread) flushes them to the 
> server. The produce request waits on the countDownLatch in the 
> FutureRecordMetadata:
> public RecordMetadata get() throws InterruptedException, 
> ExecutionException {
> this.result.await();
> In this case, the client thread is blocked forever (as it is get() without a 
> timeout) waiting for the response, and the response, upon poll by the Sender, 
> returns an attachment with a null batch value. The batch is processed and the 
> request is errored out. The Sender catches the exception at a global level and 
> carries on. As the accumulator has been drained, the response will never be 
> returned, and the producer client thread calling get() is blocked forever on 
> the latch await call.
> I checked at the server end but still haven't found the reason for null 
> batch. Any pointers on this?
> ERROR [2016-05-01 21:00:09,256] [kafka-producer-network-thread |producer-app] 
> [Sender] message_id: group_id: : Uncaught error in kafka producer I/O thread:
> ! java.lang.NullPointerException: null
> ! at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:266)
> ! at 
> org.apache.kafka.clients.producer.internals.Sender.handleResponse(Sender.java:236)
> ! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:196)
> ! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> ! at java.lang.Thread.run(Thread.java:745)
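As a work-around sketch for the stuck client thread described above (names and 
timeouts are made up), the wait can at least be bounded so the caller gets a 
{{TimeoutException}} instead of hanging forever:
{code:java}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BoundedSendExample {
    static void sendWithBoundedWait(KafkaProducer<String, String> producer) throws Exception {
        try {
            // Waits at most 30 seconds instead of blocking forever on the latch.
            RecordMetadata metadata = producer
                    .send(new ProducerRecord<>("test-topic", "key", "value"))
                    .get(30, TimeUnit.SECONDS);
            System.out.println("acked at offset " + metadata.offset());
        } catch (TimeoutException e) {
            // No response within the bound; decide whether to retry, alert or give up.
        }
    }
}
{code}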



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5621) The producer should retry expired batches when retries are enabled

2017-08-02 Thread Sumant Tambe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111237#comment-16111237
 ] 

Sumant Tambe edited comment on KAFKA-5621 at 8/2/17 4:38 PM:
-

Wait, what? Blocking indefinitely? Where? No, no.

On one hand, I agree with [~apurva]'s point about not exposing the batching 
aspects in even more configs. {{message.max.delivery.wait.ms}} could be confusing 
because it would be interpreted as an end-to-end timeout: from batch-ready to the 
broker receiving it, or worse, from calling send to the broker receiving it. An 
abstract name like that is open to interpretation. Therefore, I personally prefer 
{{batch.expiry.ms}}, as it does what it says: it expires batches (in the 
accumulator). IMO batching is a well-known concept in big-data infrastructure: 
Kafka and Spark, to name a few.

So let's assume we chose a name X for the accumulator timeout. The question then 
is what its default should be. We can either 1. support backwards-compatible 
behavior by setting X=request.timeout.ms, 2. pick some other agreed-upon value 
(e.g. retries * request.timeout.ms), or 3. use MAX_LONG in the interest of 
letting Kafka do the work to the maximum possible extent (a small config sketch 
follows below).

I'm OK with 1 and 2. The concern I have about #3 is that "maximum extent" is not 
forever; it's literally an extreme.

The following is an attempt to answer [~ijuma]'s question.

There are at least three classes of application I consider when I think about 
producer timeouts: 1. real-time apps (periodic healthcheck producer, temperature 
sensor producer), 2. fail-fast (e.g. KMM), 3. best effort (i.e. everything else; 
further sub-categorization is possible here).

A real-time app has a soft upper bound on *both* message delivery and failure of 
message delivery; in both cases, it wants to know. Such an app does *not* close 
the producer on the first error, because there's more data lined up right behind. 
It's OK to lose a few samples of temperature, so it simply drops them and moves 
on; maybe when the drop rate reaches something like 70% it would close the 
producer. It may use acks=0. In this case, X=MAX_LONG is not suitable.

A fail-fast app does not have an upper bound on when the data gets to the broker; 
it's ready to wait a long time to give every message a chance. But if it's not 
making progress on a partition, it needs to know ASAP, that is, it has a bound on 
the failure notification. Such an app (IMO) would either a. close the producer if 
ordering is important (that's how we run KMM at LinkedIn) or b. queue the message 
at the back of the queue for a later attempt, with obvious reordering. In both 
cases, X=MAX_LONG is not suitable.

Then comes best effort, which has no bounds on success or failure notification. 
Frankly, I don't know which app really fits this bill. I've heard about apps 
configuring retries=MAX_LONG; I'll just say that I agree with [~becket_qin]'s 
opinion that this is considered bad.

+1 for announcing KIP-91. I'll try to capture the discussion in this thread in 
the KIP before announcing. Do you guys want me to do that, or should I just link 
to this thread?
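For reference, a small sketch of where such a knob would sit relative to today's 
configs; note that {{batch.expiry.ms}} is only the name proposed in this 
discussion/KIP-91 and does not exist in any released producer, and all values are 
made up:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TimeoutKnobsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Existing knobs: how long one in-flight request may take, and how many
        // times a retriable failure is retried.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");

        // Proposed accumulator timeout from the discussion above (hypothetical,
        // not available in any released Kafka version):
        // props.put("batch.expiry.ms", "120000");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}
{code}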



[jira] [Assigned] (KAFKA-5474) Streams StandbyTask should not checkpoint on commit if EOS is enabled

2017-08-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-5474:


Assignee: Guozhang Wang  (was: Matthias J. Sax)

> Streams StandbyTask should not checkpoint on commit if EOS is enabled
> 
>
> Key: KAFKA-5474
> URL: https://issues.apache.org/jira/browse/KAFKA-5474
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Blocker
>  Labels: exactly-once
>
> Discovered by system test {{streams_eos_test#test_failure_and_recovery}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5474) Streams StandbyTask should not checkpoint on commit if EOS is enabled

2017-08-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-5474:


Assignee: Matthias J. Sax  (was: Guozhang Wang)

> Streams StandbyTask should not checkpoint on commit if EOS is enabled
> 
>
> Key: KAFKA-5474
> URL: https://issues.apache.org/jira/browse/KAFKA-5474
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: exactly-once
>
> Discovered by system test {{streams_eos_test#test_failure_and_recovery}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)