[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Anatoli Fomenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14364205#comment-14364205
 ] 

Anatoli Fomenko commented on KAFKA-2023:


Works for me on MacOS, but not on one CentOS 6.5 machine.

Perhaps this wording would work better:

If you see "error: RPC failed; result=22, HTTP code = 405", use
git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka



 git clone kafka repository requires https
 -

 Key: KAFKA-2023
 URL: https://issues.apache.org/jira/browse/KAFKA-2023
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Anatoli Fomenko
Priority: Minor
 Attachments: KAFKA-2023.patch


 From http://kafka.apache.org/code.html: 
 Our code is kept in git. You can check it out like this:
   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
 On CentOS 6.5:
 {code}
 $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
 Initialized empty Git repository in /home/anatoli/git/kafka/.git/
 error: RPC failed; result=22, HTTP code = 405
 {code}
 while:
 {code}
 $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
 Initialized empty Git repository in /home/anatoli/git/kafka/.git/
 remote: Counting objects: 24607, done.
 remote: Compressing objects: 100% (9212/9212), done.
 remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
 Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
 Resolving deltas: 100% (14449/14449), done.
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2024) Cleaner can generate unindexable log segments

2015-03-16 Thread Gian Merlino (JIRA)
Gian Merlino created KAFKA-2024:
---

 Summary: Cleaner can generate unindexable log segments
 Key: KAFKA-2024
 URL: https://issues.apache.org/jira/browse/KAFKA-2024
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.0
Reporter: Gian Merlino


It's possible for log cleaning to generate segments that have a gap of more 
than Int.MaxValue between their base offset and their last offset. It's not 
possible to index those segments since there's only 4 bytes available to store 
that difference. The broker will end up writing overflowed ints into the index, 
and doesn't detect that there is a problem until restarted, at which point you 
get one of these:

2015-03-16 20:35:49,632 FATAL [main] kafka.server.KafkaServerStartable - Fatal 
error during KafkaServerStartable startup. Prepare to shutdown
java.lang.IllegalArgumentException: requirement failed: Corrupt index found, 
index file (/mnt/persistent/kafka-logs/topic/.index) has 
non-zero size but the last offset is -1634293959 and the base offset is 0
at scala.Predef$.require(Predef.scala:233)
at kafka.log.OffsetIndex.sanityCheck(OffsetIndex.scala:352)
at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:204)
at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:203)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.log.Log.loadSegments(Log.scala:203)
at kafka.log.Log.init(Log.scala:67)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142)
at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
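
For context, a minimal sketch (not Kafka source; names are made up) of how a gap larger than Int.MaxValue between the base offset and the last offset wraps into a negative 4-byte relative offset, which is what produces a corrupt-looking index like the one above:

{code}
// Minimal sketch: the offset index stores (lastOffset - baseOffset) as an Int,
// so a gap larger than Int.MaxValue silently wraps into a negative value.
object OffsetOverflowSketch {
  def relativeOffset(baseOffset: Long, lastOffset: Long): Int =
    (lastOffset - baseOffset).toInt  // overflows when the gap exceeds Int.MaxValue

  def main(args: Array[String]): Unit = {
    val base = 0L
    val last = base + Int.MaxValue.toLong + 1L  // a gap heavy cleaning can create
    println(relativeOffset(base, last))         // prints a negative number, as in the sanityCheck failure
  }
}
{code}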



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Anatoli Fomenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14364404#comment-14364404
 ] 

Anatoli Fomenko commented on KAFKA-2023:


Concur: the problem persists with git 1.7.1, but not with later versions such 
as 2.0.4.

The question is whether the site supports git 1.7.1, which is the default 
version on CentOS 6.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-2023:
-
Reviewer: Gwen Shapira




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein reassigned KAFKA-2023:


Assignee: Anatoly Fayngelerin




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-16 Thread Jun Rao
101. There may be a use case where you only want the topics to be created
manually by admins. Currently, you can do that by disabling auto topic
creation and issue topic creation from the TopicCommand. If we disable auto
topic creation completely on the broker and don't have a way to distinguish
between topic creation requests from the regular clients and the admin, we
can't support manual topic creation any more. I was thinking that another
way of distinguishing the clients making the topic creation requests is
using clientId. For example, the admin tool can set it to something like
"admin" and the broker can treat that clientId specially.
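
A minimal sketch of that clientId-based check (purely illustrative; the constant
and method names are made up, not an agreed design):

{code}
// Sketch: broker-side decision for implicit topic creation when auto-create is off.
object TopicCreationPolicySketch {
  val AdminClientId = "admin"  // hypothetical convention used by the admin tool

  def mayAutoCreateTopic(autoCreateEnabled: Boolean, clientId: String): Boolean =
    autoCreateEnabled || clientId == AdminClientId
}
{code}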

Also, there is a related discussion in KAFKA-2020. Currently, we do the
following in TopicMetadataResponse:

1. If leader is not available, we set the partition level error code to
LeaderNotAvailable.
2. If a non-leader replica is not available, we take that replica out of
the assigned replica list and isr in the response. As an indication for
doing that, we set the partition level error code to ReplicaNotAvailable.

This has a few problems. First, ReplicaNotAvailable probably shouldn't be
an error, at least for the normal producer/consumer clients that just want
to find out the leader. Second, it can happen that both the leader and
another replica are not available at the same time. There is no error code
to indicate both. Third, even if a replica is not available, it's still
useful to return its replica id since some clients (e.g. admin tool) may
still make use of it.

One way to address this issue is to always return the replica id for
leader, assigned replicas, and isr regardless of whether the corresponding
broker is live or not. Since we also return the list of live brokers, the
client can figure out whether a leader or a replica is live or not and act
accordingly. This way, we don't need to set the partition level error code
when the leader or a replica is not available. This doesn't change the wire
protocol, but does change the semantics. Since we are evolving the protocol
of TopicMetadataRequest here, we can potentially piggyback the change.
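
A rough sketch of the client-side interpretation described above (the types are
illustrative stand-ins, not the actual request/response classes):

{code}
// Sketch: liveness is derived from the returned live-broker list instead of a
// partition-level error code; replica ids are always present in the metadata.
case class PartitionMetaSketch(leader: Int, replicas: Seq[Int], isr: Seq[Int])

object MetadataLivenessSketch {
  // Returns whether the leader is live plus the ids of any non-live replicas.
  def status(meta: PartitionMetaSketch, liveBrokers: Set[Int]): (Boolean, Seq[Int]) = {
    val leaderLive   = liveBrokers.contains(meta.leader)
    val deadReplicas = meta.replicas.filterNot(liveBrokers.contains)
    (leaderLive, deadReplicas)  // caller (producer, consumer, admin tool) decides how to react
  }
}
{code}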

102.1 For those types of errors due to invalid input, shouldn't we just
guard it at parameter validation time and throw InvalidArgumentException
without even sending the request to the broker?
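
For illustration, the kind of client-side guard 102.1 suggests might look like
this (names and rules are hypothetical, not the actual AdminClient code):

{code}
// Sketch: reject obviously invalid arguments before any request is sent to the broker.
object CreateTopicArgCheckSketch {
  def validate(partitions: Int, replicationFactor: Int): Unit = {
    if (partitions <= 0)
      throw new IllegalArgumentException(s"partitions must be positive, got $partitions")
    if (replicationFactor <= 0)
      throw new IllegalArgumentException(s"replication factor must be positive, got $replicationFactor")
  }
}
{code}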

Thanks,

Jun


On Mon, Mar 16, 2015 at 10:37 AM, Andrii Biletskyi 
andrii.bilets...@stealth.ly wrote:

 Jun,

 Answering your questions:

 101. If I understand you correctly, you are saying future producer versions
 (which will be ported to TMR_V1) won't be able to automatically create a topic
 if we unconditionally remove topic creation from there. But we need to
 preserve this logic.
 About your proposal: I'm not a big fan either of differentiating clients
 directly in the protocol schema. I'm also not sure I understand why
 auto.create.topics.enable is a server-side configuration. Can we deprecate
 this setting in future versions, add it to the producer, and, upon receiving
 UnknownTopic, have the producer create the topic explicitly through a separate
 call via adminClient?

 102.1. Hm, yes. It's because we want to support batching and at the same time
 we want to give descriptive error messages to clients. Since AdminClient holds
 the context to construct such messages (e.g. the AdminClient layer can know
 that InvalidArgumentsCode covers two cases: either an invalid number, e.g. -1,
 or replication-factor was provided while the partitions argument wasn't), I
 wrapped responses in Exceptions. But I'm open to any other ideas; this was
 just the initial version.
 102.2. Yes, I agree. I'll change that to some other DTO, probably.

 Thanks,
 Andrii Biletskyi

 On Fri, Mar 13, 2015 at 7:16 PM, Jun Rao j...@confluent.io wrote:

  Andrii,
 
  101. That's what I was thinking too, but it may not be that simple. In
  TopicMetadataRequest_V1,
  we can let it not trigger auto topic creation. Then, in the producer
 side,
  if it gets an UnknownTopicException, it can explicitly issue a
  createTopicRequest for auto topic creation. On the consumer side, it will
  never issue createTopicRequest. This works when auto topic creation is
  enabled on the broker side. However, I am not sure how things will work
  when auto topic creation is disabled on the broker side. In this case, we
  want to have a way to manually create a topic, potentially through admin
  commands. However, then we need a way to distinguish createTopicRequest
  issued from the producer clients and the admin tools. Maybe we can add a
  new field in createTopicRequest and set it differently in the producer
  client and the admin client. However, I am not sure if that's the best
  approach.
 
  2. Yes, refactoring existing requests is a non-trivial amount of work. I
  posted some comments in KAFKA-1927. We will probably have to fix
 KAFKA-1927
  first, before adding the new logic in KAFKA-1694. Otherwise, the changes
  will be too big.
 
  102. About the 

[jira] [Updated] (KAFKA-2013) benchmark test for the purgatory

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yasuhiro Matsuda updated KAFKA-2013:

Attachment: KAFKA-2013_2015-03-16_14:13:20.patch

 benchmark test for the purgatory
 

 Key: KAFKA-2013
 URL: https://issues.apache.org/jira/browse/KAFKA-2013
 Project: Kafka
  Issue Type: Test
  Components: purgatory
Reporter: Yasuhiro Matsuda
Assignee: Yasuhiro Matsuda
Priority: Trivial
 Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch, 
 KAFKA-2013_2015-03-16_14:13:20.patch


 We need a micro benchmark test for measuring the purgatory performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


RE: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Aditya Auradkar
In Jay's approach, a client will simply experience a delay in receiving a 
response. The primary benefit is that there are no concerns regarding data-loss 
because the data has already been appended. Retries are also a non-issue since 
there is no need for them. However, the drawback to append and delay is that if 
the socket timeout is reached (30 second default I believe), the client can 
disconnect and try to resend the batch to the server. This will cause data 
duplication since the server cannot distinguish duplicate batches. However, it 
is very likely that the maximum quota delay will be lower than the socket 
timeout unless someone explicitly overrides it. We can make this even more 
unlikely by having a fixed lower bound on the socket timeout (10 seconds?). In 
this approach we must also ignore the request timeout since a small timeout 
will completely bypass quotas.

In the other approach, assuming the client only retries a fixed number of 
times, it will eventually experience data loss since the producer will drop the 
batch at some point. IMO, it is more likely that we will see this issue in 
production than the other issues identified above.

I agree with Jay that we can delay the request longer than the request timeout 
since it isn't possible to enforce perfectly on the server anyway. I think that 
we should have a maximum delay config on the server that provides a ceiling on 
the most time we can delay a request and have it be lower than the socket 
timeout. 
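
A rough sketch of that ceiling (numbers and names are illustrative, not proposed
config names):

{code}
// Sketch: the computed throttle delay is capped by a server-side maximum that is
// kept below the socket timeout, so clients don't disconnect and resend the batch.
object QuotaDelaySketch {
  val socketTimeoutMs    = 30000L
  val maxThrottleDelayMs = 10000L  // hypothetical ceiling, well under the socket timeout

  def throttleDelayMs(observedRate: Double, quotaRate: Double, windowMs: Long): Long = {
    if (observedRate <= quotaRate) 0L
    else {
      val ideal = ((observedRate - quotaRate) / quotaRate * windowMs).toLong
      math.min(ideal, maxThrottleDelayMs)
    }
  }
}
{code}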

Initially, I preferred delay-and-error because it seems like the most natural 
way to handle quota violations, but I'm starting to see the merit in Jay's 
approach. Practically speaking, it reduces the number of moving parts in 
delivering quotas for Kafka. All changes are localized to the broker and are 
compatible with existing clients. Client changes will be required only if we 
return quota metadata in the responses or add a quota metadata API.
If we discover in production that this isn't working for some reason, we can 
always revisit the approach of returning errors and having the clients handle 
them.

Note that both these data loss/duplicate issues only affect the producer. 
Consumers should be fine regardless of the approach we choose.

Aditya

From: Jun Rao [j...@confluent.io]
Sent: Monday, March 16, 2015 4:27 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

It's probably useful for a client to know whether its requests are
throttled or not (e.g., for monitoring and alerting). From that
perspective, option B (delay the requests and return an error) seems better.

Thanks,

Jun

On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 Posted a KIP for quotas in kafka.
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas

 Appreciate any feedback.

 Aditya



[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2015-03-16 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14364344#comment-14364344
 ] 

Sriharsha Chintalapani commented on KAFKA-1646:
---

[~waldenchen] It looks like the patch is against the 0.8.1.1 branch. Can you 
send us a patch against trunk?


 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, 
 KAFKA-1646_20150312_200352.patch


 This patch is for the Windows platform only. On Windows, if there is more than 
 one replica writing to disk, the segment log files will not be consistent on 
 disk and consumer read performance will drop greatly. This fix allocates more 
 disk space when rolling a new segment, which improves consumer read 
 performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, for it only 
 adds statements like 'if(Os.iswindow)' or methods used on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14364263#comment-14364263
 ] 

Joe Stein commented on KAFKA-2023:
--

Looks like maybe the issue is the version of git. I tried a few other ASF repos 
and hit the same issue with git 1.7.1, which is what comes with yum install git.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29091: Improve 1646 fix by reduce check if Os.IsWindows

2015-03-16 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29091/#review76688
---



core/src/main/scala/kafka/utils/Utils.scala
https://reviews.apache.org/r/29091/#comment124320

can we do setLength for all OSes, not just specifically on Windows?


- Sriharsha Chintalapani


On March 13, 2015, 3:12 a.m., Qianlin Xia wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29091/
 ---
 
 (Updated March 13, 2015, 3:12 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1646
 https://issues.apache.org/jira/browse/KAFKA-1646
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Improve 1646 fix by reduce check if Os.IsWindows
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/log/FileMessageSet.scala 
 e1f8b979c3e6f62ea235bd47bc1587a1291443f9 
   core/src/main/scala/kafka/log/Log.scala 
 46df8d99d977a3b010a9b9f4698187fa9bfb2498 
   core/src/main/scala/kafka/log/LogManager.scala 
 7cee5435b23fcd0d76f531004911a2ca499df4f8 
   core/src/main/scala/kafka/log/LogSegment.scala 
 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
   core/src/main/scala/kafka/utils/Utils.scala 
 a89b0463685e6224d263bc9177075e1bb6b93d04 
 
 Diff: https://reviews.apache.org/r/29091/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Qianlin Xia
 




[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14364412#comment-14364412
 ] 

Gwen Shapira commented on KAFKA-2023:
-

If it's the default on a popular OS, I'd update the docs with this information.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31893: Patch for KAFKA-2013

2015-03-16 Thread Yasuhiro Matsuda


 On March 16, 2015, 5:17 p.m., Jun Rao wrote:
  core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala, line 193
  https://reviews.apache.org/r/31893/diff/1/?file=890190#file890190line193
 
  Is there a particular reason that we need to override isCompleted()? 
  Typically, only tryComplete() and onComplete() need to be overridden in a 
  subclass of DelayedOperation.
  
  Actually, I am not sure how we complete the requests before the timeout 
  is reached, since there is no explicit call to tryComplete()?

isCompleted checks if the current time has passed the scheduled completion time 
rather than if forceComplete has been called. That makes isCompleted always 
accurate.

The purgatory checks the watcher lists every so often and calls isCompleted. 
Calling forceComplete from isCompleted ensures that a completed request is 
removed from the timing wheels in the new implementation. In terms of timing, 
this is not very accurate because completed requests may stay around longer 
than they should. This doesn't affect the old implementation at all, but it may 
impose some overhead on the new implementation. Still, the new one outperforms 
the old one.

It would be ideal if we could call forceComplete at the scheduled completion 
time, but that requires another timer (DelayQueue or Timer), which I think is 
too much overhead for measuring purgatory performance. It is also hard to 
guarantee that such a timer works accurately in this test setting.
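
A simplified sketch of that pattern (this is not the actual DelayedOperation
API, just an illustration of a clock-based isCompleted):

{code}
// Sketch: completion is determined by the clock, so isCompleted is always accurate
// even though the purgatory only scans its watcher lists every so often.
abstract class ClockCompletedOperationSketch(delayMs: Long) {
  private val completionTimeMs = System.currentTimeMillis() + delayMs
  @volatile private var forced = false

  def onComplete(): Unit  // whatever the benchmark needs to record

  def forceComplete(): Unit = {
    if (!forced) { forced = true; onComplete() }  // where timing-wheel cleanup would also happen
  }

  def isCompleted: Boolean = {
    val done = forced || System.currentTimeMillis() >= completionTimeMs
    if (done && !forced) forceComplete()  // lazy cleanup when the purgatory happens to check
    done
  }
}
{code}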


- Yasuhiro


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/#review76566
---


On March 16, 2015, 8:23 p.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31893/
 ---
 
 (Updated March 16, 2015, 8:23 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2013
 https://issues.apache.org/jira/browse/KAFKA-2013
 
 
 Repository: kafka
 
 
 Description
 ---
 
 purgatory micro benchmark
 
 
 Diffs
 -
 
   core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31893/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Yasuhiro Matsuda
 




Re: Review Request 31893: Patch for KAFKA-2013

2015-03-16 Thread Yasuhiro Matsuda

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/
---

(Updated March 16, 2015, 8:23 p.m.)


Review request for kafka.


Bugs: KAFKA-2013
https://issues.apache.org/jira/browse/KAFKA-2013


Repository: kafka


Description
---

purgatory micro benchmark


Diffs (updated)
-

  core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31893/diff/


Testing
---


Thanks,

Yasuhiro Matsuda



[jira] [Commented] (KAFKA-2013) benchmark test for the purgatory

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363843#comment-14363843
 ] 

Yasuhiro Matsuda commented on KAFKA-2013:
-

Updated reviewboard https://reviews.apache.org/r/31893/diff/
 against branch origin/trunk




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2013) benchmark test for the purgatory

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yasuhiro Matsuda updated KAFKA-2013:

Attachment: KAFKA-2013_2015-03-16_13:23:38.patch




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31742: Patch for KAFKA-527

2015-03-16 Thread Guozhang Wang


 On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/message/MessageWriter.scala, line 29
  https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line29
 
  Add a check that codec should not be NoCompression.
 
 Yasuhiro Matsuda wrote:
  Why should the codec not be NoCompression? The code works with 
  NoCompression, too.

That's right, it works with NoCompression too. My Bad.


 On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/message/MessageWriter.scala, line 97
  https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line97
 
  Could we use comments in 
  
  /**
   *
   */
   
  format?
 
 Yasuhiro Matsuda wrote:
  Is this comment style prohibited? This class is for internal use with 
 fairly localized usage.

We used to maintain a coding guide 
(http://kafka.apache.org/coding-guide.html), but I think we did not do a great 
job enforcing it, and the page itself is also a bit outdated. Jay added the 
checkstyle package in order to improve on this aspect, but it does not have 
comment rules in it. I thought the common rules in the code are:

1. Use /* */ for class definitions and user-facing API comments.
2. Use // for in-function comments (no capitalization).

But this is not programmatically enforced. Anyway, let me know if you think 
that is too picky and we can try to be more flexible in terms of commenting.
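
A tiny illustration of those two rules (the class itself is made up):

{code}
/* Rule 1: block comments for class definitions and user-facing APIs. */
class CommentStyleExample {
  def doWork(): Unit = {
    // rule 2: in-function comments use //, no capitalization
  }
}
{code}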


 On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/message/MessageWriter.scala, line 117
  https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line117
 
  We can just pass in the Byte here.
 
 Yasuhiro Matsuda wrote:
 This is a contract of OutputStream.

Cool. Could we add the optional override here?


 On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/message/MessageWriter.scala, line 135
  https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line135
 
  Better group the private functions together after the public functions.
 
 Yasuhiro Matsuda wrote:
  Well, I don't think that is a particularly better way to organize code, but 
  if you insist I can change it.
  The Kafka code base doesn't seem to follow that convention...

Again, we did not do a good job enforcing any sort of coding style, and it may 
be just me being unreasonable about these rules. I am open to other reviewers 
taking a look and giving their thoughts.


On March 13, 2015, 11:43 p.m., Yasuhiro Matsuda wrote:
  The inheritance of MessageWriter from BufferingOutputStream is a bit 
  confusing, since it will always use itself in the writePayload function 
  parameter. 
  
  I feel the code is clearer to read if we just let MessageWriter 
  contain a var of BufferingOutputStream; and instead of passing in the 
  function logic of writing the message, we can just pass in the messages and 
  offsetCounter in the write() call, which will then write the messages itself.
 
 Yasuhiro Matsuda wrote:
  It is true that the current code writes only through writePayload. But I 
  wanted MessageWriter to be a subclass of OutputStream to be more generic in 
  case we need to write additional information other than messages in the future.

As of now, MessageWriter's only public function is write(key, codec) 
(valueWritefunction), which is used for writing a single message. Also, its 
private functions withCrc32Prefix / withLengthPrefix are only used for message 
writing. So your motivation for future extensions is a bit unclear to me. 
Could you elaborate a bit more on that?
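
For what it's worth, a rough sketch of the composition alternative being
discussed (names are simplified; a ByteArrayOutputStream stands in for
BufferingOutputStream, and the real write() signature differs):

{code}
import java.io.ByteArrayOutputStream

// Sketch: MessageWriter holds the buffer and is handed the serialized messages,
// instead of extending the stream and receiving a write function.
class MessageWriterCompositionSketch {
  private val out = new ByteArrayOutputStream()  // stand-in for BufferingOutputStream

  def write(messages: Seq[Array[Byte]]): Array[Byte] = {
    messages.foreach(m => out.write(m))  // append each serialized message to the buffer
    out.toByteArray
  }
}
{code}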


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31742/#review76454
---


On March 4, 2015, 7:43 p.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31742/
 ---
 
 (Updated March 4, 2015, 7:43 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-527
 https://issues.apache.org/jira/browse/KAFKA-527
 
 
 Repository: kafka
 
 
 Description
 ---
 
 less byte copies
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
 9c694719dc9b515fb3c3ae96435a87b334044272 
   core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31742/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Yasuhiro Matsuda
 




Re: java.net.BindException: Address already in use

2015-03-16 Thread Jun Rao
This is being tracked in KAFKA-1501. Typically, this won't happen on a
dedicated machine.

Thanks,

Jun

On Mon, Mar 16, 2015 at 5:02 AM, Tong Li liton...@us.ibm.com wrote:


 Hi guys, when I ran the tests, I got a lot of these exceptions. I wonder if you
 had similar problems running the tests and how you resolved the issue.

 Thanks.

 Tong Li
 OpenStack & Kafka Community Development
 Building 501/B205
 liton...@us.ibm.com


[jira] [Created] (KAFKA-2021) Consolidate test classes for KafkaConfig

2015-03-16 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-2021:
---

 Summary: Consolidate test classes for KafkaConfig
 Key: KAFKA-2021
 URL: https://issues.apache.org/jira/browse/KAFKA-2021
 Project: Kafka
  Issue Type: Task
Reporter: Gwen Shapira
Priority: Minor


We have kafka.server.KafkaConfigTest, KafkaConfigConfigDefTest and 
kafka.unit.KafkaTest (in a file called KafkaConfigTest.scala)

I think consolidating them into one test class (or at least renaming them so it 
will be clear how they are different) will make a lot of sense.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


java.net.BindException: Address already in use

2015-03-16 Thread Tong Li

Hi guys, when I ran the tests, I got a lot of these exceptions. I wonder if you
had similar problems running the tests and how you resolved the issue.

Thanks.

Tong Li
OpenStack & Kafka Community Development
Building 501/B205
liton...@us.ibm.com

[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2015-03-16 Thread Honghai Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363051#comment-14363051
 ] 

Honghai Chen commented on KAFKA-1646:
-

Hey [~jkreps], is it OK to add a configuration like log.preallocatefile and 
change the three places that check if Os.IsWindows to check that configuration 
instead?
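
A hedged sketch of what that could look like (the method and parameter names are
illustrative only, not the actual patch):

{code}
import java.io.{File, RandomAccessFile}

// Sketch: gate preallocation on a broker config value (passed in as a boolean here)
// instead of checking Os.isWindows in three separate places.
object SegmentPreallocationSketch {
  def openSegmentFile(file: File, initFileSize: Long, preallocate: Boolean): RandomAccessFile = {
    val raf = new RandomAccessFile(file, "rw")
    if (preallocate)
      raf.setLength(initFileSize)  // reserve disk space up front, as the Windows-only fix does today
    raf
  }
}
{code}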




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

2015-03-16 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363129#comment-14363129
 ] 

Dmitry Bugaychenko commented on KAFKA-1305:
---

Even with a fast dedicated channel there will be a race condition in switching 
leadership. It could be removed either by complicating the protocol (e.g. the 
new leader takes leadership only after getting a "not a leader" response in the 
fetcher thread from the old one, while the old leader stops handling produce 
requests and allows fetches only from the new leader until it gets everything), 
or, maybe, it is worth considering getting rid of the controller in partition 
leader election and using distributed elections in ZK.

 Controller can hang on controlled shutdown with auto leader balance enabled
 ---

 Key: KAFKA-1305
 URL: https://issues.apache.org/jira/browse/KAFKA-1305
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
 Fix For: 0.8.2.0, 0.9.0

 Attachments: KAFKA-1305.patch, KAFKA-1305.patch, 
 KAFKA-1305_2014-10-13_07:30:45.patch


 This is relatively easy to reproduce especially when doing a rolling bounce.
 What happened here is as follows:
 1. The previous controller was bounced and broker 265 became the new 
 controller.
 2. I went on to do a controlled shutdown of broker 265 (the new controller).
 3. In the mean time the automatically scheduled preferred replica leader 
 election process started doing its thing and starts sending 
 LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers).  
 (t@113 below).
 4. While that's happening, the controlled shutdown process on 265 succeeds 
 and proceeds to deregister itself from ZooKeeper and shuts down the socket 
 server.
 5. (ReplicaStateMachine actually removes deregistered brokers from the 
 controller channel manager's list of brokers to send requests to.  However, 
 that removal cannot take place (t@18 below) because preferred replica leader 
 election task owns the controller lock.)
 6. So the request thread to broker 265 gets into infinite retries.
 7. The entire broker shutdown process is blocked on controller shutdown for 
 the same reason (it needs to acquire the controller lock).
 Relevant portions from the thread-dump:
 Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
   at java.lang.Thread.sleep(Native Method)
   at 
 kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
   at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
   - locked java.lang.Object@6dbf14a7
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Locked ownable synchronizers:
   - None
 ...
 Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: 
 kafka-scheduler-0
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
   at 
 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
   at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
   at kafka.utils.Utils$.inLock(Utils.scala:536)
   at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
   at 
 kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
   at 
 kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
   at kafka.Kafka$$anon$1.run(Kafka.scala:42)
 ...
 

[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils

2015-03-16 Thread Tong Li (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363164#comment-14363164
 ] 

Tong Li commented on KAFKA-1926:


@Jun Rao, awesome comments. I will follow the directions and provide a new 
patch set. This also confirms the direction that I am going in. Thanks.

 Replace kafka.utils.Utils with o.a.k.common.utils.Utils
 ---

 Key: KAFKA-1926
 URL: https://issues.apache.org/jira/browse/KAFKA-1926
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2.0
Reporter: Jay Kreps
  Labels: newbie, patch
 Attachments: KAFKA-1926.patch, KAFKA-1926.patch, KAFKA-1926.patch


 There is currently a lot of duplication between the Utils class in common and 
 the one in core.
 Our plan has been to deprecate duplicate code in the server and replace it 
 with the new common code.
 As such we should evaluate each method in the scala Utils and do one of the 
 following:
 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose 
 utility in active use that is not Kafka-specific. If we migrate it we should 
 really think about the API and make sure there is some test coverage. A few 
 things in there are kind of funky and we shouldn't just blindly copy them 
 over.
 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold 
 any utilities that really need to make use of Scala features to be convenient.
 3. Delete it if it is not used, or has a bad api.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-03-16 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363399#comment-14363399
 ] 

Gwen Shapira commented on KAFKA-1809:
-

Updated reviewboard https://reviews.apache.org/r/28769/diff/
 against branch trunk

 Refactor brokers to allow listening on multiple ports and IPs 
 --

 Key: KAFKA-1809
 URL: https://issues.apache.org/jira/browse/KAFKA-1809
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, 
 KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, 
 KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, 
 KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, 
 KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, 
 KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, 
 KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, 
 KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, 
 KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, 
 KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, 
 KAFKA-1809_2015-03-16_09:02:18.patch


 The goal is to eventually support different security mechanisms on different 
 ports. 
 Currently brokers are defined as host+port pair, and this definition exists 
 throughout the code-base, therefore some refactoring is needed to support 
 multiple ports for a single broker.
 The detailed design is here: 
 https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-03-16 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1809:

Attachment: KAFKA-1809_2015-03-16_09:40:49.patch




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jiangjie Qin
Agreed. Since throwing an exception when close() is called in the callback
won't work (because we are catching all the exceptions from the callback),
blocking might be the only option we have here.

Jiangjie (Becket) Qin

On 3/15/15, 11:56 AM, Jay Kreps jay.kr...@gmail.com wrote:

Cool.

I think blocking is good or alternately throwing an exception directly
from
close(). Basically I would just worry about subtly doing something
slightly
different from what the user asked for as it will be hard to notice that
behavior difference.

-Jay

On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Jay,

  I have modified the KIP as you suggested. I think as long as we have a
  consistent definition of timeout across the Kafka interfaces, there would be
  no problem. And I also agree it is better if we can make the producer block
  when close() is called from the sender thread, so the user will notice
  something went wrong.

 Thanks.

 Jiangjie (Becket) Qin

 On 3/14/15, 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Jiangjie,
 
 I think this is going to be very confusing that
   close(0) waits indefinitely and
   close(-1) waits for 0.
 I understand this appears in other apis, but it is a constant cause of
 bugs. Let's not repeat that mistake.
 
 Let's make close(0) wait for 0. We don't need a way to wait
indefinitely
 as
 we already have close() so having a magical constant for that is
 redundant.
 
 Calling close() from the I/O thread was already possible and would
block
 indefinitely. I think trying to silently change the behavior is
probably
 not right. I.e. if the user calls close() in the callback there is
 actually
 some misunderstanding and they need to think more, silently making this
 not
 block will hide the problem from them which is the opposite of what we
 want.
 
 -Jay
 
 On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin
j...@linkedin.com.invalid
 wrote:
 
  Hey Joe & Jay,
 
  Thanks for the comments on the voting thread. Since it seems we
probably
  will have more discussion on this, I am just replying from the
 discussion
  thread here.
  I’ve updated the KIP page to make it less like half-baked, apologize
for
  the rush...
 
  The contract in current KIP is:
1. close() - wait until all requests either are sent or reach
request
  timeout.
2. close(-1, TimeUnit.MILLISECONDS) - close immediately
3. close(0, TimeUnit.MILLISECONDS) - equivalent to close(), i.e.
Wait
  until all requests are sent or reach request timeout
4. close(5, TimeUnit.MILLISECONDS) - try the best to finish sending
 in 5
  milliseconds, if something went wrong, just shutdown the producer
 anyway,
  my callback will handle the failures.
 
  About how we define what timeout value stands for, I actually
struggled
 a
  little bit when wrote the patch. Intuitively, close(0) should mean
  immediately, however it seems that all the existing java class have
this
  convention of timeout=0 means no timeout or never timeout
 (Thread.join(0),
  Object.wait(0), etc.) So here the dilemma is either we follow the
  intuition or we follow the convention. What I chose is to follow the
  convention but document the interface to let user be aware of the
usage.
  The reason is that I think producer.close() is a public interface so
it
  might be better to follow java convention. Whereas selector is not a
  public interface that used by user, so as long as it makes sense to
us,
 it
  is less a problem to be different from java convention. That said
since
  consumer.poll(timeout) is also a public interface, I think it also
makes
  sense to make producer.close() to have the same definition of
  consumer.poll(timeout).
 
  The main argument for keeping a timeout in close would be separating
the
  close timeout from request timeout, which probably makes sense. I
would
  guess typically the request timeout would be long (e.g. 60 seconds)
  because we might want to consider retries with back off time. If we
have
  multiple batches in accumulator, in worst case that could take up to
  several minutes to complete all the requests. But when we close a
  producer, we might not want to wait for that long as it might cause
some
  other problem like deployment tool timeout.
 
  There is also a subtle difference between close(timeout) and
  flush(timeout). The only purpose for flush() is to write data to the
  broker, so it makes perfect sense to wait until request timeout. I
think
  that is why flush(timeout) looks strange. On the other hand, the top
  priority for close() is to close the producer rather than flush()
data,
 so
  close(timeout) gives guarantee on bounded waiting for its main job.
 
  Sorry for the confusion about forceClose flag. It is not a public
  interface. I mentioned it in Proposed Changes section which I thought
 was
  supposed to provide implementation details.
 
  Thanks again for all the comments and suggestions!
 
  Jiangjie (Becket) Qin
 
  On 3/10/15, 8:57 PM, Jiangjie Qin j...@linkedin.com wrote:
 
  The KIP page has been 

[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-03-16 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363429#comment-14363429
 ] 

Gwen Shapira commented on KAFKA-1809:
-

[~junrao]:

1. Addressed all sub-points
2. I think using actual version numbers in the config is more admin-friendly. I 
changed the parseConfig function to just use the 3 significant version numbers.
3. Fixed
4. good catch, that was silly :) fixed.
5. Fixed
6. I had to change this configuration in order to test end-to-end with a 
non-default protocol.
7. Fixed
8. Yep. 
9. Merge bug. Fixed this.
10. TRACE is used for testing only, because it was important to make sure that 
things still work when I use the non-default protocol.
For example in SocketServerTest, but also I used it in manual testing.
11. Fixed
12. For performance reasons. Especially when validating the segments at the 
end. In current patch I changed it for all replica_testcases.
13. Ick! update metadatarequest was missing from the ser/de test suite! added 
it and validated that it catches the issue.
14. Ran system-tests (replica testcases only)
14.2 Ran with console producer and consumer. I'll open a separate JIRA for the 
rest of the tools, but I think it can go after we add the security protocol 
implementations.
14.3 Opened separate JIRA for this.






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Question about concurrency during Log config change

2015-03-16 Thread Andrii Biletskyi
Jay,

Thanks for the quick response. Yes, this might not be that harmful for users,
I'm not sure about that. But it definitely looks like a data race. Your
solution is simple and should work, though it's hard to tell promptly when
concurrency is involved.

Initially I was looking through this code to understand whether we can inherit
this approach for the global broker config. In that case your solution will be
harder to implement, since we access the broker's config in many, many
different places. But that's another story.

Thanks,
Andrii Biletskyi

On Mon, Mar 16, 2015 at 5:56 PM, Jay Kreps jay.kr...@gmail.com wrote:

 You are correct. Each read will be a valid value but there is no guarantee
 that subsequent reads will read from the same config. I don't think that is
 a problem, do you? If we want to strengthen the guarantee we can grab the
 config once in the method
val config = log.config
 and then do however many accesses against that variable which will remain
 constant even if the config is updated in the course of the method.

 -Jay
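
 A small sketch of that snapshot pattern (illustrative types, not the actual
 Log/LogConfig classes):

{code}
// Sketch: read the volatile config reference once, then use the local value for the
// rest of the method so a concurrent config swap cannot be observed half-way through.
case class LogConfigSketch(maxMessageSize: Int, segmentSize: Int)

class LogSketch(@volatile var config: LogConfigSketch) {
  def append(messageSize: Int): Unit = {
    val cfg = config  // single read of the volatile field
    require(messageSize <= cfg.maxMessageSize, "message too large")
    // ... any later checks also use cfg.segmentSize, never re-reading this.config ...
  }
}
{code}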

 On Mon, Mar 16, 2015 at 8:50 AM, Andrii Biletskyi 
 andrii.bilets...@stealth.ly wrote:

  Hi all,
 
  I was looking through the code related to dynamic Log config change
  feature and
  noticed the way we deal with concurrency there. I have a question about
 it.
 
  The Log class holds volatile LogConfig property, almost all methods in
  Log.scala are
  synchronized on private lock object. But the code in TopicConfigManager
  (
 
 
 https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/server/TopicConfigManager.scala#L108
  )
  which substitutes Log's logConfig is not synchronized.
 
  Code execution example:
  Thread 1: Log.append - Log:288 config.*maxMessageSize* is accessed
 
 
 https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L288
 
  Thread 2: handles log config change - TopicConfigManager:108 (see above)
  substitutes
  log's config - changes *maxMessageSize* and *segmentSize*
 
  Thread 1: Log.append Log:299 - code accesses config.*segmentSize* and
  pickups updated
  config setting
 
 
 https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L299
 
  So looks like we accessed object in partial state - in scope of one
  procedure
  (Log.append) we took one setting from the old state (maxMessageSize), and
  the other
  one from the updated state.
 
  Methods in Log are synchronized, as mentioned above. But logConfig is
 only
  volatile
  which solves visibility problems but doesn't prevent it from being
 changed
  in other
  thread, as I understand.
 
  Am I missing something here?
 
  Thanks,
  Andrii Biletskyi
 



[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-03-16 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363465#comment-14363465
 ] 

Gwen Shapira commented on KAFKA-1809:
-

Updated reviewboard https://reviews.apache.org/r/28769/diff/
 against branch trunk

 Refactor brokers to allow listening on multiple ports and IPs 
 --

 Key: KAFKA-1809
 URL: https://issues.apache.org/jira/browse/KAFKA-1809
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, 
 KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, 
 KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, 
 KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, 
 KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, 
 KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, 
 KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, 
 KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, 
 KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, 
 KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, 
 KAFKA-1809_2015-03-16_09:02:18.patch, KAFKA-1809_2015-03-16_09:40:49.patch


 The goal is to eventually support different security mechanisms on different 
 ports. 
 Currently brokers are defined as host+port pair, and this definition exists 
 throughout the code-base, therefore some refactoring is needed to support 
 multiple ports for a single broker.
 The detailed design is here: 
 https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network

2015-03-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1928:
-
Issue Type: Sub-task  (was: Improvement)
Parent: KAFKA-1682

 Move kafka.network over to using the network classes in 
 org.apache.kafka.common.network
 ---

 Key: KAFKA-1928
 URL: https://issues.apache.org/jira/browse/KAFKA-1928
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Gwen Shapira

 As part of the common package we introduced a bunch of network related code 
 and abstractions.
 We should look into replacing a lot of what is in kafka.network with this 
 code. Duplicate classes include things like Receive, Send, etc. It is likely 
 possible to also refactor the SocketServer to make use of Selector which 
 should significantly simplify it's code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network

2015-03-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1928:
-
Component/s: security

 Move kafka.network over to using the network classes in 
 org.apache.kafka.common.network
 ---

 Key: KAFKA-1928
 URL: https://issues.apache.org/jira/browse/KAFKA-1928
 Project: Kafka
  Issue Type: Improvement
  Components: security
Reporter: Jay Kreps
Assignee: Gwen Shapira

 As part of the common package we introduced a bunch of network related code 
 and abstractions.
 We should look into replacing a lot of what is in kafka.network with this 
 code. Duplicate classes include things like Receive, Send, etc. It is likely 
 possible to also refactor the SocketServer to make use of Selector which 
 should significantly simplify it's code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28769: Patch for KAFKA-1809

2015-03-16 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28769/
---

(Updated March 16, 2015, 4:41 p.m.)


Review request for kafka.


Bugs: KAFKA-1809
https://issues.apache.org/jira/browse/KAFKA-1809


Repository: kafka


Description (updated)
---

forgot rest of patch


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
fa9daaef66ff7961e1c46cd0cd8fed18a53bccd8 
  clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
920b51a6c3c99639fbc9dc0656373c19fabd 
  clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
c899813d55b9c4786adde3d840f040d6645d27c8 
  config/server.properties 1614260b71a658b405bb24157c8f12b1f1031aa5 
  core/src/main/scala/kafka/admin/AdminUtils.scala 
b700110f2d7f1ede235af55d8e37e1b5592c6c7d 
  core/src/main/scala/kafka/admin/TopicCommand.scala 
f400b71f8444fffd3fc1d8398a283682390eba4e 
  core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala 
24aaf954dc42e2084454fa5fc9e8f388ea95c756 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
4ff7e8f8cc695551dd5d2fe65c74f6b6c571e340 
  core/src/main/scala/kafka/api/TopicMetadata.scala 
0190076df0adf906ecd332284f222ff974b315fc 
  core/src/main/scala/kafka/api/TopicMetadataResponse.scala 
92ac4e687be22e4800199c0666bfac5e0059e5bb 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
530982e36b17934b8cc5fb668075a5342e142c59 
  core/src/main/scala/kafka/client/ClientUtils.scala 
ebba87f0566684c796c26cb76c64b4640a5ccfde 
  core/src/main/scala/kafka/cluster/Broker.scala 
0060add008bb3bc4b0092f2173c469fce0120be6 
  core/src/main/scala/kafka/cluster/BrokerEndPoint.scala PRE-CREATION 
  core/src/main/scala/kafka/cluster/EndPoint.scala PRE-CREATION 
  core/src/main/scala/kafka/cluster/SecurityProtocol.scala PRE-CREATION 
  core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala 
PRE-CREATION 
  core/src/main/scala/kafka/consumer/ConsumerConfig.scala 
9ebbee6c16dc83767297c729d2d74ebbd063a993 
  core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 
b9e2bea7b442a19bcebd1b350d39541a8c9dd068 
  core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
152fda5d1dcdf319399fdeeb8457006090ebe56c 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
cca815a128419e146feff53adaeddc901bb5de1f 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
c582191636f6188c25d62a67ff0315b56f163133 
  core/src/main/scala/kafka/controller/KafkaController.scala 
09fc46d759b74bcdad2d2a610d9c5a93ff02423f 
  core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala 
d281bb31a66fd749ecddfbe38479b6903f436831 
  core/src/main/scala/kafka/javaapi/TopicMetadata.scala 
f384e04678df10a5b46a439f475c63371bf8e32b 
  core/src/main/scala/kafka/network/RequestChannel.scala 
7b1db3dbbb2c0676f166890f566c14aa248467ab 
  core/src/main/scala/kafka/network/SocketServer.scala 
76ce41aed6e04ac5ba88395c4d5008aca17f9a73 
  core/src/main/scala/kafka/producer/ProducerPool.scala 
43df70bb461dd3e385e6b20396adef3c4016a3fc 
  core/src/main/scala/kafka/server/AbstractFetcherManager.scala 
20c00cb8cc2351950edbc8cb1752905a0c26e79f 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
e731df4b2a3e44aa3d761713a09b1070aff81430 
  core/src/main/scala/kafka/server/KafkaApis.scala 
35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
46d21c73f1feb3410751899380b35da0c37c975c 
  core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
7907987e43404487382de7f4cc294f0d01ac15a7 
  core/src/main/scala/kafka/server/KafkaServer.scala 
dddef938fabae157ed8644536eb1a2f329fb42b7 
  core/src/main/scala/kafka/server/MetadataCache.scala 
6aef6e4508ecadbbcc1e12bed2054547b7aa333e 
  core/src/main/scala/kafka/server/ReplicaFetcherManager.scala 
351dbbad3bdb709937943b336a5b0a9e0162a5e2 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
96faa7b4ed7c9ba8a3f6f9f114bd94e19b3a7ac0 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 
d1e7c434e77859d746b8dc68dd5d5a3740425e79 
  core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
ba6ddd7a909df79a0f7d45e8b4a2af94ea0fceb6 
  core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 
b4f903b6c7c3bb725cac7c05eb1f885906413c4d 
  core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala 
111c9a8b94ce45d95551482e9fd3f8c1cccbf548 
  core/src/main/scala/kafka/utils/Utils.scala 
738c1af9ef5de16fdf5130daab69757a14c48b5c 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
7ae999ec619443d35a9cb8fbcd531fca0c51c8c0 
  

Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jiangjie Qin
It seems there are two options we can choose from when close() is called
from the sender thread (callback):
1. Log an error and close the producer using close(-1)
2. Log an error and block.
(Throwing an exception will not work because we catch all the exceptions
thrown from user callbacks. It will just lead to an error log.)

My concern with the first option is that the producer will be closed even
if we logged an error. I am wondering whether some users would not even take a
look at the log if the producer is closed normally, because from the program's
behavior everything looks good. If that is the case, the error message we
logged will probably just be ignored until some day when people check the
log and see it.

As for the second option, because the producer does not close but blocks,
users will notice this the first time they run the program. They will probably
look at the log to see why the producer could not be closed, and they will see
the error log we put there. So they will get informed about this misuse
of close() in the sender thread the first time they run the code instead of
some time later.

Personally I prefer the second one because it makes it more obvious that
something went wrong.

Jiangjie (Becket) Qin
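
For reference, a hedged Java sketch of the situation being discussed: a send
callback that reacts to a failure by closing the producer. The
close(timeout, unit) overload is the KIP-15 proposal, not yet part of the
released API at this point, and whether the force value ends up being 0 or -1
is exactly what this thread is debating.

{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CloseInCallbackSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("test-topic", "key", "value"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // Misuse under discussion: a plain close() here runs on the sender
                    // thread and would wait for that same thread to finish.
                    // producer.close();

                    // Proposed alternative: a forced, non-blocking close. Whether the
                    // magic value is 0 or -1 is what this thread is deciding.
                    producer.close(0, TimeUnit.MILLISECONDS);
                }
            }
        });
        // Normal close from the user thread: waits for in-flight requests.
        producer.close();
    }
}
{code}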

On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote:

Yeah I agree we should not silently change the behavior of the function
with the given parameters; and I would prefer error-logging-and-shutdown
over blocking when close(0) is used, since as Neha suggested blocking
would also not proceed with sending any data, bu will just let users to
realize the issue later than sooner.

On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede n...@confluent.io wrote:

 
  And I also agree it is better if we can make producer block when
  close() is called from sender thread so user will notice something
went
  wrong.


 This isn't a great experience either. Why can't we just throw an
exception
 for a behavior we know is incorrect and we'd like the user to know.
 Blocking as a means of doing that seems wrong and annoying.

 On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps jay.kr...@gmail.com wrote:

  Cool.
 
  I think blocking is good or alternately throwing an exception directly
 from
  close(). Basically I would just worry about subtly doing something
 slightly
  different from what the user asked for as it will be hard to notice
that
  behavior difference.
 
  -Jay
 
  On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin
j...@linkedin.com.invalid
 
  wrote:
 
   Hi Jay,
  
   I have modified the KIP as you suggested. I thinks as long as we
have
   consistent define for timeout across Kafka interface, there would
be no
   problem. And I also agree it is better if we can make producer block
 when
   close() is called from sender thread so user will notice something
went
   wrong.
  
   Thanks.
  
   Jiangjie (Becket) Qin
  
   On 3/14/15, 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote:
  
   Hey Jiangjie,
   
   I think this is going to be very confusing that
 close(0) waits indefinitely and
 close(-1) waits for 0.
   I understand this appears in other apis, but it is a constant
cause of
   bugs. Let's not repeat that mistake.
   
   Let's make close(0) wait for 0. We don't need a way to wait
 indefinitely
   as
   we already have close() so having a magical constant for that is
   redundant.
   
   Calling close() from the I/O thread was already possible and would
 block
   indefinitely. I think trying to silently change the behavior is
 probably
   not right. I.e. if the user calls close() in the callback there is
   actually
   some misunderstanding and they need to think more, silently making
 this
   not
   block will hide the problem from them which is the opposite of
what we
   want.
   
   -Jay
   
   On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin
 j...@linkedin.com.invalid
  
   wrote:
   
Hey Joe  Jay,
   
Thanks for the comments on the voting thread. Since it seems we
  probably
will have more discussion on this, I am just replying from the
   discussion
thread here.
I’ve updated the KIP page to make it less like half-baked,
apologize
  for
the rush...
   
The contract in current KIP is:
  1. close() - wait until all requests either are sent or reach
  request
timeout.
  2. close(-1, TimeUnit.MILLISECONDS) - close immediately
  3. close(0, TimeUnit.MILLISECONDS) - equivalent to close(),
i.e.
  Wait
until all requests are sent or reach request timeout
  4. close(5, TimeUnit.MILLISECONDS) - try the best to finish
 sending
   in 5
milliseconds, if something went wrong, just shutdown the producer
   anyway,
my callback will handle the failures.
   
About how we define what timeout value stands for, I actually
  struggled
   a
little bit when wrote the patch. Intuitively, close(0) should
mean
immediately, however it seems that all the existing java class
have
  this
convention of timeout=0 means no timeout or never timeout
   (Thread.join(0),

[jira] [Updated] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-03-16 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1809:

Attachment: KAFKA-1809_2015-03-16_09:02:18.patch

 Refactor brokers to allow listening on multiple ports and IPs 
 --

 Key: KAFKA-1809
 URL: https://issues.apache.org/jira/browse/KAFKA-1809
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, 
 KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, 
 KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, 
 KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, 
 KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, 
 KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, 
 KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, 
 KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, 
 KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, 
 KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, 
 KAFKA-1809_2015-03-16_09:02:18.patch


 The goal is to eventually support different security mechanisms on different 
 ports. 
 Currently brokers are defined as host+port pair, and this definition exists 
 throughout the code-base, therefore some refactoring is needed to support 
 multiple ports for a single broker.
 The detailed design is here: 
 https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28769: Patch for KAFKA-1809

2015-03-16 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28769/
---

(Updated March 16, 2015, 4:02 p.m.)


Review request for kafka.


Bugs: KAFKA-1809
https://issues.apache.org/jira/browse/KAFKA-1809


Repository: kafka


Description (updated)
---

squashing multi-broker-endpoint patches


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java 
PRE-CREATION 
  core/src/main/scala/kafka/cluster/BrokerEndPoint.scala PRE-CREATION 
  core/src/main/scala/kafka/cluster/EndPoint.scala PRE-CREATION 
  core/src/main/scala/kafka/cluster/SecurityProtocol.scala PRE-CREATION 
  core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala 
PRE-CREATION 
  core/src/test/scala/unit/kafka/cluster/BrokerTest.scala PRE-CREATION 
  system_test/run_all.sh PRE-CREATION 
  system_test/run_all_replica.sh PRE-CREATION 

Diff: https://reviews.apache.org/r/28769/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Commented] (KAFKA-1994) Evaluate performance effect of chroot check on Topic creation

2015-03-16 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363349#comment-14363349
 ] 

Gwen Shapira commented on KAFKA-1994:
-

New patch looks good. 
[~singhashish], can you share how the performance of the code with the new 
patch compares to that of the older solution and to that of createPersistent() 
without any of the checks?
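
A hedged sketch of how such a comparison could be measured, assuming the
org.I0Itec.zkclient.ZkClient API Kafka used at the time and a local ZooKeeper
on the default port; the paths and iteration count are arbitrary, and absolute
numbers will depend on the environment.

{code}
import org.I0Itec.zkclient.ZkClient;

// Rough micro-benchmark sketch: time path creation with and without an extra
// exists() check on the chroot/namespace, against a local ZooKeeper.
public class ChrootCheckBenchmark {
    public static void main(String[] args) {
        ZkClient zk = new ZkClient("localhost:2181");
        int iterations = 1000;

        if (!zk.exists("/bench")) {
            zk.createPersistent("/bench", true); // namespace to check against
        }

        long withCheck = time(() -> {
            for (int i = 0; i < iterations; i++) {
                if (!zk.exists("/bench")) {      // namespace check before every create
                    throw new IllegalStateException("namespace missing");
                }
                zk.createPersistent("/bench/with-check-" + i, true);
            }
        });

        long withoutCheck = time(() -> {
            for (int i = 0; i < iterations; i++) {
                zk.createPersistent("/bench/no-check-" + i, true);
            }
        });

        System.out.printf("with check: %d ms, without: %d ms%n", withCheck, withoutCheck);
        zk.deleteRecursive("/bench");
        zk.close();
    }

    static long time(Runnable body) {
        long start = System.currentTimeMillis();
        body.run();
        return System.currentTimeMillis() - start;
    }
}
{code}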

 Evaluate performance effect of chroot check on Topic creation
 -

 Key: KAFKA-1994
 URL: https://issues.apache.org/jira/browse/KAFKA-1994
 Project: Kafka
  Issue Type: Improvement
Reporter: Ashish K Singh
Assignee: Ashish K Singh
 Attachments: KAFKA-1994.patch, KAFKA-1994_2015-03-03_18:19:45.patch


 KAFKA-1664 adds check for chroot while creating a node in ZK. ZkPath checks 
 if namespace exists before trying to create a path in ZK. This raises a 
 concern that checking namespace for each path creation might be unnecessary 
 and can potentially make creations expensive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Question about concurrency during Log config change

2015-03-16 Thread Andrii Biletskyi
Hi all,

I was looking through the code related to dynamic Log config change
feature and
noticed the way we deal with concurrency there. I have a question about it.

The Log class holds a volatile LogConfig property, and almost all methods in
Log.scala are
synchronized on a private lock object. But the code in TopicConfigManager
(
https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/server/TopicConfigManager.scala#L108
)
which substitutes Log's logConfig is not synchronized.

Code execution example:
Thread 1: Log.append - Log:288 config.*maxMessageSize* is accessed
https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L288

Thread 2: handles log config change - TopicConfigManager:108 (see above)
substitutes
log's config - changes *maxMessageSize* and *segmentSize*

Thread 1: Log.append - Log:299 - the code accesses config.*segmentSize* and
picks up the updated
config setting
https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L299

So it looks like we accessed the object in a partial state: within one
procedure (Log.append) we took one setting from the old state (maxMessageSize)
and the other from the updated state.

Methods in Log are synchronized, as mentioned above. But logConfig is only
volatile, which solves visibility problems but doesn't prevent it from being
changed by another thread, as I understand.

Am I missing something here?

Thanks,
Andrii Biletskyi
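
To make the interleaving above concrete, here is a minimal, self-contained
Java sketch; the Log and LogConfig classes are hypothetical stand-ins for the
real kafka.log classes, and whether the mixed read actually happens depends on
thread scheduling.

{code}
// Minimal sketch of the interleaving described above. The classes are
// hypothetical stand-ins, not the real kafka.log.Log / LogConfig.
public class VolatileConfigRace {
    static final class LogConfig {
        final int maxMessageSize;
        final int segmentSize;
        LogConfig(int maxMessageSize, int segmentSize) {
            this.maxMessageSize = maxMessageSize;
            this.segmentSize = segmentSize;
        }
    }

    static final class Log {
        // volatile gives visibility, but two reads of 'config' may observe
        // two different LogConfig instances.
        volatile LogConfig config = new LogConfig(1_000_000, 1_073_741_824);

        void append() {
            int maxMessageSize = config.maxMessageSize; // read #1 (possibly old config)
            // ... the config-change thread may swap 'config' right here ...
            int segmentSize = config.segmentSize;       // read #2 (possibly new config)
            System.out.println("append saw maxMessageSize=" + maxMessageSize
                    + ", segmentSize=" + segmentSize);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Log log = new Log();
        Thread appender = new Thread(log::append);
        Thread configChanger = new Thread(() ->
                log.config = new LogConfig(2_000_000, 536_870_912));
        appender.start();
        configChanger.start();
        appender.join();
        configChanger.join();
    }
}
{code}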


[jira] [Assigned] (KAFKA-2021) Consolidate test classes for KafkaConfig

2015-03-16 Thread Andrii Biletskyi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrii Biletskyi reassigned KAFKA-2021:
---

Assignee: Andrii Biletskyi

 Consolidate test classes for KafkaConfig
 

 Key: KAFKA-2021
 URL: https://issues.apache.org/jira/browse/KAFKA-2021
 Project: Kafka
  Issue Type: Task
Reporter: Gwen Shapira
Assignee: Andrii Biletskyi
Priority: Minor

 We have kafka.server.KafkaConfigTest, KafkaConfigConfigDefTest and 
 kafka.unit.KafkaTest (in a file called KafkaConfigTest.scala)
 I think consolidating them into one test class (or at list renaming so it 
 will be clear how they are different) will make a lot of sense.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Question about concurrency during Log config change

2015-03-16 Thread Jay Kreps
You are correct. Each read will be a valid value but there is no guarantee
that subsequent reads will read from the same config. I don't think that is
a problem, do you? If we want to strengthen the guarantee we can grab the
config once in the method
   val config = log.config
and then do however many accesses against that variable which will remain
constant even if the config is updated in the course of the method.
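
As a hedged illustration of that suggestion (hypothetical types, mirroring the
val config = log.config idea): take one snapshot of the volatile reference and
use only that snapshot for the rest of the method, so all settings come from a
single LogConfig instance.

{code}
// Sketch of the suggestion above: capture the volatile config once and use
// only that snapshot afterwards. Hypothetical types, not the real kafka.log
// classes.
public class ConfigSnapshotExample {
    static final class LogConfig {
        final int maxMessageSize = 1_000_000;
        final int segmentSize = 1_073_741_824;
    }

    volatile LogConfig config = new LogConfig();

    void append(byte[] message) {
        LogConfig snapshot = config;  // single volatile read
        if (message.length > snapshot.maxMessageSize) {
            throw new IllegalArgumentException("message too large");
        }
        // A second read from the same snapshot: maxMessageSize and segmentSize
        // are guaranteed to come from one LogConfig instance, even if another
        // thread replaces 'config' in the meantime.
        boolean exceedsSegmentSize = message.length > snapshot.segmentSize;
        System.out.println("exceedsSegmentSize=" + exceedsSegmentSize);
    }

    public static void main(String[] args) {
        new ConfigSnapshotExample().append(new byte[128]);
    }
}
{code}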

-Jay

On Mon, Mar 16, 2015 at 8:50 AM, Andrii Biletskyi 
andrii.bilets...@stealth.ly wrote:

 Hi all,

 I was looking through the code related to dynamic Log config change
 feature and
 noticed the way we deal with concurrency there. I have a question about it.

 The Log class holds volatile LogConfig property, almost all methods in
 Log.scala are
 synchronized on private lock object. But the code in TopicConfigManager
 (

 https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/server/TopicConfigManager.scala#L108
 )
 which substitutes Log's logConfig is not synchronized.

 Code execution example:
 Thread 1: Log.append - Log:288 config.*maxMessageSize* is accessed

 https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L288

 Thread 2: handles log config change - TopicConfigManager:108 (see above)
 substitutes
 log's config - changes *maxMessageSize* and *segmentSize*

 Thread 1: Log.append Log:299 - code accesses config.*segmentSize* and
 pickups updated
 config setting

 https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L299

 So looks like we accessed object in partial state - in scope of one
 procedure
 (Log.append) we took one setting from the old state (maxMessageSize), and
 the other
 one from the updated state.

 Methods in Log are synchronized, as mentioned above. But logConfig is only
 volatile
 which solves visibility problems but doesn't prevent it from being changed
 in other
 thread, as I understand.

 Am I missing something here?

 Thanks,
 Andrii Biletskyi



Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Guozhang Wang
Hi Jiangjie,

As far as I understand, calling close() in the ioThread is not common, as it
may only trigger when we see some non-retriable error. Hence when users run
their program it is unlikely that close() will be triggered and the problem
will be detected. So it seems to me that from the error-detection aspect
these two options are the same, as people will usually detect it
from the producer metrics all dropping to 0.

Guozhang

On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 It seems there are two options we can choose from when close() is called
 from sender thread (callback):
 1. Log an error and close the producer using close(-1)
 2. Log an error and block.
 (Throwing an exception will not work because we catch all the exception
 thrown from user callback. It will just lead to an error log.)

 My concern for the first option is that the producer will be closed even
 if we logged and error. I am wondering if some user would not even take a
 look at the log if producer is closed normally. Because from the programs
 behavior, everything looks good. If that is the case, the error message we
 logged probably will just be ignored until some day when people check the
 log and see it.

 As for the second option, because producer does not close but blocks. User
 will notice this the first time they run the program. They probably will
 look at the log to see why producer could not be closed and they will see
 the error log we put there. So they will get informed about this mis-usage
 of close() in sender thread the first time they run the code instead of
 some time later.

 Personally I prefer the second one because it is more obvious that
 something was wrong.

 Jiangjie (Becket) Qin

 On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote:

 Yeah I agree we should not silently change the behavior of the function
 with the given parameters; and I would prefer error-logging-and-shutdown
 over blocking when close(0) is used, since as Neha suggested blocking
 would also not proceed with sending any data, bu will just let users to
 realize the issue later than sooner.
 
 On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede n...@confluent.io wrote:
 
  
   And I also agree it is better if we can make producer block when
   close() is called from sender thread so user will notice something
 went
   wrong.
 
 
  This isn't a great experience either. Why can't we just throw an
 exception
  for a behavior we know is incorrect and we'd like the user to know.
  Blocking as a means of doing that seems wrong and annoying.
 
  On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps jay.kr...@gmail.com
 wrote:
 
   Cool.
  
   I think blocking is good or alternately throwing an exception directly
  from
   close(). Basically I would just worry about subtly doing something
  slightly
   different from what the user asked for as it will be hard to notice
 that
   behavior difference.
  
   -Jay
  
   On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
   wrote:
  
Hi Jay,
   
I have modified the KIP as you suggested. I thinks as long as we
 have
consistent define for timeout across Kafka interface, there would
 be no
problem. And I also agree it is better if we can make producer block
  when
close() is called from sender thread so user will notice something
 went
wrong.
   
Thanks.
   
Jiangjie (Becket) Qin
   
On 3/14/15, 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote:
   
Hey Jiangjie,

I think this is going to be very confusing that
  close(0) waits indefinitely and
  close(-1) waits for 0.
I understand this appears in other apis, but it is a constant
 cause of
bugs. Let's not repeat that mistake.

Let's make close(0) wait for 0. We don't need a way to wait
  indefinitely
as
we already have close() so having a magical constant for that is
redundant.

Calling close() from the I/O thread was already possible and would
  block
indefinitely. I think trying to silently change the behavior is
  probably
not right. I.e. if the user calls close() in the callback there is
actually
some misunderstanding and they need to think more, silently making
  this
not
block will hide the problem from them which is the opposite of
 what we
want.

-Jay

On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin
  j...@linkedin.com.invalid
   
wrote:

 Hey Joe  Jay,

 Thanks for the comments on the voting thread. Since it seems we
   probably
 will have more discussion on this, I am just replying from the
discussion
 thread here.
 I’ve updated the KIP page to make it less like half-baked,
 apologize
   for
 the rush...

 The contract in current KIP is:
   1. close() - wait until all requests either are sent or reach
   request
 timeout.
   2. close(-1, TimeUnit.MILLISECONDS) - close immediately
   3. close(0, 

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-16 Thread Andrii Biletskyi
Jun,

Answering your questions:

101. If I understand you correctly, you are saying future producer versions
(which will be ported to TMR_V1) won't be able to automatically create topics
(if we unconditionally remove topic creation from there), but we need to
preserve this logic.
OK, about your proposal: I'm not a big fan either when it comes to
differentiating clients directly in the protocol schema. And I'm also not sure
I understand why auto.create.topics.enable is a server-side configuration at
all. Can we deprecate this setting in future versions, add it to the producer,
and based on that, upon receiving UnknownTopic, create the topic explicitly
via a separate producer call to the adminClient?

102.1. Hm, yes. It's because we want to support batching and at the same time
give descriptive error messages to clients. Since AdminClient holds the
context to construct such messages (e.g. the AdminClient layer can know that
InvalidArgumentsCode means two cases: either an invalid number, e.g. -1, or a
replication-factor that was provided while the partitions argument wasn't),
I wrapped responses in Exceptions. But I'm open to any other ideas; this was
just an initial version.
102.2. Yes, I agree. I'll probably change that to some other DTO.

Thanks,
Andrii Biletskyi
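
As a hedged illustration of the flow suggested above, with a hypothetical
AdminClient interface (KIP-4's actual client API was still under discussion):
on an unknown topic, the producer creates the topic explicitly instead of
relying on the metadata request's auto-creation side effect.

{code}
// Sketch of producer-driven topic creation: on an unknown topic, explicitly
// create it through an admin client, instead of relying on the broker-side
// auto-creation triggered by metadata requests. AdminClient and
// MetadataFetcher are hypothetical interfaces, not real APIs.
public class AutoCreateSketch {
    interface AdminClient {
        void createTopic(String topic, int partitions, int replicationFactor);
    }

    interface MetadataFetcher {
        boolean topicExists(String topic); // TMR_V1: no auto-creation side effect
    }

    static void ensureTopic(MetadataFetcher metadata, AdminClient admin,
                            String topic, boolean autoCreateEnabled) {
        if (metadata.topicExists(topic)) {
            return;
        }
        if (!autoCreateEnabled) {
            throw new IllegalStateException("Unknown topic and auto-creation disabled: " + topic);
        }
        admin.createTopic(topic, 1, 1); // explicit create replaces the implicit one
    }

    public static void main(String[] args) {
        MetadataFetcher metadata = topic -> false;
        AdminClient admin = (topic, partitions, rf) ->
                System.out.println("createTopicRequest for " + topic);
        ensureTopic(metadata, admin, "my-topic", true);
    }
}
{code}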

On Fri, Mar 13, 2015 at 7:16 PM, Jun Rao j...@confluent.io wrote:

 Andrii,

 101. That's what I was thinking too, but it may not be that simple. In
 TopicMetadataRequest_V1,
 we can let it not trigger auto topic creation. Then, in the producer side,
 if it gets an UnknownTopicException, it can explicitly issue a
 createTopicRequest for auto topic creation. On the consumer side, it will
 never issue createTopicRequest. This works when auto topic creation is
 enabled on the broker side. However, I am not sure how things will work
 when auto topic creation is disabled on the broker side. In this case, we
 want to have a way to manually create a topic, potentially through admin
 commands. However, then we need a way to distinguish createTopicRequest
 issued from the producer clients and the admin tools. May be we can add a
 new field in createTopicRequest and set it differently in the producer
 client and the admin client. However, I am not sure if that's the best
 approach.

 2. Yes, refactoring existing requests is a non-trivial amount of work. I
 posted some comments in KAFKA-1927. We will probably have to fix KAFKA-1927
 first, before adding the new logic in KAFKA-1694. Otherwise, the changes
 will be too big.

 102. About the AdminClient:
 102.1. It's a bit weird that we return exception in the api. It seems that
 we should either return error code or throw an exception when getting the
 response state.
 102.2. We probably shouldn't explicitly use the request object in the api.
 Not every request evolution requires an api change.

 Thanks,

 Jun


 On Fri, Mar 13, 2015 at 4:08 AM, Andrii Biletskyi 
 andrii.bilets...@stealth.ly wrote:

  Jun,
 
  Thanks for you comments. Answers inline:
 
  100. There are a few fields such as ReplicaAssignment,
   ReassignPartitionRequest,
   and PartitionsSerialized that are represented as a string, but contain
   composite structures in json. Could we flatten them out directly in the
   protocol definition as arrays/records?
 
 
  Yes, now with Admin Client this looks a bit weird. My initial motivation
  was:
  ReassignPartitionCommand accepts input in json, we want to remain tools'
  interfaces unchanged, where possible.
  If we port it to deserialized format, in CLI (/tools project) we will
 have
  to add some
  json library since /tools is written in java and we'll need to
 deserialize
  json file
  provided by a user. Can we quickly agree on what this library should be
  (Jackson, GSON, whatever)?
 
  101. Does TopicMetadataRequest v1 still trigger auto topic creation? This
   will be a bit weird now that we have a separate topic creation api.
 Have
   you thought about how the new createTopicRequest and
 TopicMetadataRequest
   v1 will be used in the producer/consumer client, in addition to admin
   tools? For example, ideally, we don't want TopicMetadataRequest from
 the
   consumer to trigger auto topic creation.
 
 
  I agree, this strange logic should be fixed. I'm not confident in this
  Kafka part so
  correct me if I'm wrong, but it doesn't look like a hard thing to do, I
  think we can
  leverage AdminClient for that in Producer and unconditionally remove
 topic
  creation from the TopicMetadataRequest_V1.
 
  2. I think Jay meant getting rid of scala classes
   like HeartbeatRequestAndHeader and HeartbeatResponseAndHeader. We did
  that
   as a stop-gap thing when adding the new requests for the consumers.
   However, the long term plan is to get rid of all those and just reuse
 the
   java request/response in the client. Since this KIP proposes to add a
   significant number of new requests, perhaps we should bite the bullet
 to
   clean up the existing scala requests first before adding new ones?
  
 
  Yes, 

[jira] [Commented] (KAFKA-2019) RoundRobinAssignor clusters by consumer

2015-03-16 Thread Joseph Holsten (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363625#comment-14363625
 ] 

Joseph Holsten commented on KAFKA-2019:
---

[~becket_qin] could you provide an example of the worst-case imbalance for this 
code? I'm having trouble seeing how the hash-ordered round robin can have a 
worst case of more than number-of-consumer-threads. 

Here's my reasoning:

With the current consumer-ordered sort, the threadAssignor will iterate over 
the sequence:

- c0:t0
- c0:t1
- c0:t2
- c0:t3
- c1:t0
- c1:t1
- c1:t2
- c1:t3

With the hash-ordered sort, the sequence could have a best case of:

- c0:t0
- c1:t0
- c0:t1
- c1:t1
- c0:t2
- c1:t2
- c0:t3
- c1:t3

and a worst case identical to the consumer-ordered sort.

For a partition count of {{(n+1)*m}}, consumer-ordered will produce a 
worst-case ordering every time, with {{c0}} always ending up with {{2m}} 
partitions, and {{c1..cn}} ending up with m partitions. Hash-ordered will 
produce this sort of worst case:

Likelihood that {{c0}} will have all m threads selected:

{code}
C(n*m,m) / P(n*m,n*n) =
((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that any consumer {{c0..cn}} will have all its threads selected:

{code}
n * C(n*m,m) / P(n*m,n*n) =
n * ((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that hash-ordering will be as unbalanced as consumer-ordered 
with {{n=2}}, {{m=2}}:

{code}
2 *  C(2*2,2) / P(2*2,2*2) =
2 * ((2*2)!/2!((2*2)-(2))!) / ((2*2)!) =
2 * (6) / (24) =
0.5
{code}

And with {{n=2}}, {{m=4}}:

{code}
2 * C(2*4,4) / P(2*4,2*2) =
2 * ((2*4)!/4!((2*4)-(4))!) / ((2*4)!) =
2 * 70 / 40320 =
~ 0.00347
{code}

I totally agree with the point about backward incompatibility; it would not be 
pleasant to have different servers using different thread sequences. So this 
code can't go in as a modification to the existing RoundRobinAssignor, but 
perhaps it could go in as a new assignor?
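
A small, hedged Java sketch of the ordering difference being debated; the
thread IDs are hypothetical strings, not the real ConsumerThreadId class.
Sorting by the consumer-prefixed string keeps each consumer's threads
adjacent, while sorting by hashCode interleaves them in an effectively
arbitrary order.

{code}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ThreadIdOrderingSketch {
    public static void main(String[] args) {
        List<String> threadIds = new ArrayList<>();
        for (String consumer : new String[]{"c0", "c1"}) {
            for (int t = 0; t < 4; t++) {
                threadIds.add(consumer + "-" + t); // e.g. "c0-0", mirroring the %s-%d format
            }
        }

        List<String> byString = new ArrayList<>(threadIds);
        byString.sort(Comparator.naturalOrder());                // current behaviour: c0-0..c0-3, then c1-0..c1-3

        List<String> byHash = new ArrayList<>(threadIds);
        byHash.sort(Comparator.comparingInt(String::hashCode));  // proposed: order decided by hash

        System.out.println("sorted by toString: " + byString);
        System.out.println("sorted by hashCode: " + byHash);
    }
}
{code}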

 RoundRobinAssignor clusters by consumer
 ---

 Key: KAFKA-2019
 URL: https://issues.apache.org/jira/browse/KAFKA-2019
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Joseph Holsten
Assignee: Neha Narkhede
Priority: Minor
 Attachments: 0001-sort-consumer-thread-ids-by-hashcode.patch, 
 KAFKA-2019.patch


 When rolling out a change today, I noticed that some of my consumers are 
 greedy, taking far more partitions than others.
 The cause is that the RoundRobinAssignor is using a list of ConsumerThreadIds 
 sorted by toString, which is {{ %s-%d.format(consumer, threadId)}}. This 
 causes each consumer's threads to be adjacent to each other.
 One possible fix would be to define ConsumerThreadId.hashCode, and sort by 
 that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2019) RoundRobinAssignor clusters by consumer

2015-03-16 Thread Joseph Holsten (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363625#comment-14363625
 ] 

Joseph Holsten edited comment on KAFKA-2019 at 3/16/15 6:10 PM:


[~becket_qin] could you provide an example of the worst-case imbalance for this 
code? I'm having trouble seeing how the hash-ordered round robin can have a 
worst case of more than number-of-consumer-threads. 

Here's my reasoning:

With the current consumer-ordered sort, the threadAssignor will iterate over 
the sequence:

- c0:t0
- c0:t1
- c0:t2
- c0:t3
- c1:t0
- c1:t1
- c1:t2
- c1:t3

With the hash-ordered sort, the sequence could have a best case of:

- c0:t0
- c1:t0
- c0:t1
- c1:t1
- c0:t2
- c1:t2
- c0:t3
- c1:t3

and a worst case identical to the consumer-ordered sort.

For a partition count of {{(n+1)*m}}, consumer-ordered will produce a 
worst-case ordering every time, with {{c0}} always ending up with {{2m}} 
partitions, and {{c1..cn}} ending up with m partitions. Hash-ordered will 
produce this sort of worst case:

Likelihood that {{c0}} will have all m threads selected:

{code}
C(n*m,m) / P(n*m,n*m) =
((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that any consumer {{c0..cn}} will have all its threads selected:

{code}
n * C(n*m,m) / P(n*m,n*m) =
n * ((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that hash-ordering will be as unbalanced as consumer-ordered 
with {{n=2}}, {{m=2}}:

{code}
2 *  C(2*2,2) / P(2*2,2*2) =
2 * ((2*2)!/2!((2*2)-(2))!) / ((2*2)!) =
2 * (6) / (24) =
0.5
{code}

And with {{n=2}}, {{m=4}}:

{code}
2 * C(2*4,4) / P(2*4,2*4) =
2 * ((2*4)!/4!((2*4)-(4))!) / ((2*4)!) =
2 * 70 / 40320 =
~ 0.00347
{code}

I totally agree with the point about backward incompatibility; it would not be 
pleasant to have different servers using different thread sequences. So this 
code can't go in as a modification to the existing RoundRobinAssignor, but 
perhaps it could go in as a new assignor?


was (Author: josephholsten):
[~becket_qin] could you provide an example of the worst case unbalance for this 
code? I'm having trouble seeing how the hash-ordered round robit can have a 
worst case more than number-of-consumer-threads. 

Here's my reasoning:

With the current consumer-ordered sort, the threadAssignor will iterate over 
the sequence:

- c0:t0
- c0:t1
- c0:t2
- c0:t3
- c1:t0
- c1:t1
- c1:t2
- c1:t3

With the hash-ordered sort, the sequence could have a best case of:

- c0:t0
- c1:t0
- c0:t1
- c1:t1
- c0:t2
- c1:t2
- c0:t3
- c1:t3

and a worst case identical to the consumer-ordered sort.

For a partition count of {{(n+1)*m}}, consumer-ordered will produce a 
worst-case ordering every time, with {{c0}} always ending up with {{2m}} 
partitions, and {{c1..cn}} ending up with m partitions. Hash-ordered will 
produce this sort of worst case:

Likelyhood that {{c0}} will have all m threads selected:

{code}
C(n*m,m) / P(n*m,n*n) =
((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelyhood of any consumer {{c0..cn}} will have all its threads selected:

{code}
n * C(n*m,m) / P(n*m,n*n) =
n * ((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelyhood that hash-ordering will be as unbalanced as consumer-ordered 
with {{n=2}}, {{m=2}}:

{code}
2 *  C(2*2,2) / P(2*2,2*2) =
2 * ((2*2)!/2!((2*2)-(2))!) / ((2*2)!) =
2 * (6) / (24) =
0.5
{code}

And with {{n=2}}, {{m=4}}:

{code}
2 * C(2*4,4) / P(2*4,2*2) =
2 * ((2*4)!/4!((2*4)-(4))!) / ((2*4)!) =
2 * 70 / 40320 =
~ 0.00347
{code}

I totally agree to the point about backward incompatibility, it would not be 
pleasant to have different servers using different thread sequences. So this 
code can't go in as a modification to the existing RoundRobinAssignor, perhaps 
as a new assignor?

 RoundRobinAssignor clusters by consumer
 ---

 Key: KAFKA-2019
 URL: https://issues.apache.org/jira/browse/KAFKA-2019
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Joseph Holsten
Assignee: Neha Narkhede
Priority: Minor
 Attachments: 0001-sort-consumer-thread-ids-by-hashcode.patch, 
 KAFKA-2019.patch


 When rolling out a change today, I noticed that some of my consumers are 
 greedy, taking far more partitions than others.
 The cause is that the RoundRobinAssignor is using a list of ConsumerThreadIds 
 sorted by toString, which is {{ %s-%d.format(consumer, threadId)}}. This 
 causes each consumer's threads to be adjacent to each other.
 One possible fix would be to define ConsumerThreadId.hashCode, and sort by 
 that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31893: Patch for KAFKA-2013

2015-03-16 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/#review76566
---


Thanks for the patch. A few comments.


core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
https://reviews.apache.org/r/31893/#comment124137

Do we need to make end volatile since it's being updated in a separate thread?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
https://reviews.apache.org/r/31893/#comment124141

Would it be better to rename this to something like latencyToComplete?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
https://reviews.apache.org/r/31893/#comment124142

Variable due doesn't seem to be used?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
https://reviews.apache.org/r/31893/#comment124147

I guess the sleep will be added when the actual rate exceeds the target 
rate? Would it be better to rename qtime as requestArrivalTime and interval as 
requestArrivalInterval?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
https://reviews.apache.org/r/31893/#comment124139

It would be useful to make the # of keys configurable.



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
https://reviews.apache.org/r/31893/#comment124138

So far, we haven't used this syntax for println. For consistency, perhaps 
it's better to use the existing way of string formatting.



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
https://reviews.apache.org/r/31893/#comment124143

Could we add some comments on the meaning of mu and sigma?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
https://reviews.apache.org/r/31893/#comment124144

Could we add some comments for the class? In particular, what does lambda 
mean?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
https://reviews.apache.org/r/31893/#comment124145

It would be helpful to provide a high level description of what kind of 
distribution we get in the samples. Also, is there a particular reason that we 
pick LogNormal distribution instead of just normal distribution?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
https://reviews.apache.org/r/31893/#comment124150

Could we add a bit of comment on how the sampling works? I guess it tries 
to spread the # requests into a 1000ms interval and returns the gap for the 
next request on every next() call?

Also, is there a particular reason that we want to choose exponential 
distribution to spread those requests instead of a simple uniform distribution 
(as done in ProducerPerformance)?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
https://reviews.apache.org/r/31893/#comment124148

Is there a particular reason that we need to override isCompleted()? 
Typically, only tryComplete() and onComplete() need to be overridden in a 
subclass of DelayedOperation.

Actually, I am not sure how we complete the requests before the timeout is 
reached since there is no explicit call to tryComplete()?


- Jun Rao
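
For reference, a hedged sketch of the standard way to spread a target request
rate with exponentially distributed gaps (a Poisson arrival process). This
only illustrates the technique asked about above; it is not the code under
review.

{code}
import java.util.Random;

public class PoissonArrivalSketch {
    public static void main(String[] args) throws InterruptedException {
        double requestsPerSecond = 1000.0;
        Random random = new Random();

        for (int i = 0; i < 10; i++) {
            // Inverse-CDF sampling: gap = -ln(U) / rate gives exponentially
            // distributed inter-arrival times with mean 1/rate seconds.
            double gapSeconds = -Math.log(1.0 - random.nextDouble()) / requestsPerSecond;
            long gapMicros = (long) (gapSeconds * 1_000_000);
            System.out.printf("request %d after %d us%n", i, gapMicros);
            Thread.sleep(gapMicros / 1000, (int) ((gapMicros % 1000) * 1000));
        }
    }
}
{code}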


On March 10, 2015, 4:41 p.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31893/
 ---
 
 (Updated March 10, 2015, 4:41 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2013
 https://issues.apache.org/jira/browse/KAFKA-2013
 
 
 Repository: kafka
 
 
 Description
 ---
 
 purgatory micro benchmark
 
 
 Diffs
 -
 
   core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31893/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Yasuhiro Matsuda
 




Re: Review Request 31967: Patch for KAFKA-1546

2015-03-16 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review76585
---



core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/31967/#comment124158

Interesting point. I thought that it would be enough to simply check the 
lag value. But yes, this will cause the HW to be inconsistent.


- Aditya Auradkar
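
As a hedged illustration of the time-based lag check this patch moves towards
(hypothetical names; the real logic lives in Partition and Replica and is more
involved): a follower is considered out of sync only if it has been behind the
leader's log end offset for longer than a configured interval, with no
max-lag-messages check.

{code}
// Illustration of a purely time-based ISR shrink check. Hypothetical types,
// not the real kafka.cluster code.
public class IsrLagSketch {
    static final class Replica {
        long logEndOffset;
        long caughtUpSinceMs; // last time this replica read up to the leader's LEO
    }

    static boolean isOutOfSync(Replica follower, long leaderLogEndOffset,
                               long nowMs, long replicaLagTimeMaxMs) {
        boolean caughtUp = follower.logEndOffset >= leaderLogEndOffset;
        if (caughtUp) {
            follower.caughtUpSinceMs = nowMs; // reset the "lag began" clock
            return false;
        }
        // Out of sync only if it has been behind for longer than the allowed
        // time, regardless of how many messages it is behind.
        return nowMs - follower.caughtUpSinceMs > replicaLagTimeMaxMs;
    }

    public static void main(String[] args) {
        Replica r = new Replica();
        r.logEndOffset = 90;
        r.caughtUpSinceMs = 0;
        System.out.println(isOutOfSync(r, 100, 5_000, 10_000));  // false: behind, but not for long enough
        System.out.println(isOutOfSync(r, 100, 20_000, 10_000)); // true: behind for too long
    }
}
{code}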


On March 12, 2015, 8:42 p.m., Aditya Auradkar wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31967/
 ---
 
 (Updated March 12, 2015, 8:42 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1546
 https://issues.apache.org/jira/browse/KAFKA-1546
 
 
 Repository: kafka
 
 
 Description
 ---
 
 PATCH for KAFKA-1546
 
 Brief summary of changes:
 - Added a lagBegin metric inside Replica to track the lag in terms of time 
 since the replica did not read from the LEO
 - Using lag begin value in the check for ISR expand and shrink
 - Removed the max lag messages config since it is no longer necessary
 - Returning the initialLogEndOffset in LogReadResult corresponding to the the 
 LEO before actually reading from the log.
 - Unit test cases to test ISR shrinkage and expansion
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/cluster/Partition.scala 
 c4bf48a801007ebe7497077d2018d6dffe1677d4 
   core/src/main/scala/kafka/cluster/Replica.scala 
 bd13c20338ce3d73113224440e858a12814e5adb 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 48e33626695ad8a28b0018362ac225f11df94973 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
 c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
 efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
 
 Diff: https://reviews.apache.org/r/31967/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Aditya Auradkar
 




Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Ewen Cheslack-Postava
Agreed that trying to shoehorn non-error codes into the error field is a
bad idea. It makes it *way* too easy to write code that looks (and should
be) correct but is actually incorrect. If necessary, I think it's much
better to spend a couple of extra bytes to encode that information
separately (a status or warning section of the response). An indication
that throttling is occurring is something I'd expect to be indicated by a
bit flag in the response rather than as an error code.

Gwen - I think an error code makes sense when the request actually failed.
Option B, which Jun was advocating, would have appended the messages
successfully. If the rate-limiting case you're talking about had
successfully committed the messages, I would say that's also a bad use of
error codes.
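
As a hedged sketch of the "separate field rather than an error code" idea: the
request succeeds, and throttling is returned as extra response metadata that a
client can surface in metrics. The field names here are illustrative; the
actual protocol change was still being designed at this point.

{code}
// Illustration of returning throttling as response metadata instead of an
// error code: the request succeeded, and throttleTimeMs is extra information
// the client may surface in metrics. Field names are hypothetical.
public class ProduceResponseSketch {
    final short errorCode;      // why the request FAILED, or 0 for none
    final long baseOffset;      // normal success payload
    final int throttleTimeMs;   // 0 if not throttled; > 0 means the broker delayed us

    ProduceResponseSketch(short errorCode, long baseOffset, int throttleTimeMs) {
        this.errorCode = errorCode;
        this.baseOffset = baseOffset;
        this.throttleTimeMs = throttleTimeMs;
    }

    public static void main(String[] args) {
        ProduceResponseSketch resp = new ProduceResponseSketch((short) 0, 42L, 150);
        if (resp.errorCode != 0) {
            throw new IllegalStateException("request failed: " + resp.errorCode);
        }
        // Success path: throttling is just recorded, never treated as an error.
        System.out.println("appended at offset " + resp.baseOffset
                + ", throttled for " + resp.throttleTimeMs + " ms");
    }
}
{code}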


On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com
wrote:

 We discussed an error code for rate-limiting (which I think made
 sense), isn't it a similar case?

 On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote:
  My concern is that as soon as you start encoding non-error response
  information into error codes the next question is what to do if two such
  codes apply (i.e. you have a replica down and the response is quota'd). I
  think I am trying to argue that error should mean why we failed your
  request, for which there will really only be one reason, and any other
  useful information we want to send back is just another field in the
  response.
 
  -Jay
 
  On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  I think its not too late to reserve a set of error codes (200-299?)
  for non-error codes.
 
  It won't be backward compatible (i.e. clients that currently do else
  throw will throw on non-errors), but perhaps its worthwhile.
 
  On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote:
   Hey Jun,
  
   I'd really really really like to avoid that. Having just spent a
 bunch of
   time on the clients, using the error codes to encode other information
   about the response is super dangerous. The error handling is one of
 the
   hardest parts of the client (Guozhang chime in here).
  
   Generally the error handling looks like
 if(error == none)
// good, process the request
 else if(error == KNOWN_ERROR_1)
// handle known error 1
 else if(error == KNOWN_ERROR_2)
// handle known error 2
 else
throw Errors.forCode(error).exception(); // or some other default
   behavior
  
   This works because we have a convention that and error is something
 that
   prevented your getting the response so the default handling case is
 sane
   and forward compatible. It is tempting to use the error code to convey
   information in the success case. For example we could use error codes
 to
   encode whether quotas were enforced, whether the request was served
 out
  of
   cache, whether the stock market is up today, or whatever. The problem
 is
   that since these are not errors as far as the client is concerned it
  should
   not throw an exception but process the response, but now we created an
   explicit requirement that that error be handled explicitly since it is
   different. I really think that this kind of information is not an
 error,
  it
   is just information, and if we want it in the response we should do
 the
   right thing and add a new field to the response.
  
   I think you saw the Samza bug that was literally an example of this
   happening and leading to an infinite retry loop.
  
   Further more I really want to emphasize that hitting your quota in the
   design that Adi has proposed is actually not an error condition at
 all.
  It
   is totally reasonable in any bootstrap situation to intentionally
 want to
   run at the limit the system imposes on you.
  
   -Jay
  
  
  
   On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote:
  
   It's probably useful for a client to know whether its requests are
   throttled or not (e.g., for monitoring and alerting). From that
   perspective, option B (delay the requests and return an error) seems
   better.
  
   Thanks,
  
   Jun
  
   On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar 
   aaurad...@linkedin.com.invalid wrote:
  
Posted a KIP for quotas in kafka.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
   
Appreciate any feedback.
   
Aditya
   
  
 




-- 
Thanks,
Ewen


Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jun Rao
Hmm, does that mean that after close(0), the sender thread is not necessarily
gone? Normally, after closing an entity, we expect all internal threads
associated with the entity to be shut down completely.

Thanks,

Jun
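
For context, a minimal sketch of the flag-based force close described in the
quoted message below. The class and field names (running, forceClose) only
mirror the ones mentioned; this is an illustration, not the real Sender
implementation.

{code}
// Illustration only of the flag-based force close described below; not the
// real org.apache.kafka.clients.producer.internals.Sender.
public class ForceCloseSketch {
    static class Sender implements Runnable {
        private volatile boolean running = true;
        private volatile boolean forceClose = false;

        @Override
        public void run() {
            while (running) {
                // drain batches, send requests, fire callbacks ...
            }
            if (forceClose) {
                // fail all incomplete batches instead of waiting for them
            } else {
                // keep sending until all buffered records are flushed
            }
        }

        // close(0): set the flags and return without joining the sender thread,
        // so it is safe to call from the sender thread's own callback.
        void initiateForceClose() {
            forceClose = true;
            running = false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Sender sender = new Sender();
        Thread ioThread = new Thread(sender, "producer-sender");
        ioThread.start();
        sender.initiateForceClose(); // non-blocking
        ioThread.join();             // a blocking close() would join here instead
    }
}
{code}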

On Mon, Mar 16, 2015 at 3:18 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Jun,

 Close(0) will set two flags in sender. Running=false and a newly added
 forceClose=true. It will also set accumulator.closed=true so no further
 producer.send() will succeed.
 The sender thread will finish executing all the callbacks in current batch
 of responses, then it will see the forceClose flag. It will just fail all
 the incomplete batches in the producer and exit.
 So close(0) is a non-blocking call and sender thread will not try to join
 itself in close(0).

 Thanks.

 Jiangjie (Becket) Qin

 On 3/16/15, 2:50 PM, Jun Rao j...@confluent.io wrote:

 How does close(0) work if it's called from the sender thread? If close(0)
 needs to wait for the sender thread to join, wouldn't this cause a
 deadlock?
 
 Thanks,
 
 Jun
 
 On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:
 
  Thanks Guozhang. It wouldn’t be as thoroughly considered without
  discussing with you :)
 
  Jiangjie (Becket) Qin
 
  On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote:
 
  Thanks Jiangjie,
  
  After talking to you offline on this, I have been convinced and
 changed my
  preference to blocking. The immediate shutdown approach does have some
  unsafeness in some cases.
  
  Guozhang
  
  On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin
 j...@linkedin.com.invalid
  
  wrote:
  
   It looks that the problem we want to solve and the purpose we want to
   achieve is:
   If user uses close() in callback, we want to let user be aware that
 they
   should use close(0) instead of close() in the callback.
  
   We have agreed that we will have an error log to inform user about
 this
   mis-usage. The options differ in the way how we can force user to
 take a
   look at that error log.
   There are two scenarios:
   1. User does not expect the program to exit.
   2. User expect the program to exit.
  
   For scenario 1), blocking will probably delay the discovery of the
   problem. Calling close(0) exposes the problem quicker. In this
 scenario
   producer just encounter a send failure when running normally.
   For scenario 2), blocking will expose the problem quick. Calling
  close(-1)
   might hide the problem. This scenario might include: a) Unit test
 for a
   send failure. b) Message sending during a close() call from a user
  thread.
  
   So as a summary table:
  
              Scenario 1)                  Scenario 2)
    Blocking  Delay problem discovery      Guaranteed problem discovery
    Close(-1) Immediate problem discovery  Problem might be hidden
  
  
   Personally I prefer blocking because it seems providing more
 guarantees
   and safer.
  
   Thanks.
  
   Jiangjie (Becket) Qin
  
  
   On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote:
  
   HI Jiangjie,
   
   As far as I understand calling close() in the ioThread is not
 common,
  as
   it
   may only trigger when we saw some non-retriable error. Hence when
 user
  run
   their program it is unlikely that close() will be triggered and
 problem
   will be detected. So it seems to me that from the error detection
  aspect
   these two options seems to be the same as people will usually
 detect it
   from the producer metrics all dropping to 0.
   
   Guozhang
   
   On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin
  j...@linkedin.com.invalid
   wrote:
   
It seems there are two options we can choose from when close() is
  called
from sender thread (callback):
1. Log an error and close the producer using close(-1)
2. Log an error and block.
(Throwing an exception will not work because we catch all the
  exception
thrown from user callback. It will just lead to an error log.)
   
My concern for the first option is that the producer will be
 closed
  even
if we logged and error. I am wondering if some user would not even
  take
   a
look at the log if producer is closed normally. Because from the
   programs
behavior, everything looks good. If that is the case, the error
  message
   we
logged probably will just be ignored until some day when people
 check
   the
log and see it.
   
As for the second option, because producer does not close but
 blocks.
   User
will notice this the first time they run the program. They
 probably
  will
look at the log to see why producer could not be closed and they
 will
   see
the error log we put there. So they will get informed about this
   mis-usage
of close() in sender thread the first time they run the code
 instead
  of
some time later.
   
Personally I prefer the second one because it is more obvious that
something was wrong.
   
Jiangjie (Becket) Qin
   
On 

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Jay Kreps
Hey Jun,

I'd really really really like to avoid that. Having just spent a bunch of
time on the clients, using the error codes to encode other information
about the response is super dangerous. The error handling is one of the
hardest parts of the client (Guozhang chime in here).

Generally the error handling looks like
  if(error == none)
 // good, process the request
  else if(error == KNOWN_ERROR_1)
 // handle known error 1
  else if(error == KNOWN_ERROR_2)
 // handle known error 2
  else
 throw Errors.forCode(error).exception(); // or some other default
behavior

This works because we have a convention that and error is something that
prevented your getting the response so the default handling case is sane
and forward compatible. It is tempting to use the error code to convey
information in the success case. For example we could use error codes to
encode whether quotas were enforced, whether the request was served out of
cache, whether the stock market is up today, or whatever. The problem is
that since these are not errors as far as the client is concerned it should
not throw an exception but process the response, but now we created an
explicit requirement that that error be handled explicitly since it is
different. I really think that this kind of information is not an error, it
is just information, and if we want it in the response we should do the
right thing and add a new field to the response.

I think you saw the Samza bug that was literally an example of this
happening and leading to an infinite retry loop.

Further more I really want to emphasize that hitting your quota in the
design that Adi has proposed is actually not an error condition at all. It
is totally reasonable in any bootstrap situation to intentionally want to
run at the limit the system imposes on you.

-Jay
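
To make the "add a new field" approach concrete, here is a minimal, hypothetical
Java sketch in which a throttled flag is an assumed extra response field rather
than an error code; this is not the actual Kafka protocol or client code, just an
illustration of keeping informational data out of the error path:

// Hypothetical sketch only: 'throttled' is an assumed extra response field,
// not part of the real Kafka protocol as of this discussion.
final class ProduceResult {
    final short errorCode;   // why the request failed, if it did
    final boolean throttled; // informational flag, never an error

    ProduceResult(short errorCode, boolean throttled) {
        this.errorCode = errorCode;
        this.throttled = throttled;
    }
}

final class ResponseHandler {
    static final short NONE = 0;

    void handle(ProduceResult result) {
        if (result.errorCode == NONE) {
            if (result.throttled) {
                // purely informational: record a metric, maybe log at debug level
            }
            // process the successful response as usual
        } else {
            // known and unknown errors keep the default-throw convention
            throw new RuntimeException("error code " + result.errorCode);
        }
    }
}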



On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote:

 It's probably useful for a client to know whether its requests are
 throttled or not (e.g., for monitoring and alerting). From that
 perspective, option B (delay the requests and return an error) seems
 better.

 Thanks,

 Jun

 On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar 
 aaurad...@linkedin.com.invalid wrote:

  Posted a KIP for quotas in kafka.
  https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
 
  Appreciate any feedback.
 
  Aditya
 



Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Gwen Shapira
I think it's not too late to reserve a set of error codes (200-299?)
for non-error codes.

It won't be backward compatible (i.e. clients that currently do else
throw will throw on non-errors), but perhaps it's worthwhile.

On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote:
 Hey Jun,

 I'd really really really like to avoid that. Having just spent a bunch of
 time on the clients, using the error codes to encode other information
 about the response is super dangerous. The error handling is one of the
 hardest parts of the client (Guozhang chime in here).

 Generally the error handling looks like
   if(error == none)
  // good, process the request
   else if(error == KNOWN_ERROR_1)
  // handle known error 1
   else if(error == KNOWN_ERROR_2)
  // handle known error 2
   else
  throw Errors.forCode(error).exception(); // or some other default
 behavior

 This works because we have a convention that and error is something that
 prevented your getting the response so the default handling case is sane
 and forward compatible. It is tempting to use the error code to convey
 information in the success case. For example we could use error codes to
 encode whether quotas were enforced, whether the request was served out of
 cache, whether the stock market is up today, or whatever. The problem is
 that since these are not errors as far as the client is concerned it should
 not throw an exception but process the response, but now we created an
 explicit requirement that that error be handled explicitly since it is
 different. I really think that this kind of information is not an error, it
 is just information, and if we want it in the response we should do the
 right thing and add a new field to the response.

 I think you saw the Samza bug that was literally an example of this
 happening and leading to an infinite retry loop.

 Further more I really want to emphasize that hitting your quota in the
 design that Adi has proposed is actually not an error condition at all. It
 is totally reasonable in any bootstrap situation to intentionally want to
 run at the limit the system imposes on you.

 -Jay



 On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote:

 It's probably useful for a client to know whether its requests are
 throttled or not (e.g., for monitoring and alerting). From that
 perspective, option B (delay the requests and return an error) seems
 better.

 Thanks,

 Jun

 On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar 
 aaurad...@linkedin.com.invalid wrote:

  Posted a KIP for quotas in kafka.
  https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
 
  Appreciate any feedback.
 
  Aditya
 



Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Guozhang Wang
I think we are really discussing two separate issues here:

1. Whether we should a) append-then-block-then-returnOKButThrottled or b)
block-then-returnFailDuetoThrottled for quota actions on produce requests.

Both these approaches assume some kind of well-behavedness of the clients:
option a) assumes the client sets a proper timeout value and can simply
ignore the OKButThrottled response, while option b) assumes the client
handles FailDuetoThrottled appropriately. For any malicious clients
that, for example, just keep retrying, either intentionally or not, neither
of these approaches is actually effective.

2. For OKButThrottled and FailDuetoThrottled responses, shall we encode
them as error codes or augment the protocol to use a separate field
indicating status codes.

Today we have already incorporated some status codes as error codes in the
responses, e.g. ReplicaNotAvailable in MetadataResponse. The pro of this
is of course using a single field for response status, like the HTTP status
codes, while the con is that it requires clients to handle the error codes
carefully.

I think maybe we can actually extend the single-code approach to overcome
its drawbacks, that is, wrap the error-code semantics for users so that
they do not need to handle the codes one by one. More concretely,
following Jay's example the client could write something like this:


-

  if(error.isOK())
 // status code is good or the code can be simply ignored for this
request type, process the request
  else if(error.needsRetry())
 // throttled, transient error, etc: retry
  else if(error.isFatal())
 // non-retriable errors, etc: notify / terminate / other handling

-

Only when the client really wants to handle, for example, the FailDuetoThrottled
status code specifically does it need to:

  if(error.isOK())
 // status code is good or the code can be simply ignored for this
request type, process the request
  else if(error == FailDuetoThrottled )
 // throttled: log it
  else if(error.needsRetry())
 // transient error, etc: retry
  else if(error.isFatal())
 // non-retriable errors, etc: notify / terminate / other handling

-

And for implementation we can probably group the codes accordingly like
HTTP status code such that we can do:

boolean Error.isOK() {
  return code < 300 && code >= 200;
}
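
For illustration, a minimal Java sketch of that grouping idea, assuming
HTTP-style code ranges and purely illustrative status names; this is not the
existing client API, just a sketch of how grouped codes could hide the
one-by-one handling:

// Illustrative only: assumes HTTP-style ranges (2xx OK, 4xx retriable,
// 5xx fatal); the real Kafka error codes are not grouped this way.
enum StatusCode {
    OK(200), THROTTLED(210), NOT_LEADER(420), UNKNOWN_SERVER_ERROR(500);

    private final int code;

    StatusCode(int code) { this.code = code; }

    boolean isOK()       { return code >= 200 && code < 300; }
    boolean needsRetry() { return code >= 400 && code < 500; }
    boolean isFatal()    { return code >= 500; }
}

class Client {
    void handle(StatusCode status) {
        if (status.isOK()) {
            // status is good, or can simply be ignored for this request type
        } else if (status.needsRetry()) {
            // throttled, transient error, etc.: back off and retry
        } else if (status.isFatal()) {
            // non-retriable: notify / terminate / other handling
        }
    }
}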

Guozhang

On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava e...@confluent.io
wrote:

 Agreed that trying to shoehorn non-error codes into the error field is a
 bad idea. It makes it *way* too easy to write code that looks (and should
 be) correct but is actually incorrect. If necessary, I think it's much
 better to spend a couple of extra bytes to encode that information
 separately (a status or warning section of the response). An indication
 that throttling is occurring is something I'd expect to be indicated by a
 bit flag in the response rather than as an error code.

 Gwen - I think an error code makes sense when the request actually failed.
 Option B, which Jun was advocating, would have appended the messages
 successfully. If the rate-limiting case you're talking about had
 successfully committed the messages, I would say that's also a bad use of
 error codes.


 On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  We discussed an error code for rate-limiting (which I think made
  sense), isn't it a similar case?
 
  On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote:
   My concern is that as soon as you start encoding non-error response
   information into error codes the next question is what to do if two
 such
   codes apply (i.e. you have a replica down and the response is
 quota'd). I
   think I am trying to argue that error should mean why we failed your
   request, for which there will really only be one reason, and any other
   useful information we want to send back is just another field in the
   response.
  
   -Jay
  
   On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
  
   I think its not too late to reserve a set of error codes (200-299?)
   for non-error codes.
  
   It won't be backward compatible (i.e. clients that currently do else
   throw will throw on non-errors), but perhaps its worthwhile.
  
   On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com
 wrote:
Hey Jun,
   
I'd really really really like to avoid that. Having just spent a
  bunch of
time on the clients, using the error codes to encode other
 information
about the response is super dangerous. The error handling is one of
  the
hardest parts of the client (Guozhang chime in here).
   
Generally the error handling looks like
  if(error == none)
 // good, process the request
  else if(error == KNOWN_ERROR_1)
 // handle known error 1
  else if(error == KNOWN_ERROR_2)
 // handle known error 2
  else
 throw 

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Gwen Shapira
We discussed an error code for rate-limiting (which I think made
sense), isn't it a similar case?

On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote:
 My concern is that as soon as you start encoding non-error response
 information into error codes the next question is what to do if two such
 codes apply (i.e. you have a replica down and the response is quota'd). I
 think I am trying to argue that error should mean why we failed your
 request, for which there will really only be one reason, and any other
 useful information we want to send back is just another field in the
 response.

 -Jay

 On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote:

 I think its not too late to reserve a set of error codes (200-299?)
 for non-error codes.

 It won't be backward compatible (i.e. clients that currently do else
 throw will throw on non-errors), but perhaps its worthwhile.

 On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote:
  Hey Jun,
 
  I'd really really really like to avoid that. Having just spent a bunch of
  time on the clients, using the error codes to encode other information
  about the response is super dangerous. The error handling is one of the
  hardest parts of the client (Guozhang chime in here).
 
  Generally the error handling looks like
if(error == none)
   // good, process the request
else if(error == KNOWN_ERROR_1)
   // handle known error 1
else if(error == KNOWN_ERROR_2)
   // handle known error 2
else
   throw Errors.forCode(error).exception(); // or some other default
  behavior
 
  This works because we have a convention that and error is something that
  prevented your getting the response so the default handling case is sane
  and forward compatible. It is tempting to use the error code to convey
  information in the success case. For example we could use error codes to
  encode whether quotas were enforced, whether the request was served out
 of
  cache, whether the stock market is up today, or whatever. The problem is
  that since these are not errors as far as the client is concerned it
 should
  not throw an exception but process the response, but now we created an
  explicit requirement that that error be handled explicitly since it is
  different. I really think that this kind of information is not an error,
 it
  is just information, and if we want it in the response we should do the
  right thing and add a new field to the response.
 
  I think you saw the Samza bug that was literally an example of this
  happening and leading to an infinite retry loop.
 
  Further more I really want to emphasize that hitting your quota in the
  design that Adi has proposed is actually not an error condition at all.
 It
  is totally reasonable in any bootstrap situation to intentionally want to
  run at the limit the system imposes on you.
 
  -Jay
 
 
 
  On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote:
 
  It's probably useful for a client to know whether its requests are
  throttled or not (e.g., for monitoring and alerting). From that
  perspective, option B (delay the requests and return an error) seems
  better.
 
  Thanks,
 
  Jun
 
  On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar 
  aaurad...@linkedin.com.invalid wrote:
 
   Posted a KIP for quotas in kafka.
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
  
   Appreciate any feedback.
  
   Aditya
  
 



Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Guozhang Wang
Yeah in this sense the sender thread will not exit immediately in the
close(0) call, but will only terminate after the current response batch has
been processed, as will the producer instance itself.

There is a reason for this though: for a clean shutdown the caller thread
has to wait for the sender thread to join before closing the producer
instance, but this cannot be achieved if close(0) is called by the sender
thread itself (for example in KAFKA-1659, there is a proposal from Andrew
Stein on using thread.interrupt and thread.stop, but if it is called by the
ioThread itself the stop call will fail). Hence we came up with the flag
approach to let the sender thread close as soon as it is at the barrier
of the run loop.
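
For illustration, a rough Java sketch of that flag-based shutdown; the class,
field, and method names here are assumptions for the example, not the actual
Sender implementation:

// Illustrative sketch of the flag approach; not the real Sender class.
class SenderLoop implements Runnable {
    private volatile boolean running = true;
    private volatile boolean forceClose = false;

    public void run() {
        while (running) {
            processOneBatchOfResponses();   // runs callbacks for the current batch
            if (forceClose) {
                failAllIncompleteBatches(); // close(0): drop whatever is left
                return;                     // exit at the barrier of the run loop
            }
        }
        drainRemainingRecords();            // normal close(): finish outstanding sends
    }

    // close(0) only flips flags; it never joins the sender thread,
    // so it is safe to call from a callback running on this thread.
    void initiateForceClose() {
        forceClose = true;
        running = false;
    }

    private void processOneBatchOfResponses() { /* ... */ }
    private void failAllIncompleteBatches()   { /* ... */ }
    private void drainRemainingRecords()      { /* ... */ }
}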

Guozhang

On Mon, Mar 16, 2015 at 9:41 PM, Jun Rao j...@confluent.io wrote:

 Hmm, does that mean that after close(0), the sender thread is not necessary
 gone? Normally, after closing an entity, we expect all internal threads
 associated with the entity are shut down completely.

 Thanks,

 Jun

 On Mon, Mar 16, 2015 at 3:18 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  Hi Jun,
 
  Close(0) will set two flags in sender. Running=false and a newly added
  forceClose=true. It will also set accumulator.closed=true so no further
  producer.send() will succeed.
  The sender thread will finish executing all the callbacks in current
 batch
  of responses, then it will see the forceClose flag. It will just fail all
  the incomplete batches in the producer and exit.
  So close(0) is a non-blocking call and sender thread will not try to join
  itself in close(0).
 
  Thanks.
 
  Jiangjie (Becket) Qin
 
  On 3/16/15, 2:50 PM, Jun Rao j...@confluent.io wrote:
 
  How does close(0) work if it's called from the sender thread? If
 close(0)
  needs to wait for the sender thread to join, wouldn't this cause a
  deadlock?
  
  Thanks,
  
  Jun
  
  On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin j...@linkedin.com.invalid
 
  wrote:
  
   Thanks Guozhang. It wouldn’t be as thoroughly considered without
   discussing with you :)
  
   Jiangjie (Becket) Qin
  
   On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote:
  
   Thanks Jiangjie,
   
   After talking to you offline on this, I have been convinced and
  changed my
   preference to blocking. The immediate shutdown approach does have
 some
   unsafeness in some cases.
   
   Guozhang
   
   On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin
  j...@linkedin.com.invalid
   
   wrote:
   
It looks that the problem we want to solve and the purpose we want
 to
achieve is:
If user uses close() in callback, we want to let user be aware that
  they
should use close(0) instead of close() in the callback.
   
We have agreed that we will have an error log to inform user about
  this
mis-usage. The options differ in the way how we can force user to
  take a
look at that error log.
There are two scenarios:
1. User does not expect the program to exit.
2. User expect the program to exit.
   
For scenario 1), blocking will probably delay the discovery of the
problem. Calling close(0) exposes the problem quicker. In this
  scenario
producer just encounter a send failure when running normally.
For scenario 2), blocking will expose the problem quick. Calling
   close(-1)
might hide the problem. This scenario might include: a) Unit test
  for a
send failure. b) Message sending during a close() call from a user
   thread.
   
So as a summary table:
   
              Scenario 1)                  Scenario 2)
    Blocking  Delay problem discovery      Guaranteed problem discovery
    Close(-1) Immediate problem discovery  Problem might be hidden
   
   
Personally I prefer blocking because it seems providing more
  guarantees
and safer.
   
Thanks.
   
Jiangjie (Becket) Qin
   
   
On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote:
   
HI Jiangjie,

As far as I understand calling close() in the ioThread is not
  common,
   as
it
may only trigger when we saw some non-retriable error. Hence when
  user
   run
their program it is unlikely that close() will be triggered and
  problem
will be detected. So it seems to me that from the error detection
   aspect
these two options seems to be the same as people will usually
  detect it
from the producer metrics all dropping to 0.

Guozhang

On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin
   j...@linkedin.com.invalid
wrote:

 It seems there are two options we can choose from when close()
 is
   called
 from sender thread (callback):
 1. Log an error and close the producer using close(-1)
 2. Log an error and block.
 (Throwing an exception will not work because we catch all the
   exception
 thrown from user callback. It will just lead to an error log.)

 My concern for the first option is that the 

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Gwen Shapira
You are right, shoe-horning status into an error field is a bad idea.
While many projects use a single status field to indicate different
error and non-error states, it doesn't seem like a good fit for the
current Kafka implementation.

Do you think that adding a status field to our protocol is feasible
at this point?



On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava
e...@confluent.io wrote:
 Agreed that trying to shoehorn non-error codes into the error field is a
 bad idea. It makes it *way* too easy to write code that looks (and should
 be) correct but is actually incorrect. If necessary, I think it's much
 better to spend a couple of extra bytes to encode that information
 separately (a status or warning section of the response). An indication
 that throttling is occurring is something I'd expect to be indicated by a
 bit flag in the response rather than as an error code.

 Gwen - I think an error code makes sense when the request actually failed.
 Option B, which Jun was advocating, would have appended the messages
 successfully. If the rate-limiting case you're talking about had
 successfully committed the messages, I would say that's also a bad use of
 error codes.


 On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

 We discussed an error code for rate-limiting (which I think made
 sense), isn't it a similar case?

 On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote:
  My concern is that as soon as you start encoding non-error response
  information into error codes the next question is what to do if two such
  codes apply (i.e. you have a replica down and the response is quota'd). I
  think I am trying to argue that error should mean why we failed your
  request, for which there will really only be one reason, and any other
  useful information we want to send back is just another field in the
  response.
 
  -Jay
 
  On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  I think its not too late to reserve a set of error codes (200-299?)
  for non-error codes.
 
  It won't be backward compatible (i.e. clients that currently do else
  throw will throw on non-errors), but perhaps its worthwhile.
 
  On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote:
   Hey Jun,
  
   I'd really really really like to avoid that. Having just spent a
 bunch of
   time on the clients, using the error codes to encode other information
   about the response is super dangerous. The error handling is one of
 the
   hardest parts of the client (Guozhang chime in here).
  
   Generally the error handling looks like
 if(error == none)
// good, process the request
 else if(error == KNOWN_ERROR_1)
// handle known error 1
 else if(error == KNOWN_ERROR_2)
// handle known error 2
 else
throw Errors.forCode(error).exception(); // or some other default
   behavior
  
   This works because we have a convention that and error is something
 that
   prevented your getting the response so the default handling case is
 sane
   and forward compatible. It is tempting to use the error code to convey
   information in the success case. For example we could use error codes
 to
   encode whether quotas were enforced, whether the request was served
 out
  of
   cache, whether the stock market is up today, or whatever. The problem
 is
   that since these are not errors as far as the client is concerned it
  should
   not throw an exception but process the response, but now we created an
   explicit requirement that that error be handled explicitly since it is
   different. I really think that this kind of information is not an
 error,
  it
   is just information, and if we want it in the response we should do
 the
   right thing and add a new field to the response.
  
   I think you saw the Samza bug that was literally an example of this
   happening and leading to an infinite retry loop.
  
   Further more I really want to emphasize that hitting your quota in the
   design that Adi has proposed is actually not an error condition at
 all.
  It
   is totally reasonable in any bootstrap situation to intentionally
 want to
   run at the limit the system imposes on you.
  
   -Jay
  
  
  
   On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote:
  
   It's probably useful for a client to know whether its requests are
   throttled or not (e.g., for monitoring and alerting). From that
   perspective, option B (delay the requests and return an error) seems
   better.
  
   Thanks,
  
   Jun
  
   On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar 
   aaurad...@linkedin.com.invalid wrote:
  
Posted a KIP for quotas in kafka.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
   
Appreciate any feedback.
   
Aditya
   
  
 




 --
 Thanks,
 Ewen


Re: [DISCUSS] KIP-6 - New reassignment partition logic for re-balancing

2015-03-16 Thread Jun Rao
Hi, Joe,

A couple of comments.

1. When creating a new topic, our replica assignment algorithm tries to
achieve a few things: (a) all replicas are spread evenly across brokers;
(b) the preferred replica (first replica in the assigned replica list) of
all partitions are spread evenly across brokers; (c) the non-preferred
replicas are spread out in such a way that if we lose a broker, the load on
the failed broker is spread evenly among the remaining brokers.

For example, if you look at the following replica assignment on brokers b1,
b2, and b3 (with replication factor 2). Broker b1 will be the leader for
partition p0 and p3. Broker b2 will be the leader for partition p1 and p4.
Broker b3 will be the leader for partition p2 and p5. If b1 is gone, b2
will take over as the leader for p0 and b3 will take over as the leader for
p3. This strategy makes sure that the load is even in the normal case as
well as the failure case.

b1 b2 b3
p0 p1 p2
p2 p0 p1
p3 p4 p5
p4 p5 p3
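
For illustration, a toy Java sketch of a round-robin style assignment with an
increasing follower shift; the shift logic is an assumption for the example and
not the actual AdminUtils code, though for 3 brokers, 6 partitions, and
replication factor 2 it happens to reproduce the layout above:

import java.util.ArrayList;
import java.util.List;

// Toy sketch: leaders are assigned round-robin (property b), and followers
// are placed with an increasing shift so that a failed broker's load spreads
// across the survivors (property c). Not the real assignment implementation.
final class ToyAssignment {
    static List<List<Integer>> assign(int brokers, int partitions, int replicationFactor) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            int leader = p % brokers;
            replicas.add(leader);
            // the shift grows each time we wrap around the brokers, so followers rotate
            int shift = 1 + (p / brokers) % (brokers - 1);
            for (int r = 1; r < replicationFactor; r++) {
                replicas.add((leader + r * shift) % brokers);
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 brokers, 6 partitions, replication factor 2 (brokers numbered from 0)
        assign(3, 6, 2).forEach(System.out::println);
    }
}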

The current reassignment strategy actually maintains properties (a), (b)
and (c) after the reassignment completes.

The new algorithm takes the last few replicas from an overloaded broker and
moves them to an underloaded broker. It does reduce the data movement
compared with the current algorithm. It also maintains property (a).
However, it doesn't seem to explicitly maintain properties (b) and (c).
Data movement is a one-time cost. Maintaining balance after the data
movement has long term benefit. So, it will be useful to try to maintain
these properties even perhaps at the expense of a bit more data movement.

Also, I think the new algorithm needs to make sure that we don't move the
same replica to a new broker more than once.

2. I am not sure that we need to add a new --rebalance option. All we are
changing is the assignment strategy. If that's a better strategy than
before, there is no reason for anyone to use the old strategy. So, the new
strategy should just be used in the --generate mode.

Thanks,

Jun




On Wed, Mar 11, 2015 at 12:12 PM, Joe Stein joe.st...@stealth.ly wrote:

 Sorry for not catching up on this thread earlier; I wanted to do this
 before the KIP got its updates so we could discuss if need be and not waste
 more time re-writing/re-working things that folks have issues with. I
 captured all the comments so far here with responses.

  So fair assignment by count (taking into account the current partition
 count of each broker) is very good. However, it's worth noting that all
 partitions are not created equal. We have actually been performing more
 rebalance work based on the partition size on disk, as given equal
 retention of all topics, the size on disk is a better indicator of the
 amount of traffic a partition gets, both in terms of storage and network
 traffic. Overall, this seems to be a better balance.

 Agreed, though this is out of scope (imho) for what the motivations for the
 KIP were. The motivations section is blank (that is on me), but honestly it
 is because we did all the development, went back and forth with Neha on the
 testing, and then had to back it all into the KIP process... It's a
 time/resource/scheduling issue and I hope to update this soon on the KIP. All of
 this is in the JIRA and the code patch, so it's not that it is not there, just
 not in the place where folks may be looking, since we changed where folks
 should look.

 Initial cut at Motivations: the --generate is not used by a lot of folks
 because they don't trust it. Issues such as giving different results
 sometimes when you run it. There is also other feedback from the community that it
 does not account for specific use cases like adding new brokers and
 removing brokers (which is where that patch started,
 https://issues.apache.org/jira/browse/KAFKA-1678, but then we changed it
 after review into just --rebalance,
 https://issues.apache.org/jira/browse/KAFKA-1792). The use case for adding and
 removing brokers is one that happens in AWS and auto scaling. There are
 other reasons for this too of course.  The goal originally was to make what
 folks are already coding today (with the output of  available in the
 project for the community. Based on the discussion in the JIRA with Neha we
 all agreed that making it be a fair rebalance would fulfill both use
 cases.

  In addition to this, I think there is very much a need to have Kafka be
 rack-aware. That is, to be able to assure that for a given cluster, you
 never assign all replicas for a given partition in the same rack. This
 would allow us to guard against maintenances or power failures that affect
 a full rack of systems (or a given switch).

 Agreed, though I think this is out of scope for this change and something
 we can also do in the future. There is more that we have to figure out for
 rack awareness, specifically answering how we know what rack the broker is
 on. I really really (really) worry that when we keep trying to put too much
 into a single change the discussions go into rabbit holes and 

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Jay Kreps
My concern is that as soon as you start encoding non-error response
information into error codes the next question is what to do if two such
codes apply (i.e. you have a replica down and the response is quota'd). I
think I am trying to argue that error should mean why we failed your
request, for which there will really only be one reason, and any other
useful information we want to send back is just another field in the
response.

-Jay

On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote:

 I think its not too late to reserve a set of error codes (200-299?)
 for non-error codes.

 It won't be backward compatible (i.e. clients that currently do else
 throw will throw on non-errors), but perhaps its worthwhile.

 On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote:
  Hey Jun,
 
  I'd really really really like to avoid that. Having just spent a bunch of
  time on the clients, using the error codes to encode other information
  about the response is super dangerous. The error handling is one of the
  hardest parts of the client (Guozhang chime in here).
 
  Generally the error handling looks like
if(error == none)
   // good, process the request
else if(error == KNOWN_ERROR_1)
   // handle known error 1
else if(error == KNOWN_ERROR_2)
   // handle known error 2
else
   throw Errors.forCode(error).exception(); // or some other default
  behavior
 
  This works because we have a convention that and error is something that
  prevented your getting the response so the default handling case is sane
  and forward compatible. It is tempting to use the error code to convey
  information in the success case. For example we could use error codes to
  encode whether quotas were enforced, whether the request was served out
 of
  cache, whether the stock market is up today, or whatever. The problem is
  that since these are not errors as far as the client is concerned it
 should
  not throw an exception but process the response, but now we created an
  explicit requirement that that error be handled explicitly since it is
  different. I really think that this kind of information is not an error,
 it
  is just information, and if we want it in the response we should do the
  right thing and add a new field to the response.
 
  I think you saw the Samza bug that was literally an example of this
  happening and leading to an infinite retry loop.
 
  Further more I really want to emphasize that hitting your quota in the
  design that Adi has proposed is actually not an error condition at all.
 It
  is totally reasonable in any bootstrap situation to intentionally want to
  run at the limit the system imposes on you.
 
  -Jay
 
 
 
  On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote:
 
  It's probably useful for a client to know whether its requests are
  throttled or not (e.g., for monitoring and alerting). From that
  perspective, option B (delay the requests and return an error) seems
  better.
 
  Thanks,
 
  Jun
 
  On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar 
  aaurad...@linkedin.com.invalid wrote:
 
   Posted a KIP for quotas in kafka.
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
  
   Appreciate any feedback.
  
   Aditya
  
 



Re: Review Request 31893: Patch for KAFKA-2013

2015-03-16 Thread Yasuhiro Matsuda

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/
---

(Updated March 16, 2015, 9:13 p.m.)


Review request for kafka.


Bugs: KAFKA-2013
https://issues.apache.org/jira/browse/KAFKA-2013


Repository: kafka


Description
---

purgatory micro benchmark


Diffs (updated)
-

  core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31893/diff/


Testing
---


Thanks,

Yasuhiro Matsuda



[jira] [Commented] (KAFKA-2013) benchmark test for the purgatory

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363957#comment-14363957
 ] 

Yasuhiro Matsuda commented on KAFKA-2013:
-

Updated reviewboard https://reviews.apache.org/r/31893/diff/
 against branch origin/trunk

 benchmark test for the purgatory
 

 Key: KAFKA-2013
 URL: https://issues.apache.org/jira/browse/KAFKA-2013
 Project: Kafka
  Issue Type: Test
  Components: purgatory
Reporter: Yasuhiro Matsuda
Assignee: Yasuhiro Matsuda
Priority: Trivial
 Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch, 
 KAFKA-2013_2015-03-16_14:13:20.patch


 We need a micro benchmark test for measuring the purgatory performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2013) benchmark test for the purgatory

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yasuhiro Matsuda updated KAFKA-2013:

Attachment: KAFKA-2013_2015-03-16_14:39:07.patch

 benchmark test for the purgatory
 

 Key: KAFKA-2013
 URL: https://issues.apache.org/jira/browse/KAFKA-2013
 Project: Kafka
  Issue Type: Test
  Components: purgatory
Reporter: Yasuhiro Matsuda
Assignee: Yasuhiro Matsuda
Priority: Trivial
 Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch, 
 KAFKA-2013_2015-03-16_14:13:20.patch, KAFKA-2013_2015-03-16_14:39:07.patch


 We need a micro benchmark test for measuring the purgatory performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Jun Rao
It's probably useful for a client to know whether its requests are
throttled or not (e.g., for monitoring and alerting). From that
perspective, option B (delay the requests and return an error) seems better.

Thanks,

Jun

On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 Posted a KIP for quotas in kafka.
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas

 Appreciate any feedback.

 Aditya



Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jiangjie Qin
Hi Jun,

Close(0) will set two flags in sender. Running=false and a newly added
forceClose=true. It will also set accumulator.closed=true so no further
producer.send() will succeed.
The sender thread will finish executing all the callbacks in the current batch
of responses, then it will see the forceClose flag. It will just fail all
the incomplete batches in the producer and exit.
So close(0) is a non-blocking call and sender thread will not try to join
itself in close(0).

Thanks.

Jiangjie (Becket) Qin

On 3/16/15, 2:50 PM, Jun Rao j...@confluent.io wrote:

How does close(0) work if it's called from the sender thread? If close(0)
needs to wait for the sender thread to join, wouldn't this cause a
deadlock?

Thanks,

Jun

On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Thanks Guozhang. It wouldn’t be as thoroughly considered without
 discussing with you :)

 Jiangjie (Becket) Qin

 On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote:

 Thanks Jiangjie,
 
 After talking to you offline on this, I have been convinced and
changed my
 preference to blocking. The immediate shutdown approach does have some
 unsafeness in some cases.
 
 Guozhang
 
 On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin
j...@linkedin.com.invalid
 
 wrote:
 
  It looks that the problem we want to solve and the purpose we want to
  achieve is:
  If user uses close() in callback, we want to let user be aware that
they
  should use close(0) instead of close() in the callback.
 
  We have agreed that we will have an error log to inform user about
this
  mis-usage. The options differ in the way how we can force user to
take a
  look at that error log.
  There are two scenarios:
  1. User does not expect the program to exit.
  2. User expect the program to exit.
 
  For scenario 1), blocking will probably delay the discovery of the
  problem. Calling close(0) exposes the problem quicker. In this
scenario
  producer just encounter a send failure when running normally.
  For scenario 2), blocking will expose the problem quick. Calling
 close(-1)
  might hide the problem. This scenario might include: a) Unit test
for a
  send failure. b) Message sending during a close() call from a user
 thread.
 
  So as a summary table:
 
              Scenario 1)                  Scenario 2)
    Blocking  Delay problem discovery      Guaranteed problem discovery
    Close(-1) Immediate problem discovery  Problem might be hidden
 
 
  Personally I prefer blocking because it seems providing more
guarantees
  and safer.
 
  Thanks.
 
  Jiangjie (Becket) Qin
 
 
  On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote:
 
  HI Jiangjie,
  
  As far as I understand calling close() in the ioThread is not
common,
 as
  it
  may only trigger when we saw some non-retriable error. Hence when
user
 run
  their program it is unlikely that close() will be triggered and
problem
  will be detected. So it seems to me that from the error detection
 aspect
  these two options seems to be the same as people will usually
detect it
  from the producer metrics all dropping to 0.
  
  Guozhang
  
  On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin
 j...@linkedin.com.invalid
  wrote:
  
   It seems there are two options we can choose from when close() is
 called
   from sender thread (callback):
   1. Log an error and close the producer using close(-1)
   2. Log an error and block.
   (Throwing an exception will not work because we catch all the
 exception
   thrown from user callback. It will just lead to an error log.)
  
   My concern for the first option is that the producer will be
closed
 even
   if we logged and error. I am wondering if some user would not even
 take
  a
   look at the log if producer is closed normally. Because from the
  programs
   behavior, everything looks good. If that is the case, the error
 message
  we
   logged probably will just be ignored until some day when people
check
  the
   log and see it.
  
   As for the second option, because producer does not close but
blocks.
  User
   will notice this the first time they run the program. They
probably
 will
   look at the log to see why producer could not be closed and they
will
  see
   the error log we put there. So they will get informed about this
  mis-usage
   of close() in sender thread the first time they run the code
instead
 of
   some time later.
  
   Personally I prefer the second one because it is more obvious that
   something was wrong.
  
   Jiangjie (Becket) Qin
  
   On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote:
  
   Yeah I agree we should not silently change the behavior of the
 function
   with the given parameters; and I would prefer
  error-logging-and-shutdown
   over blocking when close(0) is used, since as Neha suggested
 blocking
   would also not proceed with sending any data, bu will just let
 users to
   realize the issue later than sooner.
   
   On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede

[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14364184#comment-14364184
 ] 

Joe Stein commented on KAFKA-2023:
--

works ok for me on ubuntu and redhat on two different networks

{code}

$ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
Cloning into 'kafka'...
remote: Counting objects: 24607, done.
remote: Compressing objects: 100% (9212/9212), done.
remote: Total 24607 (delta 14447), reused 19803 (delta 11465)
Receiving objects: 100% (24607/24607), 15.62 MiB | 3.46 MiB/s, done.
Resolving deltas: 100% (14447/14447), done.
Checking connectivity... done.
{code}

 git clone kafka repository requires https
 -

 Key: KAFKA-2023
 URL: https://issues.apache.org/jira/browse/KAFKA-2023
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Anatoli Fomenko
Priority: Minor

 From http://kafka.apache.org/code.html: 
 Our code is kept in git. You can check it out like this:
   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
 On CentOS 6.5:
 {code}
 $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
 Initialized empty Git repository in /home/anatoli/git/kafka/.git/
 error: RPC failed; result=22, HTTP code = 405
 {code}
 while:
 {code}
 $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
 Initialized empty Git repository in /home/anatoli/git/kafka/.git/
 remote: Counting objects: 24607, done.
 remote: Compressing objects: 100% (9212/9212), done.
 remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
 Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
 Resolving deltas: 100% (14449/14449), done.
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jun Rao
How does close(0) work if it's called from the sender thread? If close(0)
needs to wait for the sender thread to join, wouldn't this cause a deadlock?

Thanks,

Jun

On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Thanks Guozhang. It wouldn’t be as thoroughly considered without
 discussing with you :)

 Jiangjie (Becket) Qin

 On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote:

 Thanks Jiangjie,
 
 After talking to you offline on this, I have been convinced and changed my
 preference to blocking. The immediate shutdown approach does have some
 unsafeness in some cases.
 
 Guozhang
 
 On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid
 
 wrote:
 
  It looks that the problem we want to solve and the purpose we want to
  achieve is:
  If user uses close() in callback, we want to let user be aware that they
  should use close(0) instead of close() in the callback.
 
  We have agreed that we will have an error log to inform user about this
  mis-usage. The options differ in the way how we can force user to take a
  look at that error log.
  There are two scenarios:
  1. User does not expect the program to exit.
  2. User expect the program to exit.
 
  For scenario 1), blocking will probably delay the discovery of the
  problem. Calling close(0) exposes the problem quicker. In this scenario
  producer just encounter a send failure when running normally.
  For scenario 2), blocking will expose the problem quick. Calling
 close(-1)
  might hide the problem. This scenario might include: a) Unit test for a
  send failure. b) Message sending during a close() call from a user
 thread.
 
  So as a summary table:
 
              Scenario 1)                  Scenario 2)
    Blocking  Delay problem discovery      Guaranteed problem discovery
    Close(-1) Immediate problem discovery  Problem might be hidden
 
 
  Personally I prefer blocking because it seems providing more guarantees
  and safer.
 
  Thanks.
 
  Jiangjie (Becket) Qin
 
 
  On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote:
 
  HI Jiangjie,
  
  As far as I understand calling close() in the ioThread is not common,
 as
  it
  may only trigger when we saw some non-retriable error. Hence when user
 run
  their program it is unlikely that close() will be triggered and problem
  will be detected. So it seems to me that from the error detection
 aspect
  these two options seems to be the same as people will usually detect it
  from the producer metrics all dropping to 0.
  
  Guozhang
  
  On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin
 j...@linkedin.com.invalid
  wrote:
  
   It seems there are two options we can choose from when close() is
 called
   from sender thread (callback):
   1. Log an error and close the producer using close(-1)
   2. Log an error and block.
   (Throwing an exception will not work because we catch all the
 exception
   thrown from user callback. It will just lead to an error log.)
  
   My concern for the first option is that the producer will be closed
 even
   if we logged and error. I am wondering if some user would not even
 take
  a
   look at the log if producer is closed normally. Because from the
  programs
   behavior, everything looks good. If that is the case, the error
 message
  we
   logged probably will just be ignored until some day when people check
  the
   log and see it.
  
   As for the second option, because producer does not close but blocks.
  User
   will notice this the first time they run the program. They probably
 will
   look at the log to see why producer could not be closed and they will
  see
   the error log we put there. So they will get informed about this
  mis-usage
   of close() in sender thread the first time they run the code instead
 of
   some time later.
  
   Personally I prefer the second one because it is more obvious that
   something was wrong.
  
   Jiangjie (Becket) Qin
  
   On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote:
  
   Yeah I agree we should not silently change the behavior of the
 function
   with the given parameters; and I would prefer
  error-logging-and-shutdown
   over blocking when close(0) is used, since as Neha suggested
 blocking
   would also not proceed with sending any data, bu will just let
 users to
   realize the issue later than sooner.
   
   On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede n...@confluent.io
  wrote:
   

 And I also agree it is better if we can make producer block when
 close() is called from sender thread so user will notice
 something
   went
 wrong.
   
   
This isn't a great experience either. Why can't we just throw an
   exception
for a behavior we know is incorrect and we'd like the user to
 know.
Blocking as a means of doing that seems wrong and annoying.
   
On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps jay.kr...@gmail.com
   wrote:
   
 Cool.

 I think blocking is good or 

Re: Review Request 31742: Patch for KAFKA-527

2015-03-16 Thread Yasuhiro Matsuda


On March 13, 2015, 11:43 p.m., Yasuhiro Matsuda wrote:
  The inheritance of MessageWriter from BufferingOutputStream is a bit 
  confusing, since it will always use itself in the writePayload function 
  parameter. 
  
  I feel it is clearer to read the code if we just let MessageWriter 
  contain a var of BufferingOutputStream; and instead of passing in the 
  function logic of writing the message, we can just pass in messages and 
  offsetCounter in the write() call, which will then write the messages itself.
 
 Yasuhiro Matsuda wrote:
 It is true that the current code writes only through writePayload. But I 
 wanted MessageWriter to be a subclass of OutputStream to be more generic in 
 case we need to write additional information other than messages in the future.
 
 Guozhang Wang wrote:
 As for now MessageWriter's only public function is write(key, codec) 
 (valueWritefunction), which is used for writing a single message. Also its 
 private functions withCrc32Prefix / withLengthPrefix is only used for message 
 writing. So it is a bit unclear about your motivation in future extensions. 
 Could you elaborate a bit more on that?

I don't know future usages at this point.

Besides, withCrc32Prefix uses internal structure of BufferingOutputStream for 
efficiency. Does this justify the inheritance? If we don't do so, the code will 
be more cluttered.


- Yasuhiro


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31742/#review76454
---


On March 16, 2015, 10:19 p.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31742/
 ---
 
 (Updated March 16, 2015, 10:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-527
 https://issues.apache.org/jira/browse/KAFKA-527
 
 
 Repository: kafka
 
 
 Description
 ---
 
 less byte copies
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
 9c694719dc9b515fb3c3ae96435a87b334044272 
   core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31742/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Yasuhiro Matsuda
 




Re: Review Request 31958: Patch for KAFKA-1684

2015-03-16 Thread Michael Herstine

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31958/#review76640
---



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/31958/#comment124223

`{want,needs}ClientAuth` can be tricky-- check the javadoc for 
`SSLEngine.setWantClientAuth`... there are actually only three states: 
required, requested, not desired, and the last call to `{want,needs}ClientAuth` 
wins.

So, if needs is True and wants is false, invoking the methods in this 
order will actually overwrite the needs setting. Recommend something like:

if (sslConnectionConfig.needClientAuth) {
    sslEngine.setNeedClientAuth(true);
} else {
    sslEngine.setNeedClientAuth(false);
    sslEngine.setWantClientAuth(sslConnectionConfig.wantClientAuth);
}



core/src/main/scala/kafka/network/ssl/SSLChannel.scala
https://reviews.apache.org/r/31958/#comment124229

Suppose SSLEngine has written the current message (via `wrap`) to 
`netOutBuffer`, but that the write call in `flush`, when invoked from 
`handshakeWrap`, didn't write the entire buffer to the underlying socket.

Would not `handshakeStatus` as reported from SSLEngine now be 
`NEEDS_UNWRAP`? And wouldn't that cause us to fall through to the 
`NEEDS_UNWRAP` case?

Or do we not fall through in Scala case statements?



core/src/main/scala/kafka/network/ssl/SSLChannel.scala
https://reviews.apache.org/r/31958/#comment124235

Not sure about this, but do we want to update the position & limit of the 
buffer? We flipped it after the last read, but I can't remember if 
SSLEngine.unwrap will update them if there's an incomplete packet (i.e. in the 
BUFFER_UNDERFLOW case).


Just a few questions on some corner cases... handling all the possibilities 
when handshaking over NIO is really tough.

- Michael Herstine


On March 11, 2015, 9:36 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31958/
 ---
 
 (Updated March 11, 2015, 9:36 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1684
 https://issues.apache.org/jira/browse/KAFKA-1684
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1684. Implement TLS/SSL authentication.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/network/Channel.scala PRE-CREATION 
   core/src/main/scala/kafka/network/SocketServer.scala 
 76ce41aed6e04ac5ba88395c4d5008aca17f9a73 
   core/src/main/scala/kafka/network/ssl/SSLChannel.scala PRE-CREATION 
   core/src/main/scala/kafka/network/ssl/SSLConnectionConfig.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 48e33626695ad8a28b0018362ac225f11df94973 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 dddef938fabae157ed8644536eb1a2f329fb42b7 
   core/src/main/scala/kafka/utils/SSLAuthUtils.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
 0af23abf146d99e3d6cf31e5d6b95a9e63318ddb 
   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
 c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
   core/src/test/scala/unit/kafka/utils/TestSSLUtils.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31958/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: Review Request 31958: Patch for KAFKA-1684

2015-03-16 Thread Sriharsha Chintalapani


 On March 16, 2015, 9:24 p.m., Michael Herstine wrote:
  core/src/main/scala/kafka/network/ssl/SSLChannel.scala, line 137
  https://reviews.apache.org/r/31958/diff/1/?file=891658#file891658line137
 
  Suppose SSLEngine has written the current message (via `wrap`) to 
  `netOutBuffer`, but that the write call in `flush`, when invoked from 
  `handshakeWrap`, didn't write the entire buffer to the underlying socket.
  
  Would not `handshakeStatus` as reported from SSLEngine now be 
  `NEEDS_UNWRAP`? And wouldn't that cause us to fall through to the 
  `NEEDS_UNWRAP` case?
  
  Or do we not fall through in Scala case statements?

Thanks for the review. Ideally it should fall through to NEEDS_UNWRAP, but since 
Scala case statements don't allow Java-style fall-through, I am looking at 
alternatives.
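
For illustration, one common alternative is to loop and re-read the handshake
status after every step instead of relying on fall-through; a rough Java-flavored
outline follows (the actual patch is in Scala, and the helper interface and
method names here are hypothetical, not the code under review):

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLException;

// Illustrative outline only: re-evaluate the handshake status on every
// iteration rather than falling through between cases.
final class HandshakeLoop {
    static void handshake(SSLEngine engine, HandshakeHandler handler) throws SSLException {
        HandshakeStatus status = engine.getHandshakeStatus();
        while (status != HandshakeStatus.FINISHED
                && status != HandshakeStatus.NOT_HANDSHAKING) {
            switch (status) {
                case NEED_WRAP:
                    status = handler.wrapAndFlush();   // may report NEED_UNWRAP on the next pass
                    break;
                case NEED_UNWRAP:
                    status = handler.readAndUnwrap();
                    break;
                case NEED_TASK:
                    status = handler.runDelegatedTasks();
                    break;
                default:
                    return;
            }
        }
    }

    // Hypothetical callbacks standing in for the channel's wrap/unwrap helpers.
    interface HandshakeHandler {
        HandshakeStatus wrapAndFlush() throws SSLException;
        HandshakeStatus readAndUnwrap() throws SSLException;
        HandshakeStatus runDelegatedTasks();
    }
}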


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31958/#review76640
---


On March 11, 2015, 9:36 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31958/
 ---
 
 (Updated March 11, 2015, 9:36 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1684
 https://issues.apache.org/jira/browse/KAFKA-1684
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1684. Implement TLS/SSL authentication.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/network/Channel.scala PRE-CREATION 
   core/src/main/scala/kafka/network/SocketServer.scala 
 76ce41aed6e04ac5ba88395c4d5008aca17f9a73 
   core/src/main/scala/kafka/network/ssl/SSLChannel.scala PRE-CREATION 
   core/src/main/scala/kafka/network/ssl/SSLConnectionConfig.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 48e33626695ad8a28b0018362ac225f11df94973 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 dddef938fabae157ed8644536eb1a2f329fb42b7 
   core/src/main/scala/kafka/utils/SSLAuthUtils.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
 0af23abf146d99e3d6cf31e5d6b95a9e63318ddb 
   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
 c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
   core/src/test/scala/unit/kafka/utils/TestSSLUtils.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31958/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-03-16 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14364166#comment-14364166
 ] 

Parth Brahmbhatt commented on KAFKA-1688:
-

[~jkreps][~junrao] Not sure if you had time to review 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface
yet, but I have to make a design choice and would appreciate your input.

As mentioned in the KIP, I originally thought we would just update the TopicMetadata 
class to hold the acls and owner, which would allow us to reuse the 
TopicMetadataCache to get the acl information. However, on further inspection I 
realized that the TopicMetadataCache only serves as a cache for a topic's 
partition state info, and we have a completely different mechanism for caching 
and updating topic config entries.

Currently the topic config is all about log configuration, so we have a 
TopicConfigManager which takes in a Log instance and keeps updating that 
instance's config as and when the topic config is updated. The topic config 
update notifications are sent by the Controller using ZK watchers.

I propose to introduce a TopicConfigCache which will be updated by 
TopicConfigManager on any config changes. Both the Log instance and the 
authorizer will share an instance of TopicConfigCache and read config entries 
from it. The acls and owner of the topic will be stored as part of the topic 
config.

An alternate solution is to modify the TopicMetadataCache so that it also holds 
topic configs. The controller would have to send updateTopicMetadataCache 
requests on both partition changes and config changes, and we would have to 
deprecate TopicConfigManager and the controller code that updates ZK state to 
fire config change watchers.

I am currently blocked on this, so I would appreciate any feedback.
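
Roughly, the shared cache I have in mind would look something like this (an 
illustrative sketch, not an actual patch):

{code}
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap

// Illustrative sketch only: TopicConfigManager would write into this cache on
// config-change notifications, and both the Log instance and the authorizer
// would read from it (e.g. acls/owner stored as topic config entries).
class TopicConfigCache {
  private val configs = new ConcurrentHashMap[String, Properties]()

  def updateTopicConfig(topic: String, props: Properties): Unit = {
    configs.put(topic, props)
  }

  def topicConfig(topic: String): Option[Properties] =
    Option(configs.get(topic))

  def configValue(topic: String, key: String): Option[String] =
    topicConfig(topic).flatMap(p => Option(p.getProperty(key)))
}
{code}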

 Add authorization interface and naive implementation
 

 Key: KAFKA-1688
 URL: https://issues.apache.org/jira/browse/KAFKA-1688
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Parth Brahmbhatt
 Fix For: 0.8.3


 Add a PermissionManager interface as described here:
 https://cwiki.apache.org/confluence/display/KAFKA/Security
 (possibly there is a better name?)
 Implement calls to the PermissionsManager in KafkaApis for the main requests 
 (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
 exception to the protocol to indicate permission denied.
 Add a server configuration to give the class you want to instantiate that 
 implements that interface. That class can define its own configuration 
 properties from the main config file.
 Provide a simple implementation of this interface which just takes a user and 
 ip whitelist and permits those in either of the whitelists to do anything, 
 and denies all others.
 Rather than writing an integration test for this class we can probably just 
 use this class for the TLS and SASL authentication testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31893: Patch for KAFKA-2013

2015-03-16 Thread Yasuhiro Matsuda

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/
---

(Updated March 16, 2015, 9:39 p.m.)


Review request for kafka.


Bugs: KAFKA-2013
https://issues.apache.org/jira/browse/KAFKA-2013


Repository: kafka


Description
---

purgatory micro benchmark


Diffs (updated)
-

  core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31893/diff/


Testing
---


Thanks,

Yasuhiro Matsuda



Re: Review Request 31967: Patch for KAFKA-1546

2015-03-16 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review76641
---


lgtm overall. Minor comments below.


core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
https://reviews.apache.org/r/31967/#comment124277

should be not be - can you fix/remove?



core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/31967/#comment124224

Wonder why this got split. Can you re-optimize imports?



core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/31967/#comment124225

Can you move the if statement to the next line?



core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/31967/#comment124243

Can we rename the argument to maxLagMs?



core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/31967/#comment124245

Minor edit:
has not read up to the LEO within the last replicaMaxLag ms, then the 
follower is lagging and should be removed from the ISR



core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/31967/#comment124251

(Not part of your change, but could you change [%s,%d] to %s and replace 
topic, partitionId with TopicAndPartition(topic, partitionId)? We are trying to 
adopt a uniform convention everywhere for printing topic-partitions and have 
been making these changes gradually, as they appear.)



core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/31967/#comment124252

same here



core/src/main/scala/kafka/cluster/Replica.scala
https://reviews.apache.org/r/31967/#comment124256

Can you rename this to lagBeginTimeMsUnderlying?



core/src/main/scala/kafka/cluster/Replica.scala
https://reviews.apache.org/r/31967/#comment124254

read up to the log end offset snapshot when the read was initiated ...



core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/31967/#comment124260

Can we rename this to logEndOffsetBeforeRead?

Also, can we just make do with the Long (offset) instead of the entire 
LogOffsetMetadata?


- Joel Koshy


On March 16, 2015, 6:32 p.m., Aditya Auradkar wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31967/
 ---
 
 (Updated March 16, 2015, 6:32 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1546
 https://issues.apache.org/jira/browse/KAFKA-1546
 
 
 Repository: kafka
 
 
 Description
 ---
 
 PATCH for KAFKA-1546
 
 Brief summary of changes:
 - Added a lagBegin metric inside Replica to track how long (in time) the 
 replica has not read up to the LEO
 - Using the lag begin value in the check for ISR expansion and shrinkage
 - Removed the max lag messages config since it is no longer necessary
 - Returning the initialLogEndOffset in LogReadResult, corresponding to the 
 LEO before actually reading from the log
 - Unit test cases to test ISR shrinkage and expansion
 
 Updated KAFKA-1546 patch to reflect Neha and Jun's comments
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/cluster/Partition.scala 
 c4bf48a801007ebe7497077d2018d6dffe1677d4 
   core/src/main/scala/kafka/cluster/Replica.scala 
 bd13c20338ce3d73113224440e858a12814e5adb 
   core/src/main/scala/kafka/log/Log.scala 
 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
   core/src/main/scala/kafka/server/FetchDataInfo.scala 
 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 48e33626695ad8a28b0018362ac225f11df94973 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
 c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
 efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
 
 Diff: https://reviews.apache.org/r/31967/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Aditya Auradkar
 




[jira] [Created] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Anatoli Fomenko (JIRA)
Anatoli Fomenko created KAFKA-2023:
--

 Summary: git clone kafka repository requires https
 Key: KAFKA-2023
 URL: https://issues.apache.org/jira/browse/KAFKA-2023
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Anatoli Fomenko
Priority: Minor


From http://kafka.apache.org/code.html: 

Our code is kept in git. You can check it out like this:
git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka

On CentOS 6.5:

{code}
$ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
Initialized empty Git repository in /home/anatoli/git/kafka/.git/
error: RPC failed; result=22, HTTP code = 405
{code}

while:

{code}
$ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
Initialized empty Git repository in /home/anatoli/git/kafka/.git/
remote: Counting objects: 24607, done.
remote: Compressing objects: 100% (9212/9212), done.
remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
Resolving deltas: 100% (14449/14449), done.
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Anatoli Fomenko (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anatoli Fomenko updated KAFKA-2023:
---
Attachment: KAFKA-2023.patch

Please review the patch.

Thank you.

 git clone kafka repository requires https
 -

 Key: KAFKA-2023
 URL: https://issues.apache.org/jira/browse/KAFKA-2023
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Anatoli Fomenko
Priority: Minor
 Attachments: KAFKA-2023.patch


 From http://kafka.apache.org/code.html: 
 Our code is kept in git. You can check it out like this:
   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
 On CentOS 6.5:
 {code}
 $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
 Initialized empty Git repository in /home/anatoli/git/kafka/.git/
 error: RPC failed; result=22, HTTP code = 405
 {code}
 while:
 {code}
 $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
 Initialized empty Git repository in /home/anatoli/git/kafka/.git/
 remote: Counting objects: 24607, done.
 remote: Compressing objects: 100% (9212/9212), done.
 remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
 Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
 Resolving deltas: 100% (14449/14449), done.
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31958: Patch for KAFKA-1684

2015-03-16 Thread Sriharsha Chintalapani


 On March 16, 2015, 9:24 p.m., Michael Herstine wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 318
  https://reviews.apache.org/r/31958/diff/1/?file=891657#file891657line318
 
  `{want,need}ClientAuth` can be tricky-- check the javadoc for 
  `SSLEngine.setWantClientAuth`... there are actually only three states 
  (required, requested, not desired), and the last call to 
  `set{Want,Need}ClientAuth` wins.
  
  So, if needs is True and wants is false, invoking the methods in 
  this order will actually overwrite the needs setting. Recommend something 
  like:
  
  if (sslConnectionConfig.needClientAuth) {
      sslEngine.setNeedClientAuth(true);
  } else {
      sslEngine.setNeedClientAuth(false);
      sslEngine.setWantClientAuth(sslConnectionConfig.wantClientAuth);
  }

Thanks for pointing it out; I'll fix that.


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31958/#review76640
---


On March 11, 2015, 9:36 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31958/
 ---
 
 (Updated March 11, 2015, 9:36 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1684
 https://issues.apache.org/jira/browse/KAFKA-1684
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1684. Implement TLS/SSL authentication.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/network/Channel.scala PRE-CREATION 
   core/src/main/scala/kafka/network/SocketServer.scala 
 76ce41aed6e04ac5ba88395c4d5008aca17f9a73 
   core/src/main/scala/kafka/network/ssl/SSLChannel.scala PRE-CREATION 
   core/src/main/scala/kafka/network/ssl/SSLConnectionConfig.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 48e33626695ad8a28b0018362ac225f11df94973 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 dddef938fabae157ed8644536eb1a2f329fb42b7 
   core/src/main/scala/kafka/utils/SSLAuthUtils.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
 0af23abf146d99e3d6cf31e5d6b95a9e63318ddb 
   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
 c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
   core/src/test/scala/unit/kafka/utils/TestSSLUtils.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31958/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: Review Request 31893: Patch for KAFKA-2013

2015-03-16 Thread Yasuhiro Matsuda


 On March 16, 2015, 5:17 p.m., Jun Rao wrote:
  core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala, line 193
  https://reviews.apache.org/r/31893/diff/1/?file=890190#file890190line193
 
  Is there a particular reason that we need to override isCompleted()? 
  Typically, only tryComplete() and onComplete() need to be overridden in a 
  subclass of DelayedOperation.
  
  Actually, I am not sure how we complete the requests before the timeout 
  is reached, since there is no explicit call to tryComplete()?
 
 Yasuhiro Matsuda wrote:
 isCompleted checks whether the current time has passed the scheduled completion 
 time rather than whether forceComplete has been called. That makes isCompleted 
 always accurate.
 
 Purgatory checks the watcher lists every so often and calls isCompleted. 
 Calling forceComplete from isCompleted ensures that a completed request is 
 removed from the timing wheels in the new implementation. In terms of timing, 
 this is not very accurate because completed requests may stay longer than 
 they should. This doesn't affect the old implementation at all, but it may 
 impose some overhead on the new implementation. Still, the new one 
 outperforms the old one.
 
 Ideally we could call forceComplete at the scheduled completion 
 time, but that requires another timer (DelayQueue or Timer). I think that is 
 too much overhead for measuring purgatory performance, and it is also hard to 
 guarantee such a timer works accurately in this test setting.

It looks like the watcher list check happens frequently enough in both the new 
and old implementations. The average delay from the completion time to the 
actual forceComplete call is several tens of milliseconds (low request rate) to 
sub-milliseconds (high request rate).
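
A rough illustration of the idea (not the actual DelayedOperation API): 
completion is defined purely by the clock, so isCompleted stays accurate, and 
forceComplete is only invoked lazily when a watcher-list scan observes the 
request as completed.

    // Illustrative only; not Kafka's DelayedOperation API.
    class ScheduledRequest(scheduledCompletionMs: Long) {
      @volatile private var forced = false

      // Accurate by construction: depends only on the clock, not on whether
      // forceComplete has already been called.
      def isCompleted(nowMs: Long): Boolean = {
        val done = nowMs >= scheduledCompletionMs
        if (done && !forced) forceComplete()  // lazy removal from the timer
        done
      }

      def forceComplete(): Unit = {
        forced = true  // removal from the timing wheels would go here
      }
    }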


- Yasuhiro


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/#review76566
---


On March 16, 2015, 9:39 p.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31893/
 ---
 
 (Updated March 16, 2015, 9:39 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2013
 https://issues.apache.org/jira/browse/KAFKA-2013
 
 
 Repository: kafka
 
 
 Description
 ---
 
 purgatory micro benchmark
 
 
 Diffs
 -
 
   core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31893/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Yasuhiro Matsuda
 




Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jiangjie Qin
Thanks Guozhang. It wouldn’t be as thoroughly considered without
discussing with you :)

Jiangjie (Becket) Qin

On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote:

Thanks Jiangjie,

After talking to you offline on this, I have been convinced and changed my
preference to blocking. The immediate shutdown approach does have some
unsafeness in some cases.

Guozhang

On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 It looks that the problem we want to solve and the purpose we want to
 achieve is:
 If user uses close() in callback, we want to let user be aware that they
 should use close(0) instead of close() in the callback.

 We have agreed that we will have an error log to inform user about this
 mis-usage. The options differ in the way how we can force user to take a
 look at that error log.
 There are two scenarios:
 1. User does not expect the program to exit.
 2. User expect the program to exit.

 For scenario 1), blocking will probably delay the discovery of the
 problem. Calling close(0) exposes the problem quicker. In this scenario
 producer just encounter a send failure when running normally.
 For scenario 2), blocking will expose the problem quick. Calling
close(-1)
 might hide the problem. This scenario might include: a) Unit test for a
 send failure. b) Message sending during a close() call from a user
thread.

 So as a summary table:

                Scenario 1)                  Scenario 2)
 
  Blocking      Delay problem discovery      Guaranteed problem discovery
 
  Close(-1)     Immediate problem discovery  Problem might be hidden


 Personally I prefer blocking because it seems providing more guarantees
 and safer.

 Thanks.

 Jiangjie (Becket) Qin


 On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote:

 HI Jiangjie,
 
 As far as I understand calling close() in the ioThread is not common,
as
 it
 may only trigger when we saw some non-retriable error. Hence when user
run
 their program it is unlikely that close() will be triggered and problem
 will be detected. So it seems to me that from the error detection
aspect
 these two options seems to be the same as people will usually detect it
 from the producer metrics all dropping to 0.
 
 Guozhang
 
 On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin
j...@linkedin.com.invalid
 wrote:
 
  It seems there are two options we can choose from when close() is
called
  from sender thread (callback):
  1. Log an error and close the producer using close(-1)
  2. Log an error and block.
  (Throwing an exception will not work because we catch all the
exception
  thrown from user callback. It will just lead to an error log.)
 
  My concern for the first option is that the producer will be closed
even
  if we logged and error. I am wondering if some user would not even
take
 a
  look at the log if producer is closed normally. Because from the
 programs
  behavior, everything looks good. If that is the case, the error
message
 we
  logged probably will just be ignored until some day when people check
 the
  log and see it.
 
  As for the second option, because producer does not close but blocks.
 User
  will notice this the first time they run the program. They probably
will
  look at the log to see why producer could not be closed and they will
 see
  the error log we put there. So they will get informed about this
 mis-usage
  of close() in sender thread the first time they run the code instead
of
  some time later.
 
  Personally I prefer the second one because it is more obvious that
  something was wrong.
 
  Jiangjie (Becket) Qin
 
  On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote:
 
  Yeah I agree we should not silently change the behavior of the
function
  with the given parameters; and I would prefer
 error-logging-and-shutdown
  over blocking when close(0) is used, since as Neha suggested
blocking
  would also not proceed with sending any data, bu will just let
users to
  realize the issue later than sooner.
  
  On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede n...@confluent.io
 wrote:
  
   
And I also agree it is better if we can make producer block when
close() is called from sender thread so user will notice
something
  went
wrong.
  
  
   This isn't a great experience either. Why can't we just throw an
  exception
   for a behavior we know is incorrect and we'd like the user to
know.
   Blocking as a means of doing that seems wrong and annoying.
  
   On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps jay.kr...@gmail.com
  wrote:
  
Cool.
   
I think blocking is good or alternately throwing an exception
 directly
   from
close(). Basically I would just worry about subtly doing
something
   slightly
different from what the user asked for as it will be hard to
notice
  that
behavior difference.
   
-Jay
   
On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin
  j...@linkedin.com.invalid
   
wrote:
   
 Hi Jay,

 I have modified the KIP 

[jira] [Commented] (KAFKA-527) Compression support does numerous byte copies

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14364085#comment-14364085
 ] 

Yasuhiro Matsuda commented on KAFKA-527:


Updated reviewboard https://reviews.apache.org/r/31742/diff/
 against branch origin/trunk

 Compression support does numerous byte copies
 -

 Key: KAFKA-527
 URL: https://issues.apache.org/jira/browse/KAFKA-527
 Project: Kafka
  Issue Type: Bug
  Components: compression
Reporter: Jay Kreps
Assignee: Yasuhiro Matsuda
Priority: Critical
 Attachments: KAFKA-527.message-copy.history, KAFKA-527.patch, 
 KAFKA-527_2015-03-16_15:19:29.patch, java.hprof.no-compression.txt, 
 java.hprof.snappy.text


 The data path for compressing or decompressing messages is extremely 
 inefficient. We do something like 7 (?) complete copies of the data, often 
 for simple things like adding a 4 byte size to the front. I am not sure how 
 this went by unnoticed.
 This is likely the root cause of the performance issues we saw in doing bulk 
 recompression of data in mirror maker.
 The mismatch between the InputStream and OutputStream interfaces and the 
 Message/MessageSet interfaces which are based on byte buffers is the cause of 
 many of these.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-527) Compression support does numerous byte copies

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yasuhiro Matsuda updated KAFKA-527:
---
Attachment: KAFKA-527_2015-03-16_15:19:29.patch

 Compression support does numerous byte copies
 -

 Key: KAFKA-527
 URL: https://issues.apache.org/jira/browse/KAFKA-527
 Project: Kafka
  Issue Type: Bug
  Components: compression
Reporter: Jay Kreps
Assignee: Yasuhiro Matsuda
Priority: Critical
 Attachments: KAFKA-527.message-copy.history, KAFKA-527.patch, 
 KAFKA-527_2015-03-16_15:19:29.patch, java.hprof.no-compression.txt, 
 java.hprof.snappy.text


 The data path for compressing or decompressing messages is extremely 
 inefficient. We do something like 7 (?) complete copies of the data, often 
 for simple things like adding a 4 byte size to the front. I am not sure how 
 this went by unnoticed.
 This is likely the root cause of the performance issues we saw in doing bulk 
 recompression of data in mirror maker.
 The mismatch between the InputStream and OutputStream interfaces and the 
 Message/MessageSet interfaces which are based on byte buffers is the cause of 
 many of these.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1546) Automate replica lag tuning

2015-03-16 Thread Aditya A Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya A Auradkar updated KAFKA-1546:
-
Attachment: KAFKA-1546_2015-03-16_11:31:39.patch

 Automate replica lag tuning
 ---

 Key: KAFKA-1546
 URL: https://issues.apache.org/jira/browse/KAFKA-1546
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
Reporter: Neha Narkhede
Assignee: Aditya Auradkar
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1546.patch, KAFKA-1546_2015-03-11_18:48:09.patch, 
 KAFKA-1546_2015-03-12_13:42:01.patch, KAFKA-1546_2015-03-16_11:31:39.patch


 Currently, there is no good way to tune the replica lag configs to 
 automatically account for high and low volume topics on the same cluster. 
 For the low-volume topic it will take a very long time to detect a lagging
 replica, and for the high-volume topic it will have false-positives.
 One approach to making this easier would be to have the configuration
 be something like replica.lag.max.ms and translate this into a number
 of messages dynamically based on the throughput of the partition.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1546) Automate replica lag tuning

2015-03-16 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363672#comment-14363672
 ] 

Aditya A Auradkar commented on KAFKA-1546:
--

Updated reviewboard https://reviews.apache.org/r/31967/diff/
 against branch origin/trunk

 Automate replica lag tuning
 ---

 Key: KAFKA-1546
 URL: https://issues.apache.org/jira/browse/KAFKA-1546
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
Reporter: Neha Narkhede
Assignee: Aditya Auradkar
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1546.patch, KAFKA-1546_2015-03-11_18:48:09.patch, 
 KAFKA-1546_2015-03-12_13:42:01.patch, KAFKA-1546_2015-03-16_11:31:39.patch


 Currently, there is no good way to tune the replica lag configs to 
 automatically account for high and low volume topics on the same cluster. 
 For the low-volume topic it will take a very long time to detect a lagging
 replica, and for the high-volume topic it will have false-positives.
 One approach to making this easier would be to have the configuration
 be something like replica.lag.max.ms and translate this into a number
 of messages dynamically based on the throughput of the partition.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31967: Patch for KAFKA-1546

2015-03-16 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
---

(Updated March 16, 2015, 6:31 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
---

PATCH for KAFKA-1546



Brief summary of changes:
- Added a lagBegin metric inside Replica to track how long (in time) the 
replica has not read up to the LEO (see the sketch below)
- Using the lag begin value in the check for ISR expansion and shrinkage
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult, corresponding to the 
LEO before actually reading from the log
- Unit test cases to test ISR shrinkage and expansion

Updated KAFKA-1546 patch to reflect Neha and Jun's comments
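
A rough sketch of the time-based check described above (illustrative only, not 
the patch code):

    // lagBeginMs is set when a fetch does not reach the leader's LEO and
    // cleared when it does; the follower is shrunk from the ISR once it has
    // lagged for longer than the configured maximum lag time.
    final class ReplicaLagTracker {
      @volatile private var lagBeginMs: Option[Long] = None

      def onFetch(readUpToLeo: Boolean, nowMs: Long): Unit = {
        lagBeginMs = if (readUpToLeo) None else lagBeginMs.orElse(Some(nowMs))
      }

      def isLagging(nowMs: Long, maxLagMs: Long): Boolean =
        lagBeginMs.exists(begin => nowMs - begin > maxLagMs)
    }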


Diffs (updated)
-

  core/src/main/scala/kafka/cluster/Partition.scala 
c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala 
bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 
06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/FetchDataInfo.scala 
26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
48e33626695ad8a28b0018362ac225f11df94973 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
---


Thanks,

Aditya Auradkar



Re: Review Request 31742: Patch for KAFKA-527

2015-03-16 Thread Yasuhiro Matsuda


 On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/message/MessageWriter.scala, line 29
  https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line29
 
  Add a check that codec should not be NoCompression.

Why should the codec not be NoCompression? The code works with NoCompression, 
too.


 On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/message/MessageWriter.scala, line 97
  https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line97
 
  Could we use comments in 
  
  /**
   *
   */
   
  format?

Is this comment style prohibited? This class is for internal use with fairly 
localized usage.


 On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/message/MessageWriter.scala, line 117
  https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line117
 
  We can just pass in the Byte here.

This is a contract of OutputStream.


 On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/message/MessageWriter.scala, line 135
  https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line135
 
  Better group the private functions together after the public functions.

Well, I don't think it is a particularly better way to organize code, but if you 
insist I can change it.
The Kafka code base doesn't seem to follow that convention...


On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
  The inheritance of MessageWriter from BufferingOutputStream is a bit 
  confusing, since it will always use itself in the writePayload function 
  parameter. 
  
  I feel it is more clear to read the code if we just let MessageWriter 
  contains a var of BufferingOutputStream; and instead of pass in the 
  function logic of writing the message, we can just pass in messages and 
  offsetCounter in the write() call which will then write the messages itself.

It is true that the current code writes only through writePayload. But I wanted 
MessageWriter to be a subclass of OutputStream to be more generic, in case we 
need to write additional information other than messages in the future.


- Yasuhiro


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31742/#review76454
---


On March 4, 2015, 7:43 p.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31742/
 ---
 
 (Updated March 4, 2015, 7:43 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-527
 https://issues.apache.org/jira/browse/KAFKA-527
 
 
 Repository: kafka
 
 
 Description
 ---
 
 less byte copies
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
 9c694719dc9b515fb3c3ae96435a87b334044272 
   core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31742/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Yasuhiro Matsuda
 




[jira] [Commented] (KAFKA-2020) I expect ReplicaNotAvailableException to have proper Javadocs

2015-03-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363678#comment-14363678
 ] 

Jun Rao commented on KAFKA-2020:


The following is the protocol for TopicMetadataResponse. Currently, we do the 
following:
1. If leader is not available, we set the partition level error code to 
LeaderNotAvailable.
2. If a non-leader replica is not available, we take that replica out of the 
assigned replica list and isr in the response. As an indication for doing 
that, we set the partition level error code to ReplicaNotAvailable.

This has a few problems. First, ReplicaNotAvailable probably shouldn't be an 
error, at least for the normal producer/consumer clients that just want to find 
out the leader. Second, it can happen that both the leader and another replica 
are not available at the same time. There is no error code to indicate both. 
Third, even if a replica is not available, it's still useful to return its 
replica id since some clients (e.g. admin tool) may still make use of it.

One way to address this issue is to always return the replica id for leader, 
assigned replicas, and isr regardless of whether the corresponding broker is 
live or not. Since we also return the list of live brokers, the client can 
figure out whether a leader or a replica is live or not and act accordingly. 
This way, we don't need to set the partition level error code when the leader 
or a replica is not available. This doesn't change the wire protocol, but does 
change the semantics. So, a new version of the protocol is needed. Since we are 
debating evolving TopicMetadataRequest in KIP-4. We can potentially piggyback 
on that.

{code}
MetadataResponse = [Broker][TopicMetadata]
  Broker = NodeId Host Port  (any number of brokers may be returned)
NodeId = int32
Host = string
Port = int32
  TopicMetadata = TopicErrorCode TopicName [PartitionMetadata]
TopicErrorCode = int16
  PartitionMetadata = PartitionErrorCode PartitionId Leader Replicas Isr
PartitionErrorCode = int16
PartitionId = int32
Leader = int32
Replicas = [int32]
Isr = [int32]
{code}
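
Under that scheme, a client could derive availability itself along these lines 
(an illustrative sketch, not actual client code):

{code}
// With full replica lists always returned, a client can compute liveness by
// intersecting replica ids with the set of live broker ids from the response.
object MetadataLiveness {
  case class PartitionMeta(leader: Int, replicas: Seq[Int], isr: Seq[Int])

  def leaderAvailable(p: PartitionMeta, liveBrokerIds: Set[Int]): Boolean =
    liveBrokerIds.contains(p.leader)

  def unavailableReplicas(p: PartitionMeta, liveBrokerIds: Set[Int]): Seq[Int] =
    p.replicas.filterNot(liveBrokerIds.contains)
}
{code}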

 I expect ReplicaNotAvailableException to have proper Javadocs
 -

 Key: KAFKA-2020
 URL: https://issues.apache.org/jira/browse/KAFKA-2020
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Chris Riccomini
Assignee: Neha Narkhede

 It looks like ReplicaNotAvailableException was copy and pasted from 
 LeaderNotAvailable exception. The Javadocs were never changed. This means 
 that users think that ReplicaNotAvailableException signifies leaders are not 
 available. This is very different from "I can ignore this exception", which 
 is what the Kafka protocol docs say to do with ReplicaNotAvailableException.
 Related: what's the point of ReplicaNotAvailableException if it's supposed to 
 be ignored?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31967: Patch for KAFKA-1546

2015-03-16 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
---

(Updated March 16, 2015, 6:32 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
---

PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time 
since the replica did not read from the LEO
- Using lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the the 
LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion

Updated KAFKA-1546 patch to reflect Neha and Jun's comments


Diffs
-

  core/src/main/scala/kafka/cluster/Partition.scala 
c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala 
bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 
06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/FetchDataInfo.scala 
26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
48e33626695ad8a28b0018362ac225f11df94973 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
---


Thanks,

Aditya Auradkar



Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jiangjie Qin
It looks like the problem we want to solve and the purpose we want to
achieve is:
If a user calls close() in a callback, we want to make them aware that they
should use close(0) instead of close() in the callback.

We have agreed that we will log an error to inform the user about this
mis-usage. The options differ in how we can force the user to take a
look at that error log.
There are two scenarios:
1. The user does not expect the program to exit.
2. The user expects the program to exit.

For scenario 1), blocking will probably delay the discovery of the
problem. Calling close(0) exposes the problem quicker. In this scenario the
producer just encounters a send failure while running normally.
For scenario 2), blocking will expose the problem quickly. Calling close(-1)
might hide the problem. This scenario might include: a) a unit test for a
send failure; b) message sending during a close() call from a user thread.

So as a summary table:

              Scenario 1)                  Scenario 2)

Blocking      Delay problem discovery      Guaranteed problem discovery

Close(-1)     Immediate problem discovery  Problem might be hidden


Personally I prefer blocking because it seems to provide more guarantees
and be safer.
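
A rough sketch of the kind of guard being discussed (illustrative only, not the 
actual producer code):

    // Illustrative only: detect close() being invoked from the sender/callback
    // thread, where a full blocking close cannot complete. The options above
    // differ in what goes in the first branch.
    object CloseGuardSketch {
      def close(timeoutMs: Long, senderThread: Thread, logError: String => Unit): Unit = {
        if (Thread.currentThread() eq senderThread) {
          logError("close() called from the producer callback; use close(0) instead")
          // option 1: close(-1)-style immediate shutdown here
          // option 2: block, so the mis-use is noticed the first time the code runs
        } else {
          // normal path: wait up to timeoutMs for in-flight records to complete
        }
      }
    }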

Thanks.

Jiangjie (Becket) Qin


On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote:

HI Jiangjie,

As far as I understand calling close() in the ioThread is not common, as
it
may only trigger when we saw some non-retriable error. Hence when user run
their program it is unlikely that close() will be triggered and problem
will be detected. So it seems to me that from the error detection aspect
these two options seems to be the same as people will usually detect it
from the producer metrics all dropping to 0.

Guozhang

On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 It seems there are two options we can choose from when close() is called
 from sender thread (callback):
 1. Log an error and close the producer using close(-1)
 2. Log an error and block.
 (Throwing an exception will not work because we catch all the exception
 thrown from user callback. It will just lead to an error log.)

 My concern for the first option is that the producer will be closed even
 if we logged and error. I am wondering if some user would not even take
a
 look at the log if producer is closed normally. Because from the
programs
 behavior, everything looks good. If that is the case, the error message
we
 logged probably will just be ignored until some day when people check
the
 log and see it.

 As for the second option, because producer does not close but blocks.
User
 will notice this the first time they run the program. They probably will
 look at the log to see why producer could not be closed and they will
see
 the error log we put there. So they will get informed about this
mis-usage
 of close() in sender thread the first time they run the code instead of
 some time later.

 Personally I prefer the second one because it is more obvious that
 something was wrong.

 Jiangjie (Becket) Qin

 On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote:

 Yeah I agree we should not silently change the behavior of the function
 with the given parameters; and I would prefer
error-logging-and-shutdown
 over blocking when close(0) is used, since as Neha suggested blocking
 would also not proceed with sending any data, bu will just let users to
 realize the issue later than sooner.
 
 On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede n...@confluent.io
wrote:
 
  
   And I also agree it is better if we can make producer block when
   close() is called from sender thread so user will notice something
 went
   wrong.
 
 
  This isn't a great experience either. Why can't we just throw an
 exception
  for a behavior we know is incorrect and we'd like the user to know.
  Blocking as a means of doing that seems wrong and annoying.
 
  On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps jay.kr...@gmail.com
 wrote:
 
   Cool.
  
   I think blocking is good or alternately throwing an exception
directly
  from
   close(). Basically I would just worry about subtly doing something
  slightly
   different from what the user asked for as it will be hard to notice
 that
   behavior difference.
  
   -Jay
  
   On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
   wrote:
  
Hi Jay,
   
I have modified the KIP as you suggested. I thinks as long as we
 have
consistent define for timeout across Kafka interface, there would
 be no
problem. And I also agree it is better if we can make producer
block
  when
close() is called from sender thread so user will notice
something
 went
wrong.
   
Thanks.
   
Jiangjie (Becket) Qin
   
On 3/14/15, 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote:
   
Hey Jiangjie,

I think this is going to be very confusing that
  close(0) waits indefinitely and
  close(-1) waits for 0.
I understand this 

Re: Review Request 32061: WIP for KAFKA-2015 plus some minor fixes in new consumer

2015-03-16 Thread Onur Karaman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32061/#review76630
---

Ship it!


Ship It!

- Onur Karaman


On March 13, 2015, 10:26 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32061/
 ---
 
 (Updated March 13, 2015, 10:26 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2015
 https://issues.apache.org/jira/browse/KAFKA-2015
 
 
 Repository: kafka
 
 
 Description
 ---
 
 NOTE: without the rebalance implementation a single consumer will try to 
 subscribe to all partitions of the given topic.
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  436f9b2a843bc8c44d17403f5880b6736a5d56a8 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  8b71fbad5c404d3f23137e153d6376de9f82b823 
   config/tools-log4j.properties 52f07c96019b4083fc78f62cfb0a81080327e847 
   core/src/main/scala/kafka/consumer/BaseConsumer.scala PRE-CREATION 
   core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
 910691e88ccc66a1542d0ea85bb2f732861d805e 
   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
 00265f9f4a4b6c6a9aa023e5be5faf297f77bf31 
 
 Diff: https://reviews.apache.org/r/32061/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Guozhang Wang
Thanks Jiangjie,

After talking to you offline on this, I have been convinced and changed my
preference to blocking. The immediate shutdown approach does have some
unsafeness in some cases.

Guozhang

On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 It looks that the problem we want to solve and the purpose we want to
 achieve is:
 If user uses close() in callback, we want to let user be aware that they
 should use close(0) instead of close() in the callback.

 We have agreed that we will have an error log to inform user about this
 mis-usage. The options differ in the way how we can force user to take a
 look at that error log.
 There are two scenarios:
 1. User does not expect the program to exit.
 2. User expect the program to exit.

 For scenario 1), blocking will probably delay the discovery of the
 problem. Calling close(0) exposes the problem quicker. In this scenario
 producer just encounter a send failure when running normally.
 For scenario 2), blocking will expose the problem quick. Calling close(-1)
 might hide the problem. This scenario might include: a) Unit test for a
 send failure. b) Message sending during a close() call from a user thread.

 So as a summary table:

   Scenario 1) Scenario 2)

 Blocking  Delay problem discovery Guaranteed problem discovery

 Close(-1) Immediate problem discovery Problem might be hidden


 Personally I prefer blocking because it seems providing more guarantees
 and safer.

 Thanks.

 Jiangjie (Becket) Qin


 On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote:

 HI Jiangjie,
 
 As far as I understand calling close() in the ioThread is not common, as
 it
 may only trigger when we saw some non-retriable error. Hence when user run
 their program it is unlikely that close() will be triggered and problem
 will be detected. So it seems to me that from the error detection aspect
 these two options seems to be the same as people will usually detect it
 from the producer metrics all dropping to 0.
 
 Guozhang
 
 On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:
 
  It seems there are two options we can choose from when close() is called
  from sender thread (callback):
  1. Log an error and close the producer using close(-1)
  2. Log an error and block.
  (Throwing an exception will not work because we catch all the exception
  thrown from user callback. It will just lead to an error log.)
 
  My concern for the first option is that the producer will be closed even
  if we logged and error. I am wondering if some user would not even take
 a
  look at the log if producer is closed normally. Because from the
 programs
  behavior, everything looks good. If that is the case, the error message
 we
  logged probably will just be ignored until some day when people check
 the
  log and see it.
 
  As for the second option, because producer does not close but blocks.
 User
  will notice this the first time they run the program. They probably will
  look at the log to see why producer could not be closed and they will
 see
  the error log we put there. So they will get informed about this
 mis-usage
  of close() in sender thread the first time they run the code instead of
  some time later.
 
  Personally I prefer the second one because it is more obvious that
  something was wrong.
 
  Jiangjie (Becket) Qin
 
  On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote:
 
  Yeah I agree we should not silently change the behavior of the function
  with the given parameters; and I would prefer
 error-logging-and-shutdown
  over blocking when close(0) is used, since as Neha suggested blocking
  would also not proceed with sending any data, bu will just let users to
  realize the issue later than sooner.
  
  On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede n...@confluent.io
 wrote:
  
   
And I also agree it is better if we can make producer block when
close() is called from sender thread so user will notice something
  went
wrong.
  
  
   This isn't a great experience either. Why can't we just throw an
  exception
   for a behavior we know is incorrect and we'd like the user to know.
   Blocking as a means of doing that seems wrong and annoying.
  
   On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps jay.kr...@gmail.com
  wrote:
  
Cool.
   
I think blocking is good or alternately throwing an exception
 directly
   from
close(). Basically I would just worry about subtly doing something
   slightly
different from what the user asked for as it will be hard to notice
  that
behavior difference.
   
-Jay
   
On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin
  j...@linkedin.com.invalid
   
wrote:
   
 Hi Jay,

 I have modified the KIP as you suggested. I thinks as long as we
  have
 consistent define for timeout across Kafka interface, there would
  be no
 problem. And I also agree it is better if 

[jira] [Created] (KAFKA-2022) simpleconsumer.fetch(req) throws a java.nio.channels.ClosedChannelException: null exception when the original leader fails instead of being trapped in the fetchResponse a

2015-03-16 Thread Muqeet Mohammed Ali (JIRA)
Muqeet Mohammed Ali created KAFKA-2022:
--

 Summary: simpleconsumer.fetch(req) throws a 
java.nio.channels.ClosedChannelException: null exception when the original 
leader fails instead of being trapped in the fetchResponse api while consuming 
messages
 Key: KAFKA-2022
 URL: https://issues.apache.org/jira/browse/KAFKA-2022
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
 Environment: 3 Linux nodes with both ZooKeeper and brokers running under 
respective users on each.
Reporter: Muqeet Mohammed Ali
Assignee: Neha Narkhede


simpleconsumer.fetch(req) throws a java.nio.channels.ClosedChannelException: 
null exception when the original leader fails, instead of the failure being 
surfaced through the FetchResponse API while consuming messages. My 
understanding was that any fetch failure could be detected via the 
fetchResponse.hasError() call and then handled by fetching the new leader. 
Below is the relevant code snippet from the simple consumer, with a comment 
marking the line that causes the exception. Can you please comment on this?

if (simpleconsumer == null) {
    simpleconsumer = new SimpleConsumer(leaderAddress.getHostName(), leaderAddress.getPort(),
                                        consumerTimeout, consumerBufferSize, consumerId);
}

FetchRequest req = new FetchRequestBuilder().clientId(getConsumerId())
        .addFetch(topic, partition, offsetManager.getTempOffset(), consumerBufferSize)
        // Note: the fetchSize might need to be increased
        // if large batches are written to Kafka
        .build();
// the exception is thrown at the line below
FetchResponse fetchResponse = simpleconsumer.fetch(req);

if (fetchResponse.hasError()) {
    numErrors++;
    etc...
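
A possible workaround until this is clarified (an illustrative sketch assuming 
kafka.consumer.SimpleConsumer is the consumer in use; not project guidance): 
wrap the fetch call and treat a ClosedChannelException like a leader change, in 
addition to checking fetchResponse.hasError().

{code}
import java.nio.channels.ClosedChannelException
import kafka.api.{FetchRequest, FetchResponse}
import kafka.consumer.SimpleConsumer

// Illustrative only: on ClosedChannelException, drop the stale connection and
// let the caller re-query metadata for the new leader, just as it would after
// seeing fetchResponse.hasError().
object FetchSketch {
  def fetchOrNone(consumer: SimpleConsumer, req: FetchRequest): Option[FetchResponse] =
    try {
      Some(consumer.fetch(req))
    } catch {
      case _: ClosedChannelException =>
        consumer.close()
        None
    }
}
{code}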



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)