[jira] [Updated] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset

2015-05-02 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2164:
-
Description: 
If log.logEndOffset < leaderStartOffset the follower resets its offset and 
prints the following:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
partition [topic,11] reset its fetch offset from 49322124 to current leader 
21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
54369274 for partition [topic,11] out of range; reset offset to 49322124 
(kafka.server.ReplicaFetcherThread)
{code}
I think the right message should be:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
partition [topic,11] reset its fetch offset from 54369274 to current leader 
21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
54369274 for partition [topic,11] out of range; reset offset to 49322124 
(kafka.server.ReplicaFetcherThread)
{code}

This occurs because ReplicaFetcherThread resets the offset and only then prints
the log message.
Possible solution:
{code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/
index b31b432..181cbc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
* Roll out a new log at the follower with the start offset equal to the
*/
   val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPar
-  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
   warn("Replica %d for partition %s reset its fetch offset from %d to curre
 .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.
+  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
   leaderStartOffset
 }
   }
{code}
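
The diff above is truncated by the mail archive. Reconstructed from the 0.8.2
sources (a hedged sketch; signatures were not checked against the attached
patch), the reordered block would look roughly like:
{code}
val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition,
  OffsetRequest.EarliestTime, brokerConfig.brokerId)
// Log first, while replica.logEndOffset still holds the old (pre-reset)
// offset, so the message reads "from 54369274" rather than "from 49322124".
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
  .format(brokerConfig.brokerId, topicAndPartition,
    replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
// Only then truncate, which moves the replica's log end offset.
replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
leaderStartOffset
{code}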



  was:
If log.logEndOffset < leaderStartOffset the follower resets its offset and 
prints the following:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
partition [topic,11] reset its fetch offset from 49322124 to current leader 
21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
54369274 for partition [topic,11] out of range; reset offset to 49322124 
(kafka.server.ReplicaFetcherThread)
{code}
I think the right message should be:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
partition [rt3.iva--yandex--access-log,11] reset its fetch offset from 54369274 
to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset 
offset to 49322124 (kafka.server.ReplicaFetcherThread)
{code}

This occurs because ReplicaFetcherThread resets the offset and only then prints
the log message.
Possible solution:
{code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/
index b31b432..181cbc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
* Roll out a new log at the follower with the start offset equal to the
*/
   val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPar
-  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
   warn("Replica %d for partition %s reset its fetch offset from %d to curre
 .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.
+  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
   leaderStartOffset
 }
   }
{code}




> ReplicaFetcherThread: suspicious log message on reset offset
> 
>
> Key: KAFKA-2164
> URL: https://issues.apache.org/jira/browse/KAFKA-2164
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Alexey Ozeritskiy
> Attachments: KAFKA-2164.patch
>
>
> If log.logEndOffset < leaderStartOffset the follower resets its offset and 
> prints the following:
> {code}
> [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
> partition [topic,11] reset its fetch offset from 49322124 to current leader 
> 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
> 54369274 for partition [topic,11] out of range;

Re: [DISCUSS] KIP-21 Configuration Management

2015-05-02 Thread Jay Kreps
Hey Aditya,

This is great! A couple of comments:

1. Leaving the file config in place is definitely the least disruptive option.
But let's really think about getting rid of the files and just having one
config mechanism. There is always a tendency to make everything pluggable,
which so often just leads to two mediocre solutions. Can we do the exercise
of trying to consider fully getting rid of file config and seeing what goes
wrong?

2. Do we need to model defaults? The current approach is that if you have a
global config x it is overridden for a topic xyz by /topics/xyz/x, and I
think this could be extended to /brokers/0/x. I think this is simpler. We
need to specify the precedence for these overrides, e.g. if you override at
the broker and topic level I think the topic level takes precedence.
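For example, resolution could just walk the override chain; a minimal sketch
(illustrative names, not actual Kafka code):

def resolve(key: String,
            global: Map[String, String],          // file/static defaults
            brokerOverrides: Map[String, String], // from /brokers/0/<key>
            topicOverrides: Map[String, String]   // from /topics/xyz/<key>
           ): Option[String] =
  topicOverrides.get(key)
    .orElse(brokerOverrides.get(key))
    .orElse(global.get(key))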

3. I recommend we have the producer and consumer config just be an override
under client.id. The override is by client id and we can have separate
properties for controlling quotas for producers and consumers.

4. Some configs can be changed just by updating the reference, others may
require some action. An example of this is if you want to disable log
compaction (assuming we wanted to make that dynamic) we need to call
shutdown() on the cleaner. I think it may be required to register a
listener callback that gets called when the config changes.

5. For handling the reference, can you explain your plan a bit? Currently we
have an immutable KafkaConfig object with a bunch of vals. That object, or
individual values from it, gets injected all over the code base. I was
thinking something like this:
a. We retain the KafkaConfig object as an immutable object just as today.
b. It is no longer legit to grab values out of that config if they are
changeable.
c. Instead of making KafkaConfig itself mutable we make KafkaConfiguration
which has a single volatile reference to the current KafkaConfig.
KafkaConfiguration is what gets passed into various components. So to
access a config you do something like config.instance.myValue. When the
config changes the config manager updates this reference.
d. The KafkaConfiguration is the thing that allows doing the
configuration.onChange("my.config", callback)
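Something like this minimal sketch (the two-field KafkaConfig and all names
here are illustrative assumptions, not the real classes):

import java.util.concurrent.ConcurrentHashMap

case class KafkaConfig(logCleanerEnable: Boolean, fetchSizeBytes: Int)

class KafkaConfiguration(initial: KafkaConfig) {
  @volatile private var current: KafkaConfig = initial
  private val listeners = new ConcurrentHashMap[String, KafkaConfig => Unit]()

  // (c) components read through the single volatile reference,
  // e.g. config.instance.fetchSizeBytes
  def instance: KafkaConfig = current

  // (d) register a callback that fires when a config changes (see point 4)
  def onChange(key: String, callback: KafkaConfig => Unit): Unit =
    listeners.put(key, callback)

  // Called by the config manager when it sees a change in ZK: swap the
  // immutable snapshot, then notify listeners.
  def update(next: KafkaConfig): Unit = {
    current = next
    val it = listeners.values().iterator()
    while (it.hasNext) it.next()(next)
  }
}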

-Jay

On Tue, Apr 28, 2015 at 3:57 PM, Aditya Auradkar <
aaurad...@linkedin.com.invalid> wrote:

> Hey everyone,
>
> Wrote up a KIP to update topic, client and broker configs dynamically via
> Zookeeper.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration
>
> Please read and provide feedback.
>
> Thanks,
> Aditya
>
> PS: I've intentionally kept this discussion separate from KIP-5 since I'm
> not sure if that is actively being worked on and I wanted to start with a
> clean slate.
>


Re: [DISCUSS] Using GitHub Pull Requests for contributions and code review

2015-05-02 Thread Jay Kreps
+1!

-Jay

On Thu, Apr 30, 2015 at 6:12 AM, Ismael Juma  wrote:

> Hi all,
>
> Kafka currently uses a combination of Review Board and JIRA for
> contributions and code review. In my opinion, this makes contribution and
> code review a bit harder than it has to be.
>
> I think the approach used by Spark would improve the current situation:
>
> "Generally, Spark uses JIRA to track logical issues, including bugs and
> improvements, and uses Github pull requests to manage the review and merge
> of specific code changes. That is, JIRAs are used to describe what should
> be fixed or changed, and high-level approaches, and pull requests describe
> how to implement that change in the project's source code. For example,
> major design decisions are discussed in JIRA."[1]
>
> It's worth reading the wiki page for all the details, but I will summarise
> the suggested workflow for code changes:
>
>1. Fork the Github repository at http://github.com/apache/kafka (if you
>haven't already)
>2. git checkout -b kafka-XXX
>3. Make one or more commits (smaller commits can be easier to review and
>reviewboard makes that hard)
>4. git push origin kafka-XXX
>5. Create PR against upstream/trunk (this will update JIRA
>automatically[2] and it will send an email to the dev mailing list too)
>6. A CI build will be triggered[3]
>7. Review process happens on GitHub (it's quite handy to be able to
>comment at both the commit and PR level, unlike Review Board)
>8. Once all feedback has been addressed and the build is green, a
>variant of the `merge_spark_pr.py`[4] script is used to squash, merge,
>push, close the PR and JIRA issue. The squashed commit generated by the
>script includes a bunch of useful information including links to the
>original commits[5] (in the future, I think it's worth reconsidering the
>squashing of commits, but retaining the information in the commit is
>already an improvement)
>
> Neha merged a couple of commits via GitHub already and it went smoothly
> although we are still missing a few of the pieces described above:
>
>1. CI builds triggered by GitHub PRs (this is supported by Apache Infra,
>we need to request it for Kafka and provide whatever configuration is
>needed)
>2. Adapting Spark's merge_spark_pr script and integrating it into the
>kafka Git repository
>3. Updating the Kafka contribution wiki and adding a CONTRIBUTING.md to
>the Git repository (this is shown when someone is creating a pull
> request)
>4. Go through existing GitHub pull requests and close the ones that are
>no longer relevant (there are quite a few as people have been opening
> them
>over the years, but nothing was done about most of them)
>5. Other things I may be missing
>
> I am volunteering to help with the above if people agree that this is the
> right direction for Kafka. Thoughts?
>
> Best.
> Ismael
>
> P.S. I was told in the Apache Infra HipChat that it's not currently
> possible (and there are no plans to change that in the near future) to use
> the GitHub merge button to merge PRs. The merge script does quite a few
> useful things that the merge button does not in any case.
>
> [1]
> https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark
> [2]
>
> https://issues.apache.org/jira/browse/KAFKA-1054?focusedCommentId=14513614&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14513614
> [3] https://blogs.apache.org/infra/entry/github_pull_request_builds_now
> [4] https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py
> [5]
>
> https://github.com/apache/spark/commit/59b7cfc41b2c06fbfbf6aca16c1619496a8d1d00
>


[jira] [Updated] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception

2015-05-02 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2165:
-
Status: Patch Available  (was: Open)

> ReplicaFetcherThread: data loss on unknown exception
> 
>
> Key: KAFKA-2165
> URL: https://issues.apache.org/jira/browse/KAFKA-2165
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Alexey Ozeritskiy
> Attachments: KAFKA-2165.patch
>
>
> Sometimes in our cluster a replica falls out of the ISR. Then the broker 
> re-downloads the partition from the beginning. We got the following messages 
> in the logs:
> {code}
> # The leader:
> [2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when 
> processing fetch request for partition [topic,11] offset 54369274 from 
> follower with correlation id 2634499. Possible cause: Request for offset 
> 54369274 but we only have log segments in the range 49322124 to 54369273. 
> (kafka.server.ReplicaManager)
> {code}
> {code}
> # The follower:
> [2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for 
> partition [topic,11] reset its fetch offset from 49322124 to current leader 
> 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset 
> 54369274 for partition [topic,11] out of range; reset offset to 49322124 
> (kafka.server.ReplicaFetcherThread)
> {code}
> This occurs because we update fetchOffset 
> [here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124]
> and then try to process the messages. 
> If any exception other than OffsetOutOfRangeCode occurs, fetchOffset and 
> replica.logEndOffset become unsynchronized.
> On the next fetch iteration we can get 
> fetchOffset > replica.logEndOffset == leaderEndOffset and OffsetOutOfRangeCode.





[jira] [Updated] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception

2015-05-02 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2165:
-
Attachment: KAFKA-2165.patch

> ReplicaFetcherThread: data loss on unknown exception
> 
>
> Key: KAFKA-2165
> URL: https://issues.apache.org/jira/browse/KAFKA-2165
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Alexey Ozeritskiy
> Attachments: KAFKA-2165.patch
>
>
> Sometimes in our cluster a replica falls out of the ISR. Then the broker 
> re-downloads the partition from the beginning. We got the following messages 
> in the logs:
> {code}
> # The leader:
> [2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when 
> processing fetch request for partition [topic,11] offset 54369274 from 
> follower with correlation id 2634499. Possible cause: Request for offset 
> 54369274 but we only have log segments in the range 49322124 to 54369273. 
> (kafka.server.ReplicaManager)
> {code}
> {code}
> # The follower:
> [2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for 
> partition [topic,11] reset its fetch offset from 49322124 to current leader 
> 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset 
> 54369274 for partition [topic,11] out of range; reset offset to 49322124 
> (kafka.server.ReplicaFetcherThread)
> {code}
> This occurs because we update fetchOffset 
> [here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124]
> and then try to process the messages. 
> If any exception other than OffsetOutOfRangeCode occurs, fetchOffset and 
> replica.logEndOffset become unsynchronized.
> On the next fetch iteration we can get 
> fetchOffset > replica.logEndOffset == leaderEndOffset and OffsetOutOfRangeCode.





[jira] [Created] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception

2015-05-02 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-2165:


 Summary: ReplicaFetcherThread: data loss on unknown exception
 Key: KAFKA-2165
 URL: https://issues.apache.org/jira/browse/KAFKA-2165
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy


Sometimes in our cluster a replica falls out of the ISR. Then the broker 
re-downloads the partition from the beginning. We got the following messages in 
the logs:
{code}
# The leader:
[2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when 
processing fetch request for partition [topic,11] offset 54369274 from follower 
with correlation id 2634499. Possible cause: Request for offset 54369274 but we 
only have log segments in the range 49322124 to 54369273. 
(kafka.server.ReplicaManager)
{code}

{code}
# The follower:
[2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for 
partition [topic,11] reset its fetch offset from 49322124 to current leader 
21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset 
54369274 for partition [topic,11] out of range; reset offset to 49322124 
(kafka.server.ReplicaFetcherThread)
{code}

This occurs because we update fetchOffset 
[here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124]
and then try to process the messages. 
If any exception other than OffsetOutOfRangeCode occurs, fetchOffset and 
replica.logEndOffset become unsynchronized.
On the next fetch iteration we can get 
fetchOffset > replica.logEndOffset == leaderEndOffset and OffsetOutOfRangeCode.
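
A sketch of the intended ordering fix (simplified, hypothetical types; not the
actual AbstractFetcherThread code):
{code}
case class Message(offset: Long)

// Advance the fetch offset only after processing succeeds. Any exception
// other than offset-out-of-range then leaves fetchOffset == logEndOffset,
// so the next iteration re-fetches the same range instead of triggering a
// bogus out-of-range reset to the leader's start offset.
def fetchIteration(fetch: Long => Seq[Message],
                   process: Message => Unit,
                   fetchOffset: Long): Long = {
  val messages = fetch(fetchOffset)
  try {
    messages.foreach(process)
    messages.lastOption.map(_.offset + 1).getOrElse(fetchOffset)
  } catch {
    case _: Exception =>
      // Do not advance; retry from the same offset next time.
      fetchOffset
  }
}
{code}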







[jira] [Updated] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset

2015-05-02 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2164:
-
Status: Patch Available  (was: Open)

> ReplicaFetcherThread: suspicious log message on reset offset
> 
>
> Key: KAFKA-2164
> URL: https://issues.apache.org/jira/browse/KAFKA-2164
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Alexey Ozeritskiy
> Attachments: KAFKA-2164.patch
>
>
> If log.logEndOffset < leaderStartOffset the follower resets its offset and 
> prints the following:
> {code}
> [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
> partition [topic,11] reset its fetch offset from 49322124 to current leader 
> 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
> 54369274 for partition [topic,11] out of range; reset offset to 49322124 
> (kafka.server.ReplicaFetcherThread)
> {code}
> I think the right message should be:
> {code}
> [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
> partition [rt3.iva--yandex--access-log,11] reset its fetch offset from 
> 54369274 to current leader 21's start offset 49322124 
> (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
> 54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset 
> offset to 49322124 (kafka.server.ReplicaFetcherThread)
> {code}
> This occurs because ReplicaFetcherThread resets the offset and only then 
> prints the log message.
> Possible solution:
> {code}
> diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
> b/core/
> index b31b432..181cbc1 100644
> --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
> +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
> @@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
> * Roll out a new log at the follower with the start offset equal to 
> the
> */
>val leaderStartOffset = 
> simpleConsumer.earliestOrLatestOffset(topicAndPar
> -  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, 
> leaderSt
>warn("Replica %d for partition %s reset its fetch offset from %d to 
> curre
>  .format(brokerConfig.brokerId, topicAndPartition, 
> replica.logEndOffset.
> +  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, 
> leaderSt
>leaderStartOffset
>  }
>}
> {code}





[jira] [Updated] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset

2015-05-02 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2164:
-
Attachment: KAFKA-2164.patch

> ReplicaFetcherThread: suspicious log message on reset offset
> 
>
> Key: KAFKA-2164
> URL: https://issues.apache.org/jira/browse/KAFKA-2164
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Alexey Ozeritskiy
> Attachments: KAFKA-2164.patch
>
>
> If log.logEndOffset < leaderStartOffset the follower resets its offset and 
> prints the following:
> {code}
> [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
> partition [topic,11] reset its fetch offset from 49322124 to current leader 
> 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
> 54369274 for partition [topic,11] out of range; reset offset to 49322124 
> (kafka.server.ReplicaFetcherThread)
> {code}
> I think the right message should be:
> {code}
> [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
> partition [rt3.iva--yandex--access-log,11] reset its fetch offset from 
> 54369274 to current leader 21's start offset 49322124 
> (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
> 54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset 
> offset to 49322124 (kafka.server.ReplicaFetcherThread)
> {code}
> This occurs because ReplicaFetcherThread resets the offset and only then 
> prints the log message.
> Possible solution:
> {code}
> diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
> b/core/
> index b31b432..181cbc1 100644
> --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
> +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
> @@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
> * Roll out a new log at the follower with the start offset equal to 
> the
> */
>val leaderStartOffset = 
> simpleConsumer.earliestOrLatestOffset(topicAndPar
> -  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, 
> leaderSt
>warn("Replica %d for partition %s reset its fetch offset from %d to 
> curre
>  .format(brokerConfig.brokerId, topicAndPartition, 
> replica.logEndOffset.
> +  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, 
> leaderSt
>leaderStartOffset
>  }
>}
> {code}





Re: [DISCUSS] Using GitHub Pull Requests for contributions and code review

2015-05-02 Thread Jun Rao
Ismael,

We will also need to figure out if we need a CONTRIBUTING.md like the
following to take care of the Apache licensing stuff.

https://github.com/apache/spark/blob/master/CONTRIBUTING.md

As for merging changes, I think squashing the commits will be ideal during
the merge. Also, it would be great if we could always put the latest changes
on top during the merge.

Thanks,

Jun

On Thu, Apr 30, 2015 at 8:12 AM, Ismael Juma  wrote:

> Hi all,
>
> Kafka currently uses a combination of Review Board and JIRA for
> contributions and code review. In my opinion, this makes contribution and
> code review a bit harder than it has to be.
>
> I think the approach used by Spark would improve the current situation:
>
> "Generally, Spark uses JIRA to track logical issues, including bugs and
> improvements, and uses Github pull requests to manage the review and merge
> of specific code changes. That is, JIRAs are used to describe what should
> be fixed or changed, and high-level approaches, and pull requests describe
> how to implement that change in the project's source code. For example,
> major design decisions are discussed in JIRA."[1]
>
> It's worth reading the wiki page for all the details, but I will summarise
> the suggested workflow for code changes:
>
>1. Fork the Github repository at http://github.com/apache/kafka (if you
>haven't already)
>2. git checkout -b kafka-XXX
>3. Make one or more commits (smaller commits can be easier to review and
>reviewboard makes that hard)
>4. git push origin kafka-XXX
>5. Create PR against upstream/trunk (this will update JIRA
>automatically[2] and it will send an email to the dev mailing list too)
>6. A CI build will be triggered[3]
>7. Review process happens on GitHub (it's quite handy to be able to
>comment at both the commit and PR level, unlike Review Board)
>8. Once all feedback has been addressed and the build is green, a
>variant of the `merge_spark_pr.py`[4] script is used to squash, merge,
>push, close the PR and JIRA issue. The squashed commit generated by the
>script includes a bunch of useful information including links to the
>original commits[5] (in the future, I think it's worth reconsidering the
>squashing of commits, but retaining the information in the commit is
>already an improvement)
>
> Neha merged a couple of commits via GitHub already and it went smoothly
> although we are still missing a few of the pieces described above:
>
>1. CI builds triggered by GitHub PRs (this is supported by Apache Infra,
>we need to request it for Kafka and provide whatever configuration is
>needed)
>2. Adapting Spark's merge_spark_pr script and integrating it into the
>kafka Git repository
>3. Updating the Kafka contribution wiki and adding a CONTRIBUTING.md to
>the Git repository (this is shown when someone is creating a pull
> request)
>4. Go through existing GitHub pull requests and close the ones that are
>no longer relevant (there are quite a few as people have been opening
> them
>over the years, but nothing was done about most of them)
>5. Other things I may be missing
>
> I am volunteering to help with the above if people agree that this is the
> right direction for Kafka. Thoughts?
>
> Best.
> Ismael
>
> P.S. I was told in the Apache Infra HipChat that it's not currently
> possible (and there are no plans to change that in the near future) to use
> the GitHub merge button to merge PRs. The merge script does quite a few
> useful things that the merge button does not in any case.
>
> [1]
> https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark
> [2]
>
> https://issues.apache.org/jira/browse/KAFKA-1054?focusedCommentId=14513614&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14513614
> [3] https://blogs.apache.org/infra/entry/github_pull_request_builds_now
> [4] https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py
> [5]
>
> https://github.com/apache/spark/commit/59b7cfc41b2c06fbfbf6aca16c1619496a8d1d00
>


[jira] [Created] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset

2015-05-02 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-2164:


 Summary: ReplicaFetcherThread: suspicious log message on reset 
offset
 Key: KAFKA-2164
 URL: https://issues.apache.org/jira/browse/KAFKA-2164
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy


If log.logEndOffset < leaderStartOffset the follower resets its offset and 
prints the following:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
partition [topic,11] reset its fetch offset from 49322124 to current leader 
21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
54369274 for partition [topic,11] out of range; reset offset to 49322124 
(kafka.server.ReplicaFetcherThread)
{code}
I think the right message should be:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for 
partition [rt3.iva--yandex--access-log,11] reset its fetch offset from 54369274 
to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 
54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset 
offset to 49322124 (kafka.server.ReplicaFetcherThread)
{code}

This occurs because ReplicaFetcherThread resets the offset and only then prints
the log message.
Possible solution:
{code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/
index b31b432..181cbc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
* Roll out a new log at the follower with the start offset equal to the
*/
   val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPar
-  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
   warn("Replica %d for partition %s reset its fetch offset from %d to curre
 .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.
+  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
   leaderStartOffset
 }
   }
{code}







[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available

2015-05-02 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14525421#comment-14525421
 ] 

Jiangjie Qin commented on KAFKA-1788:
-

I took a shot at incorporating the solution to this problem in KAFKA-2142.
The approach I took there is to just use the metadata timeout instead of adding
a new timeout, because this is essentially metadata being unavailable, so we
should treat it the same way as in send(). This also saves us another timeout
configuration.
[~ewencp] My concern about having a cap on the buffer for each topic-partition
is that the traffic across topic-partitions may not be balanced. If so, we
might end up waiting on a busy topic-partition's buffer allocation while we
actually have plenty of memory to use. That could hurt performance a lot.
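
To put rough numbers on that concern (a toy calculation, illustrative values
only):
{code}
val totalMemory     = 32L * 1024 * 1024        // buffer.memory for the producer
val partitions      = 8
val perPartitionCap = totalMemory / partitions // 4 MB per partition

val hotPartitionDemand = 6L * 1024 * 1024      // one busy partition wants 6 MB
val othersInUse        = 0L                    // the rest are idle

// With a per-partition cap the hot partition blocks even though the pool is
// nearly empty; with a single shared pool it would not.
val blockedByCap  = hotPartitionDemand > perPartitionCap           // true
val blockedByPool = hotPartitionDemand + othersInUse > totalMemory // false
{code}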

> producer record can stay in RecordAccumulator forever if leader is no 
> available
> ---
>
> Key: KAFKA-1788
> URL: https://issues.apache.org/jira/browse/KAFKA-1788
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 0.8.2.0
>Reporter: Jun Rao
>Assignee: Parth Brahmbhatt
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1788.patch, KAFKA-1788_2015-01-06_13:42:37.patch, 
> KAFKA-1788_2015-01-06_13:44:41.patch
>
>
> In the new producer, when a partition has no leader for a long time (e.g., 
> all replicas are down), the records for that partition will stay in the 
> RecordAccumulator until the leader is available. This may cause the 
> bufferpool to be full and the callback for the produced message to block for 
> a long time.





[jira] [Commented] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerGroupCommand

2015-05-02 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14525420#comment-14525420
 ] 

Ashish K Singh commented on KAFKA-313:
--

[~nehanarkhede] and [~gwenshap], pinging again for review.

> Add JSON/CSV output and looping options to ConsumerGroupCommand
> ---
>
> Key: KAFKA-313
> URL: https://issues.apache.org/jira/browse/KAFKA-313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dave DeMaagd
>Assignee: Ashish K Singh
>Priority: Minor
>  Labels: newbie, patch
> Fix For: 0.8.3
>
> Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, 
> KAFKA-313.patch, KAFKA-313_2015-02-23_18:11:32.patch
>
>
> Adds:
> * '--loop N' - causes the program to loop forever, sleeping for up to N 
> seconds between loops (loop time minus collection time, unless that's less 
> than 0, at which point it will just run again immediately)
> * '--asjson' - display as a JSON string instead of the more human-readable 
> output format.
> Neither of the above depends on the other (you can loop with the human-readable 
> output, or do a single-shot execution with JSON output). Existing 
> behavior/output is maintained if neither option is used. Diff attached.
> Impacted files:
> core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala





Re: Perf testing flush() - issues found

2015-05-02 Thread Roshan Naik
Thanks @Jay for suggesting changes to batch.size and linger.ms. I tried
them out. It appears one can do better than the default batch.size for
this synchronous batch mode with flush().

These new measurements give more "rational" numbers with which I can
reason and infer some rules of thumb (for batch-sync mode using flush()).


Here are my observations:
   - The new producer API does much better than the older one for a *single
threaded* producer (the best number I saw with the old one is ~68MB/s, with
the new one ~140MB/s).
   - Higher linger.ms sometimes helps perf and at other times hurts. No
simple rule here. Best to try it out and decide whether the default is good
for your case or not.
   - For a single threaded producer: to get the most throughput, set
batch.size = (total bytes between flushes / partition count).
   - Running more single threaded producer processes helped (up to about
3-4 processes).
   - One producer going to a single partition is faster than one producer
going to multiple partitions.
   - The number of bytes between two explicit flushes (i.e. the flush
interval) had a much smaller impact than the buffer.size. Something to be
learnt here.. my speculation is that with smaller flush intervals this might
change. (Having two knobs, batch.size & flush interval, is a bit confusing
for end users trying to tune it; it would be good if we can find some simple
guidance.)
   - Other than some inconveniences previously mentioned, I feel flush()
could be used as a way to simulate sync-batch behavior; see the sketch below.
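
For reference, this is the sync-batch pattern I mean (a sketch only; the topic
name and sizes are made up, and it assumes a build where KafkaProducer.flush()
is available):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")
props.put("acks", "1")
props.put("key.serializer",
  "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer",
  "org.apache.kafka.common.serialization.ByteArraySerializer")
// Rule of thumb from above: batch.size = bytes between flushes / partitions,
// e.g. a 16MB flush interval over 4 partitions => 4MB batches.
props.put("batch.size", (16 * 1024 * 1024 / 4).toString)

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
val payload  = new Array[Byte](1024)
for (_ <- 0 until 16 * 1024)            // ~16MB of payload between flushes
  producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("perf-test-topic", payload))
producer.flush()                        // block until all sends complete
producer.close()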

Producer Limits:
   - Able to exceed 1 GigE capacity, but not 10 GigE. Does not appear to go
beyond ~460MB/s. Verified my test machines are able to achieve 1GB/s.

Todo:
- Need to try a multi-threaded producer.
- I did some testing of the Consumer APIs as well with the 0.8.1
consumer-perf tool. Wasn't able to push it beyond 30MB/s. When producers ran
in parallel it fell to under 10MB/s. Need to dig deeper. Will report back.
Suggestions welcome.



Measurements:

 - See attachment 
 - Also available on paste bin:  http://pastebin.com/p3kSAjy6



Settings: acks=1, single broker, single threaded producer (new api)
Machines: 32 cores, 256GB RAM, 10 gigE, 6x15000 rpm disks


1 partition
                                         FlushInt=4MB   FlushInt=8MB   FlushInt=16MB
linger=def  batch.size = default              57             54             52
linger=1s   batch.size = default              57             61             59

linger=def  batch.size = flushInt/parts      136            125            116
linger=1s   batch.size = flushInt/parts       92             77             56

linger=def  batch.size == flushInt           140            123            124
linger=def  batch.size = 10MB                140            123            124
linger=def  batch.size = 20MB                 31             30             42


4 partitions
                                         FlushInt=4MB   FlushInt=8MB   FlushInt=16MB
linger=def  batch.size = default              95             82             80
linger=1s   batch.size = default              85             83             85

linger=def  batch.size = batch/#part         127            133             90
linger=1s   batch.size = batch/#part          94            100            101

linger=def  batch.size == flushInt            60              8              6
linger=def  batch.size = 10M                   7              7              7
linger=def  batch.size = 20M                   6              6              5


8 partitions
                                         FlushInt=4MB   FlushInt=8MB   FlushInt=16MB
linger=def  batch.size = default             100             89             96
linger=1s   batch.size = default             105             97             98

linger=def  batch.size = batch/#part         114            128             78
linger=1s   batch.size = batch/#part          95             94            102

linger=def  batch.size == flushInt             7              8              8
linger=def  batch.size = 10M                   7              8              7
linger=def  batch.size = 20M                   6              6              6


With multiple producers (each single threaded):

For 1 partition:
1 process   = 136 MB/s
3 processes = 344 MB/s
4 processes = 290 MB/s

For 4 partitions:
1 process   = 127 MB/s
3 processes = 345 MB/s
4 processes = 372 MB/s

For 8 partitions:
1 process   = 128 MB/s
3 processes = 304 MB/s
4 processes = 460 MB/s








[jira] [Commented] (KAFKA-2142) Follow-up patch for KAFKA-2138 Refactor the drain message logic in new producer

2015-05-02 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14525408#comment-14525408
 ] 

Jiangjie Qin commented on KAFKA-2142:
-

Updated reviewboard https://reviews.apache.org/r/33552/diff/
 against branch origin/trunk

> Follow-up patch for KAFKA-2138 Refactor the drain message logic in new 
> producer
> ---
>
> Key: KAFKA-2142
> URL: https://issues.apache.org/jira/browse/KAFKA-2142
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-2142.patch, KAFKA-2142_2015-04-25_11:48:09.patch, 
> KAFKA-2142_2015-05-02_11:59:39.patch
>
>
> This is the follow-up patch for KAFKA-2138. Currently the logic for the 
> sender to drain messages from the accumulator is a bit awkward; we want to 
> refactor it. Guozhang's suggestion is copied below:
> {quote}
> 1. While handling the metadata response and updating the metadata, check 
> whether ANY partition's leader is unknown; if so, set metadata.requestUpdate. 
> Then we no longer need to do this step at the start of run().
> 2. Get all the ready nodes based on their connection state only (i.e. no 
> peeking into RecordAccumulator), and record the node_backoff as the min of 
> (reconnection_backoff - time_waited) over all nodes; if one of these nodes is 
> connected or connecting, this backoff should be 0.
> 3. For each of the ready nodes, try to drain their corresponding partitions 
> in RecordAccumulator while considering all kinds of conditions (full, expired, 
> exhausted, etc.), and record the data_backoff as the min of (retry_backoff - 
> time_waited) over all partitions; if one of the partitions is immediately 
> sendable, this backoff should be 0.
> 4. Formulate the produce request and call client.poll() with timeout = 
> reconnection_backoff > 0 ? reconnection_backoff : retry_backoff.
> 5. In NetworkClient.poll(), the logic of "maybeUpdateMetadata" while 
> updating metadataTimeout can also be simplified.
> {quote}





[jira] [Updated] (KAFKA-2142) Follow-up patch for KAFKA-2138 Refactor the drain message logic in new producer

2015-05-02 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2142:

Attachment: KAFKA-2142_2015-05-02_11:59:39.patch

> Follow-up patch for KAFKA-2138 Refactor the drain message logic in new 
> producer
> ---
>
> Key: KAFKA-2142
> URL: https://issues.apache.org/jira/browse/KAFKA-2142
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-2142.patch, KAFKA-2142_2015-04-25_11:48:09.patch, 
> KAFKA-2142_2015-05-02_11:59:39.patch
>
>
> This is the follow-up patch for KAFKA-2138. Currently the logic for the 
> sender to drain messages from the accumulator is a bit awkward; we want to 
> refactor it. Guozhang's suggestion is copied below:
> {quote}
> 1. While handling the metadata response and updating the metadata, check 
> whether ANY partition's leader is unknown; if so, set metadata.requestUpdate. 
> Then we no longer need to do this step at the start of run().
> 2. Get all the ready nodes based on their connection state only (i.e. no 
> peeking into RecordAccumulator), and record the node_backoff as the min of 
> (reconnection_backoff - time_waited) over all nodes; if one of these nodes is 
> connected or connecting, this backoff should be 0.
> 3. For each of the ready nodes, try to drain their corresponding partitions 
> in RecordAccumulator while considering all kinds of conditions (full, expired, 
> exhausted, etc.), and record the data_backoff as the min of (retry_backoff - 
> time_waited) over all partitions; if one of the partitions is immediately 
> sendable, this backoff should be 0.
> 4. Formulate the produce request and call client.poll() with timeout = 
> reconnection_backoff > 0 ? reconnection_backoff : retry_backoff.
> 5. In NetworkClient.poll(), the logic of "maybeUpdateMetadata" while 
> updating metadataTimeout can also be simplified.
> {quote}





Re: Review Request 33552: Patch for KAFKA-2142

2015-05-02 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33552/
---

(Updated May 2, 2015, 6:59 p.m.)


Review request for kafka.


Bugs: KAFKA-2142
https://issues.apache.org/jira/browse/KAFKA-2142


Repository: kafka


Description (updated)
---

Patch for KAFKA-2142


Patch for KAFKA-2142


Minor change in comments


Fix Null pointer


Add fix to KAFKA-1788


Rebase on trunk


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/Metadata.java 
07f1cdb1fe920b0c7a5f2d101ddc40c689e1b247 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
42b12928781463b56fc4a45d96bb4da2745b6d95 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 49a98838767615dd952da20825f6985698137710 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b2db91ca14bbd17fef5ce85839679144fff3f689 
  clients/src/test/java/org/apache/kafka/clients/MetadataTest.java 
928087d29deb80655ca83726c1ebc45d76468c1f 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
8b278892883e63899b53e15efb9d8c926131e858 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 baa48e7c1b7ac5da8f3aca29f653c3fff88f8009 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
  clients/src/test/resources/log4j.properties 
b1d5b7f2b4091040bdcfb0a60fd5879f45a0 
  core/src/test/resources/log4j.properties 
1b7d5d8f7d5fae7d272849715714781cad05d77b 

Diff: https://reviews.apache.org/r/33552/diff/


Testing
---

Unit tests passed.


Thanks,

Jiangjie Qin



[test] - please ignore [eom]

2015-05-02 Thread Gwen Shapira