Question about manual tracking of Offset

2014-03-24 Thread Krishna Raj
Hi experts & Kafka Dev team,

I have a very quick question and need your help in designing a consumer. I am
trying to keep the email short and simple.

Scenario (let's say):

1) A Kafka topic holds 20 messages (offsets 0 - 19)

2) I pull the first 10 messages, offsets 0 through 9 (assuming the requested
fetch size returns the first 10 messages)

3) I manually commit the last offset - which is offset 9 - using the
OffsetCommit API

Now, when I go into the next iteration to pull the next batch of
messages (in this case offsets 10 through 19), I will do a Current
Offset Fetch.

In this case, the offset fetch will give me 9.

Since I have already processed the message at offset 9, I will end up
processing it again.

Ideally, I want to get 10 as the offset when I start the next iteration
of message processing.

I went through the latest Kafka documentation, but I don't see any way
to ask Kafka for the next offset rather than the currently committed
offset.

*Simply put: I processed the "Nth" offset and committed "N". I want the NEXT
OFFSET AFTER the Nth offset. I don't want to assume it is (N+1).*

*Or is it correct if I assume it is N+1?*

Thanks for your time !
 Krishna Raj
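
A minimal sketch of the convention that avoids the double-processing described
above: commit the offset of the next message to consume (last processed offset
+ 1) rather than the last offset processed, so the value returned by an offset
fetch is exactly where the next iteration should start. The code below only
simulates the partition in memory; it is not the Kafka client API.

import java.util.Arrays;
import java.util.List;

// Sketch of the "commit N+1" bookkeeping convention; the partition is an
// in-memory stand-in, so the focus is the offset arithmetic, not the API.
public class OffsetConventionSketch {
    public static void main(String[] args) {
        List<String> partition = Arrays.asList("m0", "m1", "m2", "m3"); // offsets 0..3
        long committed = 0; // committed offset == next offset to read

        int batchSize = 2;
        long start = committed;
        for (long o = start; o < Math.min(start + batchSize, partition.size()); o++) {
            System.out.println("processing offset " + o + ": " + partition.get((int) o));
            committed = o + 1; // commit the offset of the NEXT message to consume
        }
        System.out.println("committed = " + committed); // 2, so the next fetch starts at 2
    }
}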


[jira] [Commented] (KAFKA-1029) Zookeeper leader election stuck in ephemeral node retry loop

2014-03-24 Thread Sam Meder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13946076#comment-13946076
 ] 

Sam Meder commented on KAFKA-1029:
--

We've seen this as well (once or twice in ~4 months). The exact conditions 
under which it occurs have been pretty hard to pin-point.

> Zookeeper leader election stuck in ephemeral node retry loop
> 
>
> Key: KAFKA-1029
> URL: https://issues.apache.org/jira/browse/KAFKA-1029
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.0
>Reporter: Sam Meder
>Assignee: Sam Meder
>Priority: Blocker
> Fix For: 0.8.0
>
> Attachments: 
> 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3, 
> "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2, 
> "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ 
> "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a 
> while back in a different session, hence I will backoff for this node to be 
> deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> where the broker is essentially stuck in the loop that is trying to deal with 
> left-over ephemeral nodes. The code looks a bit racy to me. In particular, in 
> ZookeeperLeaderElector:
>   def elect: Boolean = {
>     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
>     val timestamp = SystemTime.milliseconds.toString
>     val electString = ...
>     try {
>       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient,
>         electionPath, electString, leaderId,
>         (controllerString : String, leaderId : Any) =>
>           KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> leaderChangeListener is registered before the create call (by the way, it 
> looks like a new registration will be added on every elect call - shouldn't it 
> register in startup()?), so it can update leaderId to the current leader before 
> the call to create. If that happens then we will continuously get node-exists 
> exceptions and the checker function will always return true, i.e. we will 
> never get out of the while(true) loop.
> I think the right fix here is to pass brokerId instead of leaderId when 
> calling create, i.e.
>       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient,
>         electionPath, electString, brokerId,
>         (controllerString : String, leaderId : Any) =>
>           KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> The loop dealing with the ephemeral-node bug is now only triggered for the 
> broker that owned the node previously, although I am still not 100% sure if 
> that is sufficient.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 19388: Patch for KAFKA-1251

2014-03-24 Thread Jun Rao


> On March 23, 2014, 11:57 p.m., Jun Rao wrote:
> > Thanks for the patch. Some comments.
> > 
> > 1. For producer level stats (i.e., not node or topic level), for jmx beans, 
> > the package name becomes kafka.producer (since the bean name is 
> > kafka.producer:type=client-id). For the topic and node level stats, jmx 
> > beans have the package name kafka.producer.client-id. Perhaps we can name 
> > the producer level stats under kafka.producer.client-id.aggregate?
> > 
> > 2. Could we make the metrics.timeWindowNs and metrics.samples configurable?
> > 
> > 3. Also, in Metrics, what's the purpose of eventWindow? It's not being used 
> > and I am not sure if there is a case when we will roll the window by count, 
> > instead of size.
> > 
> > 4. When testing using console producer, I sent 1 message and expected the 
> > topic level message rate to be non-zero for at least 30 secs (on average, I 
> > expect the non-zero value to last 45 secs). However, in multiple tests, the 
> > number drops to 0 well before 30 secs.
> > 
> > 5. Metric: Our measurement returns a double. However, we don't synchronize 
> > btw the reader and the writer of the double. Since double access is not 
> > guaranteed to be atomic in java, we could see incorrect value in the output.
> > 
> > 6. Not sure why we need to change the license header. Perhaps we can have a 
> > separate jira to standardize all headers if there is inconsistency?
> 
> Jay Kreps wrote:
> 1. Yes, that makes sense.
> 2. Yes, will do.
> 3. The windows are defined by a time AND an event count, whichever is 
> triggered first. I am not currently using this, but it allows for more 
> responsive metrics updates for metrics where data is plentiful. That is, 1000 
> measurements should be enough to get a reasonably low-variance measurement, so 
> you could make your updates more responsive by setting the window to 1000 
> events or 30 seconds, so that frequently updated metrics would potentially 
> shrink the window to just a few seconds.
> 4. There was a bug that Guozhang caught which I think was causing this. 
> Will be fixed in the next patch.
> 5. Access to the metric value is synchronized by the sensor in 
> KafkaMetric.
> 6. The licenses weren't at the default comment length. I can file a patch 
> to change them all.
> 
> Jun Rao wrote:
> 5. The synchronization in KafkaMetric only handles the reads of the 
> metric. The updates don't seem to be synchronized on the same lock, right?
> 
> 7. Also, noticed that when calculating a reading, we used the elapsed 
> time since the oldest update time, which may not align on the window 
> boundary. Say we have a window of 30 secs and an update comes in at 20 secs. 
> When calculating a rate, we will only account for 10 secs in that window. 
> However, actually only 1 event shows up in the whole 30 secs window.
> 
> Jay Kreps wrote:
> 5. The sensor's lock is used for the metrics the sensor owns so all reads 
> and writes go through that. The sensor acquires and releases the lock for 
> all metric updates and the metric acquires it for reads.
> 7. You raise a good point, though I'm not sure if your description is 
> right either. Currently I compute the elapsed time across all samples first. 
> However if one of the samples is very old (e.g. in the case where there have 
> been no updates) this elapsed time will be very long. However when I compute 
> the value across samples I will purge the old samples. This is backwards, I 
> need to first compute the value and do the purge, then compute the elapsed 
> time off the remaining samples so that these two numbers are comparable.

5. Got it. It wasn't obvious to me that KafkaMetrics.lock is actually the 
sensor lock. Perhaps we could rename it to sensorLock?

7. Yes, that's another issue. My original question is a bit different. Say we 
keep one sample with a window size of 30 secs. At time 20 sec an event is 
recorded. We will use 1/(now - 20) to calculate the rate. However, there wasn't 
any update from 0 to 20. So, maybe the rate should be 1/(now - 0)?
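
To make the two interpretations concrete, here is a toy calculation (not the
actual Metrics code) comparing a rate measured from the first recorded event
versus from the start of the retained window; with a single event recorded at
20 secs into a 30-sec window and a reading taken at 35 secs, the first gives
1/15 and the second 1/35:

// Toy illustration of the two elapsed-time choices discussed above; this is a
// sketch, not the org.apache.kafka.common.metrics implementation.
public class RateSketch {
    public static void main(String[] args) {
        double windowStart = 0.0;      // start of the retained sample window (secs)
        double firstEventTime = 20.0;  // the single event arrived at t = 20 secs
        double now = 35.0;             // time of the reading
        int eventCount = 1;

        double rateFromFirstEvent = eventCount / (now - firstEventTime); // 1/(now - 20)
        double rateFromWindowStart = eventCount / (now - windowStart);   // 1/(now - 0)

        System.out.printf("from first event:  %.4f events/sec%n", rateFromFirstEvent);
        System.out.printf("from window start: %.4f events/sec%n", rateFromWindowStart);
    }
}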


> On March 23, 2014, 11:57 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java,
> >  lines 795-796
> > 
> >
> > Perhaps we could distinguish btw metadata request and produce request?
> 
> Jay Kreps wrote:
> Do you think this is useful? We do have this on the server-side, no?

I think at least for request rate, it's probably useful to see the producer 
request and metadata request separately at the client?


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19388/#review38261
---


On March 20, 2014, 12:30 a.m., Jay Kreps wrote:
> 
> ---
> This is an auto

[jira] [Updated] (KAFKA-1323) log.dirs server property no longer supports relative directories

2014-03-24 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1323:
-

Priority: Blocker  (was: Major)

> log.dirs server property no longer supports relative directories
> 
>
> Key: KAFKA-1323
> URL: https://issues.apache.org/jira/browse/KAFKA-1323
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Priority: Blocker
> Fix For: 0.8.1.1
>
>
> This seems to have been caused by KAFKA-1315 - we now don't support relative 
> directories.
> Steps to reproduce:
> * Set a relative directory for log.dirs. E.g., {{log.dirs=data/kafka-logs}}
> * Bring up the broker and produce some messages: 
> {{./bin/kafka-producer-perf-test.sh --broker-list localhost:9092 --messages 
> 1000 --topic test}}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


0.8.1.1 - rough ETA?

2014-03-24 Thread Otis Gospodnetic
Hi,

There was talk about 0.8.1.1 bugfix release about 10 days ago.  Are there
any rough ETAs for it? e.g. are you thinking next week or end of April?

Sorry about the "when" question - just trying to plan our halted migration to
Kafka 0.8.x.

Thanks,
Otis
--
Performance Monitoring * Log Analytics * Search Analytics
Solr & Elasticsearch Support * http://sematext.com/


Re: New Consumer API discussion

2014-03-24 Thread Neha Narkhede
I took some time to write some example code using the new consumer APIs to
cover a range of use cases. This exercise was very useful (thanks for the
suggestion, Jay!) since I found several improvements to the APIs to make
them more usable. Here are some of the changes I made -

1. Added usage examples to the KafkaConsumer
javadoc.
I find it useful for the examples to be in the javadoc vs some wiki. Please
go through these examples and suggest improvements. The goal would be to
document a limited set of examples that cover every major use case.
2. All APIs that either accept or return offsets are changed to
Map instead of TopicPartitionOffset... In all the
examples that I wrote, it was much easier to deal with offsets and pass
them around in the consumer APIs if they were maps instead of lists
3. Due to the above change, I had to introduce commit() and commitAsync() APIs
explicitly, in addition to commit(Map offsets) and commitAsync(Map offsets),
since the no-argument case would not be covered automatically with Map as the
input parameter to the commit APIs (a short usage sketch follows this list)
4. Offset rewind logic is funky with group management. I took a stab at it
and wrote examples to cover the various offset rewind use cases I could
think of. I'm not so sure I like it, so I encourage people to take a look
at the examples and provide feedback. This feedback is very critical in
finalizing the consumer APIs as we might have to add/change APIs to make
offset rewind intuitive and easy to use. (Please see the 3rd and 4th examples
here.)
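
As a rough illustration of what the Map-based commit in point 3 could look like
from user code (the class and method shapes below are assumptions based on the
description in this thread, not the final API):

import java.util.HashMap;
import java.util.Map;

// Hypothetical shape of the Map-based commit described above; TopicPartition
// and the commit signatures are assumed from this discussion, not a released API.
class OffsetCommitSketch {
    static class TopicPartition {
        final String topic; final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    // no-argument variant: commits the offsets of everything returned by the last poll
    void commit() { /* sketch only */ }

    // explicit variant: commits the given per-partition offsets
    void commit(Map<TopicPartition, Long> offsets) { /* sketch only */ }

    public static void main(String[] args) {
        OffsetCommitSketch consumer = new OffsetCommitSketch();
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 0), 42L);
        consumer.commit(offsets); // commit explicit offsets
        consumer.commit();        // commit whatever was last consumed
    }
}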

Once I have feedback on the above, I will go ahead and submit a review
board for the new APIs and javadoc.

Thanks
Neha


On Mon, Mar 24, 2014 at 5:29 PM, Neha Narkhede wrote:

> Hey Chris,
>
> Really sorry for the late reply, wonder how this fell through the cracks.
> Anyhow, thanks for the great feedback! Here are my comments -
>
>
> 1. Why is the config String->Object instead of String->String?
>
> This is probably more of a feedback about the new config management that
> we adopted in the new clients. I think it is more convenient to write
> configs.put("a", 42);
> instead of
> configs.put("a", Integer.toString(42));
>
> 2. Are these Java docs correct?
>
>   KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
>   A consumer is instantiated by providing a set of key-value pairs as
> configuration and a ConsumerRebalanceCallback implementation
>
> There is no ConsumerRebalanceCallback parameter.
>
> Fixed.
>
>
> 3. Would like to have a method:
>
>   poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
> TopicPartition... topicAndPartitionsToPoll)
>
> I see I can effectively do this by just fiddling with subscribe and
> unsubscribe before each poll. Is this a low-overhead operation? Can I just
> unsubscribe from everything after each poll, then re-subscribe to a topic
> the next iteration. I would probably be doing this in a fairly tight loop.
>
> The subscribe and unsubscribe will be very lightweight in-memory
> operations,
> so it shouldn't be a problem to just use those APIs directly.
> Let me know if you think otherwise.
>
> 4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
> are use cases for decoupling "what to do when no offset exists" from "what
> to do when I'm out of range". I might want to start from smallest the
> first time I run, but fail if I ever get offset out of range.
>
> How about adding a third option "disable" to "auto.offset.reset"?
> What this says is to never automatically reset the offset, either if one
> is not found or if the offset
> falls out of range. Presumably, you would want to turn this off when you
> want to control the offsets
> yourself and use custom rewind/replay logic to reset the consumer's
> offset. In this case, you would
> want to turn this feature off so Kafka does not accidentally reset the
> offset to something else.
>
> I'm not so sure when you would want to make the distinction regarding
> startup and offset falling out
> of range. Presumably, if you don't trust Kafka to reset the offset, then
> you can always turn this off
> and use commit/commitAsync and seek() to set the consumer to the right
> offset on startup and every
> time your consumer falls out of range.
>
> Does that make sense?
>
> 5. ENABLE_JMX could use Java docs, even though it's fairly
> self-explanatory.
>
> Fixed.
>
> 6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
> across all topic/partitions is useful. I believe it's per-topic/partition,
> right? That is, setting to 2 megs with two TopicAndPartit

Re: New Consumer API discussion

2014-03-24 Thread Neha Narkhede
Hey Chris,

Really sorry for the late reply, wonder how this fell through the cracks.
Anyhow, thanks for the great feedback! Here are my comments -

1. Why is the config String->Object instead of String->String?

This is probably more of a feedback about the new config management that
we adopted in the new clients. I think it is more convenient to write
configs.put("a", 42);
instead of
configs.put("a", Integer.toString(42));

2. Are these Java docs correct?

  KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
  A consumer is instantiated by providing a set of key-value pairs as
configuration and a ConsumerRebalanceCallback implementation

There is no ConsumerRebalanceCallback parameter.

Fixed.

3. Would like to have a method:

  poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
TopicPartition... topicAndPartitionsToPoll)

I see I can effectively do this by just fiddling with subscribe and
unsubscribe before each poll. Is this a low-overhead operation? Can I just
unsubscribe from everything after each poll, then re-subscribe to a topic
the next iteration. I would probably be doing this in a fairly tight loop.

The subscribe and unsubscribe will be very lightweight in-memory operations,
so it shouldn't be a problem to just use those APIs directly.
Let me know if you think otherwise.

4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
are use cases for decoupling "what to do when no offset exists" from "what
to do when I'm out of range". I might want to start from smallest the
first time I run, but fail if I ever get offset out of range.

How about adding a third option "disable" to "auto.offset.reset"?
What this says is to never automatically reset the offset, either if one
is not found or if the offset
falls out of range. Presumably, you would want to turn this off when you
want to control the offsets
yourself and use custom rewind/replay logic to reset the consumer's offset.
In this case, you would
want to turn this feature off so Kafka does not accidentally reset the
offset to something else.

I'm not so sure when you would want to make the distinction regarding
startup and offset falling out
of range. Presumably, if you don't trust Kafka to reset the offset, then
you can always turn this off
and use commit/commitAsync and seek() to set the consumer to the right
offset on startup and every
time your consumer falls out of range.

Does that make sense?
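
A small sketch of that pattern, assuming the proposed "disable" value and a
seek()-style call (both taken from this discussion rather than a finalized
client): disable automatic resets and position the consumer yourself from a
custom offset store.

import java.util.Properties;

// Sketch of manual offset control with auto reset disabled; the config value
// and the commented-out consumer calls are assumptions from this thread.
public class ManualRewindSketch {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("group.id", "my-group");
        config.put("auto.offset.reset", "disable"); // proposed third option

        long startOffset = loadOffsetFromCustomStore("my-topic", 0); // hypothetical helper
        // consumer.seek(new TopicPartition("my-topic", 0), startOffset);
        // On an offset-out-of-range error, apply your own rewind/replay policy
        // instead of letting the consumer silently reset.
        System.out.println("would seek to offset " + startOffset);
    }

    private static long loadOffsetFromCustomStore(String topic, int partition) {
        return 0L; // placeholder: read from your own offset store
    }
}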

5. ENABLE_JMX could use Java docs, even though it's fairly
self-explanatory.

Fixed.

6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
across all topic/partitions is useful. I believe it's per-topic/partition,
right? That is, setting to 2 megs with two TopicAndPartitions would result
in 4 megs worth of data coming in per fetch, right?

Good point, clarified that. Take a look again to see if it makes sense now.

7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
Retry, or throw exception?

Throw a TimeoutException. Clarified that in the docs.

8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
fetch requests?

Applies to all requests. Clarified that in the docs.

9. What does SESSION_TIMEOUT_MS default to?

Defaults are largely TODO, but session.timeout.ms currently defaults to
1000.

10. Is this consumer thread-safe?

It should be. Updated the docs to clarify that.

11. How do you use a different offset management strategy? Your email
implies that it's pluggable, but I don't see how. "The offset management
strategy defaults to Kafka based offset management and the API provides a
way for the user to use a customized offset store to manage the consumer's
offsets."

12. If I wish to decouple the consumer from the offset checkpointing, is
it OK to use Joel's offset management stuff directly, rather than through
the consumer's commit API?

For #11 and #12, I updated the docs to include actual usage examples.
Could you take a look and see if it answers your questions?

Thanks,
Neha



On Mon, Mar 3, 2014 at 10:28 AM, Chris Riccomini wrote:

> Hey Guys,
>
> Also, for reference, we'll be looking to implement new Samza consumers
> which have these APIs:
>
> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
> g/apache/samza/system/SystemConsumer.html
>
> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
> g/apache/samza/checkpoint/CheckpointManager.html
>
>
> Question (3) below is a result of having Samza's SystemConsumers poll
> allow specific topic/partitions to be specified.
>
> The split between consumer and checkpoint manager is the reason for
> question (12) be

Re: Review Request 18299: Fix KAFKA-1253

2014-03-24 Thread Guozhang Wang


> On March 24, 2014, 11:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java,
> >  lines 246-251
> > 
> >
> > We can't just drop those records on the floor since we need to inform 
> > the client about the failure. Perhaps we could just form a request with a 
> > single RecordBatch and send it. The request will fail and the client will 
> > be notified.

I agree, and in the long run we probably should back-off the batch size.

I think that until we eventually fix this issue, we would probably just log it 
when it happens and move on. Throwing a runtime exception effectively does the 
same thing, since it does not necessarily stop the producer, but it has the 
side effect of leaving this sender iteration in an undefined state. So I 
decided to just log it and let the producer proceed. Let me know if you have 
any preferences.


> On March 24, 2014, 11:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, lines 
> > 187-188
> > 
> >
> > So, we are switching to a static estimated compression rate? Will that 
> > be effective in terms of reducing re-allocation rate?

We still update the compression rate in close().
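
For context, a toy version of the buffer-sizing idea being discussed: size the
compressed buffer from the uncompressed payload using an estimated compression
ratio, and refine that estimate from the observed ratio when the batch is
closed. The constants and smoothing factor below are illustrative, not the
actual Compressor values.

// Sketch of a compression-ratio estimate that is updated on close(); not the
// actual org.apache.kafka.common.record.Compressor code.
public class CompressionEstimateSketch {
    private static double estimatedRatio = 0.5; // assumed starting estimate

    static int estimateBufferSize(int uncompressedBytes) {
        // over-allocate slightly to reduce the chance of reallocation mid-batch
        return (int) (uncompressedBytes * estimatedRatio * 1.05) + 64;
    }

    static void onClose(int uncompressedBytes, int compressedBytes) {
        double observed = (double) compressedBytes / uncompressedBytes;
        estimatedRatio = 0.9 * estimatedRatio + 0.1 * observed; // exponential smoothing
    }

    public static void main(String[] args) {
        System.out.println("estimate for 10 KB: " + estimateBufferSize(10_240) + " bytes");
        onClose(10_240, 3_000); // this batch compressed better than estimated
        System.out.println("updated estimate:   " + estimateBufferSize(10_240) + " bytes");
    }
}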


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review38386
---


On March 23, 2014, 12:55 a.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> ---
> 
> (Updated March 23, 2014, 12:55 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
> https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Incorporated Jay's comments
> 
> In-place compression with
> 
> 1) Dynamic reallocation in the underlying byte buffer
> 2) Written bytes estimate to reduce reallocation probabilities
> 3) Deallocation in buffer pool following the original capacity
> 
> 
> Diffs
> -
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1ac69436f117800815b8d50f042e9e2a29364b43 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 32e12ad149f6d70c96a498d0a390976f77bf9e2a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
>  b69866a9fb9a8b4e1e78d304a20eda3cbf178c6f 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  673b2962771c28ceb3c7a6c0fd6f69521bd7ed16 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   
> clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 
> 3ebbb804242be6a001b3bae6524afccc85a87602 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
> 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 9d8935fa3beeb2a78b109a41ed76fd4374239560 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 
> f1dc9778502cbdfe982254fb6e25947842622239 
>   clients/src/main/java/org/apache/kafka/common/utils/Crc32.java 
> 153c5a6d345293aa0ba2cf513373323a6e9f2467 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> 0c6b3656375721a718fb4de10118170aacce0ea9 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
> b0745b528cef929c4273f7e2ac4de1476cfc25ad 
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java 
> ae54d67da9907b0a043180c7395a1370b3d0528d 
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 
> 36cfc0fda742eb501af2c2c0330e3f461cf1f40c 
>   core/src/main/scala/kafka/producer/ConsoleProducer.scala 
> dd39ff22c918fe5b05f04582b748e32349b2055f 
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
> PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> c002f5ea38ece66ad559fadb18ffaf40ac2026aa 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 66ea76b9b6c0f

Re: Review Request 18299: Fix KAFKA-1253

2014-03-24 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review38386
---



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java


We can't just drop those records on the floor since we need to inform the 
client about the failure. Perhaps we could just form a request with a single 
RecordBatch and send it. The request will fail and the client will be notified.



clients/src/main/java/org/apache/kafka/common/record/Compressor.java


So, we are switching to a static estimated compression rate? Will that be 
effective in terms of reducing re-allocation rate?



core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala


Is this still valid?



core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala


Indentation.



core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala


We probably can put "value" + i in a separate list and reuse.


- Jun Rao


On March 23, 2014, 12:55 a.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> ---
> 
> (Updated March 23, 2014, 12:55 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
> https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Incorporated Jay's comments
> 
> In-place compression with
> 
> 1) Dynamic reallocation in the underlying byte buffer
> 2) Written bytes estimate to reduce reallocation probabilities
> 3) Deallocation in buffer pool following the original capacity
> 
> 
> Diffs
> -
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1ac69436f117800815b8d50f042e9e2a29364b43 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 32e12ad149f6d70c96a498d0a390976f77bf9e2a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
>  b69866a9fb9a8b4e1e78d304a20eda3cbf178c6f 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  673b2962771c28ceb3c7a6c0fd6f69521bd7ed16 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   
> clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 
> 3ebbb804242be6a001b3bae6524afccc85a87602 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
> 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 9d8935fa3beeb2a78b109a41ed76fd4374239560 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 
> f1dc9778502cbdfe982254fb6e25947842622239 
>   clients/src/main/java/org/apache/kafka/common/utils/Crc32.java 
> 153c5a6d345293aa0ba2cf513373323a6e9f2467 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> 0c6b3656375721a718fb4de10118170aacce0ea9 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
> b0745b528cef929c4273f7e2ac4de1476cfc25ad 
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java 
> ae54d67da9907b0a043180c7395a1370b3d0528d 
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 
> 36cfc0fda742eb501af2c2c0330e3f461cf1f40c 
>   core/src/main/scala/kafka/producer/ConsoleProducer.scala 
> dd39ff22c918fe5b05f04582b748e32349b2055f 
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
> PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> c002f5ea38ece66ad559fadb18ffaf40ac2026aa 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 
>   perf/src/main/scala/kafka/perf/ProducerPerformance.scala 
> 3df0d130308a35fca96184adc212eea1488f 
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> ---
> 
> shallow cluster tests with rolling bounces
> 
> integration tests
> 
>

Re: Review Request 19388: Patch for KAFKA-1251

2014-03-24 Thread Jay Kreps


> On March 23, 2014, 11:57 p.m., Jun Rao wrote:
> > Thanks for the patch. Some comments.
> > 
> > 1. For producer level stats (i.e., not node or topic level), for jmx beans, 
> > the package name becomes kafka.producer (since the bean name is 
> > kafka.producer:type=client-id). For the topic and node level stats, jmx 
> > beans have the package name kafka.producer.client-id. Perhaps we can name 
> > the producer level stats under kafka.producer.client-id.aggregate?
> > 
> > 2. Could we make the metrics.timeWindowNs and metrics.samples configurable?
> > 
> > 3. Also, in Metrics, what's the purpose of eventWindow? It's not being used 
> > and I am not sure if there is a case when we will roll the window by count, 
> > instead of size.
> > 
> > 4. When testing using console producer, I sent 1 message and expected the 
> > topic level message rate to be non-zero for at least 30 secs (on average, I 
> > expect the non-zero value to last 45 secs). However, in multiple tests, the 
> > number drops to 0 well before 30 secs.
> > 
> > 5. Metric: Our measurement returns a double. However, we don't synchronize 
> > btw the reader and the writer of the double. Since double access is not 
> > guaranteed to be atomic in java, we could see incorrect value in the output.
> > 
> > 6. Not sure why we need to change the license header. Perhaps we can have a 
> > separate jira to standardize all headers if there is inconsistency?
> 
> Jay Kreps wrote:
> 1. Yes, that makes sense.
> 2. Yes, will do.
> 3. The windows are defined by a time AND an event count, whichever is 
> triggered first. I am not currently using this, but it allows for more 
> responsive metrics updates for metrics where data is plentiful. That is, 1000 
> measurements should be enough to get a reasonably low-variance measurement, so 
> you could make your updates more responsive by setting the window to 1000 
> events or 30 seconds, so that frequently updated metrics would potentially 
> shrink the window to just a few seconds.
> 4. There was a bug that Guozhang caught which I think was causing this. 
> Will be fixed in the next patch.
> 5. Access to the metric value is synchronized by the sensor in 
> KafkaMetric.
> 6. The licenses weren't at the default comment length. I can file a patch 
> to change them all.
> 
> Jun Rao wrote:
> 5. The synchronization in KafkaMetric only handles the reads of the 
> metric. The updates don't seem to be synchronized on the same lock, right?
> 
> 7. Also, noticed that when calculating a reading, we used the elapsed 
> time since the oldest update time, which may not align on the window 
> boundary. Say we have a window of 30 secs and an update comes in at 20 secs. 
> When calculating a rate, we will only account for 10 secs in that window. 
> However, actually only 1 event shows up in the whole 30 secs window.

5. The sensor's lock is used for the metrics the sensor owns so all reads and 
writes go through that. The sensor acquires and releases the lock for all 
metric updates and the metric acquires it for reads.
7. You raise a good point, though I'm not sure if your description is right 
either. Currently I compute the elapsed time across all samples first. However 
if one of the samples is very old (e.g. in the case where there have been no 
updates) this elapsed time will be very long. However when I compute the value 
across samples I will purge the old samples. This is backwards, I need to first 
compute the value and do the purge, then compute the elapsed time off the 
remaining samples so that these two numbers are comparable.


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19388/#review38261
---


On March 20, 2014, 12:30 a.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/19388/
> ---
> 
> (Updated March 20, 2014, 12:30 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1251
> https://issues.apache.org/jira/browse/KAFKA-1251
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1251: Add metrics to the producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1ac69436f117800815b8d50f042e9e2a29364b43 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
>  33d62a4b83fbab5b22b91b22f6b744af1c98d262 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  673b2962771c28ceb3c7a6c0fd6f69521bd7ed16 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/inter

[jira] Subscription: outstanding kafka patches

2014-03-24 Thread jira
Issue Subscription
Filter: outstanding kafka patches (82 issues)
The list of outstanding kafka patches
Subscriber: kafka-mailing-list

Key Summary
KAFKA-1318  waiting for producer to stop is not reliable in system tests
https://issues.apache.org/jira/browse/KAFKA-1318
KAFKA-1317  KafkaServer 0.8.1 not responding to .shutdown() cleanly, possibly 
related to TopicDeletionManager or MetricsMeter state
https://issues.apache.org/jira/browse/KAFKA-1317
KAFKA-1300  Added WaitForReplaction admin tool.
https://issues.apache.org/jira/browse/KAFKA-1300
KAFKA-1264  Make ConsoleProducer compression codec configurable
https://issues.apache.org/jira/browse/KAFKA-1264
KAFKA-1253  Implement compression in new producer
https://issues.apache.org/jira/browse/KAFKA-1253
KAFKA-1251  Add metrics to the producer
https://issues.apache.org/jira/browse/KAFKA-1251
KAFKA-1235  Enable server to indefinitely retry on controlled shutdown
https://issues.apache.org/jira/browse/KAFKA-1235
KAFKA-1234  All kafka-run-class.sh to source in user config file (to set env 
vars like KAFKA_OPTS)
https://issues.apache.org/jira/browse/KAFKA-1234
KAFKA-1230  shell script files under bin don't work with cygwin (bash on 
windows)
https://issues.apache.org/jira/browse/KAFKA-1230
KAFKA-1227  Code dump of new producer
https://issues.apache.org/jira/browse/KAFKA-1227
KAFKA-1215  Rack-Aware replica assignment option
https://issues.apache.org/jira/browse/KAFKA-1215
KAFKA-1210  Windows Bat files are not working properly
https://issues.apache.org/jira/browse/KAFKA-1210
KAFKA-1207  Launch Kafka from within Apache Mesos
https://issues.apache.org/jira/browse/KAFKA-1207
KAFKA-1206  allow Kafka to start from a resource negotiator system
https://issues.apache.org/jira/browse/KAFKA-1206
KAFKA-1194  The kafka broker cannot delete the old log files after the 
configured time
https://issues.apache.org/jira/browse/KAFKA-1194
KAFKA-1190  create a draw performance graph script
https://issues.apache.org/jira/browse/KAFKA-1190
KAFKA-1180  WhiteList topic filter gets a NullPointerException on complex Regex
https://issues.apache.org/jira/browse/KAFKA-1180
KAFKA-1173  Using Vagrant to get up and running with Apache Kafka
https://issues.apache.org/jira/browse/KAFKA-1173
KAFKA-1147  Consumer socket timeout should be greater than fetch max wait
https://issues.apache.org/jira/browse/KAFKA-1147
KAFKA-1145  Broker fail to sync after restart
https://issues.apache.org/jira/browse/KAFKA-1145
KAFKA-1144  commitOffsets can be passed the offsets to commit
https://issues.apache.org/jira/browse/KAFKA-1144
KAFKA-1130  "log.dirs" is a confusing property name
https://issues.apache.org/jira/browse/KAFKA-1130
KAFKA-1109  Need to fix GC log configuration code, not able to override 
KAFKA_GC_LOG_OPTS
https://issues.apache.org/jira/browse/KAFKA-1109
KAFKA-1106  HighwaterMarkCheckpoint failure puting broker into a bad state
https://issues.apache.org/jira/browse/KAFKA-1106
KAFKA-1093  Log.getOffsetsBefore(t, …) does not return the last confirmed 
offset before t
https://issues.apache.org/jira/browse/KAFKA-1093
KAFKA-1086  Improve GetOffsetShell to find metadata automatically
https://issues.apache.org/jira/browse/KAFKA-1086
KAFKA-1082  zkclient dies after UnknownHostException in zk reconnect
https://issues.apache.org/jira/browse/KAFKA-1082
KAFKA-1049  Encoder implementations are required to provide an undocumented 
constructor.
https://issues.apache.org/jira/browse/KAFKA-1049
KAFKA-1032  Messages sent to the old leader will be lost on broker GC resulted 
failure
https://issues.apache.org/jira/browse/KAFKA-1032
KAFKA-1025  Producer.send should provide recoverability info on failiure
https://issues.apache.org/jira/browse/KAFKA-1025
KAFKA-1012  Implement an Offset Manager and hook offset requests to it
https://issues.apache.org/jira/browse/KAFKA-1012
KAFKA-1011  Decompression and re-compression on MirrorMaker could result in 
messages being dropped in the pipeline
https://issues.apache.org/jira/browse/KAFKA-1011
KAFKA-1005  kafka.perf.ConsumerPerformance not shutting down consumer
https://issues.apache.org/jira/browse/KAFKA-1005
KAFKA-998   Producer should not retry on non-recoverable error codes
https://issues.apache.org/jira/browse/KAFKA-998
KAFKA-997   Provide a strict verification mode when reading configuration 
properties
https://issues.apache.org/jira/browse/KAFKA-997
KAFKA-996   Capitalize first letter for log entries
https://issues.apache.org/jira/browse/KAFKA-996
KAFKA-984   Avoid a full rebalance in cases when a new topic is discovered but 
container/broker set sta

[jira] [Commented] (KAFKA-1323) log.dirs server property no longer supports relative directories

2014-03-24 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13945532#comment-13945532
 ] 

Neha Narkhede commented on KAFKA-1323:
--

Oops, this is my bad. I missed this during the review, which points out that we 
need a unit test to cover this as well.

> log.dirs server property no longer supports relative directories
> 
>
> Key: KAFKA-1323
> URL: https://issues.apache.org/jira/browse/KAFKA-1323
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
> Fix For: 0.8.1.1
>
>
> This seems to have been caused by KAFKA-1315 - we now don't support relative 
> directories.
> Steps to reproduce:
> * Set a relative directory for log.dirs. E.g., {{log.dirs=data/kafka-logs}}
> * Bring up the broker and produce some messages: 
> {{./bin/kafka-producer-perf-test.sh --broker-list localhost:9092 --messages 
> 1000 --topic test}}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Reopened] (KAFKA-1315) log.dirs property in KafkaServer intolerant of trailing slash

2014-03-24 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede reopened KAFKA-1315:
--


> log.dirs property in KafkaServer intolerant of trailing slash
> -
>
> Key: KAFKA-1315
> URL: https://issues.apache.org/jira/browse/KAFKA-1315
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.0, 0.8.1
>Reporter: Brent Bradbury
>Assignee: Timothy Chen
>Priority: Blocker
>  Labels: newbie
> Fix For: 0.8.1.1
>
> Attachments: KAFKA-1315.patch, KAFKA-1315_2014-03-20_13:28:06.patch, 
> KAFKA-1315_2014-03-20_14:03:34.patch
>
>
> A trailing slash in log.dirs causes a java.util.NoSuchElementException on the 
> producer and a kafka.common.NotLeaderForPartitionException on the consumer. 
> Per this thread: 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201307.mbox/%3ccafbh0q18placokcby8+jg6ef3n8+ysgkfqghw4yb2up18h-...@mail.gmail.com%3E
> This is because we populate the key in
> ReplicaManager.highWatermarkCheckpoints using the "dirs" config, but look
> up the key using log.dir.getParent. So, if you have a trailing slash in the
> config, they won't match. This seems a bug that we should fix. Could you
> file a jira?
> Thanks,
> Jun
> Still occurring for me, using org.apache.kafka:kafka_2.10:0.8.1



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1323) log.dirs server property no longer supports relative directories

2014-03-24 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13945469#comment-13945469
 ] 

Joel Koshy commented on KAFKA-1323:
---

{code}
java.util.NoSuchElementException: key not found: data/kafka-logs
at scala.collection.MapLike$class.default(MapLike.scala:223)
at scala.collection.immutable.Map$Map1.default(Map.scala:93)
at scala.collection.MapLike$class.apply(MapLike.scala:134)
at scala.collection.immutable.Map$Map1.apply(Map.scala:93)
at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:93)
at 
kafka.cluster.Partition$$anonfun$makeLeader$2.apply(Partition.scala:178)
at 
kafka.cluster.Partition$$anonfun$makeLeader$2.apply(Partition.scala:178)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
at kafka.cluster.Partition.makeLeader(Partition.scala:178)
at 
kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:309)
at 
kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:308)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:308)
at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:260)
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:100)
at kafka.server.KafkaApis.handle(KafkaApis.scala:71)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:662)
{code}

I think we just need to update the look-up key in getOrCreateReplica with
the absolute path, but we should check if there are other accesses to the
same map that need an update.
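
As a rough illustration of that normalization (in Java for brevity; the actual
code is Scala), keying the map by the absolute, normalized path makes relative
log.dirs values and trailing slashes resolve to the same entry:

{code}
import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Sketch only - not the actual ReplicaManager code. The idea is to normalize
// both the configured dirs and the per-log parent dir before using them as keys.
public class LogDirKeySketch {
    static String normalize(String dir) {
        return new File(dir).getAbsoluteFile().toPath().normalize().toString();
    }

    public static void main(String[] args) {
        Map<String, String> checkpoints = new HashMap<>();
        checkpoints.put(normalize("data/kafka-logs"), "replication-offset-checkpoint");

        File logDir = new File("data/kafka-logs/test-0");
        String key = normalize(logDir.getParent()); // matches the configured key
        System.out.println(checkpoints.containsKey(key)); // true
    }
}
{code}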


> log.dirs server property no longer supports relative directories
> 
>
> Key: KAFKA-1323
> URL: https://issues.apache.org/jira/browse/KAFKA-1323
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
> Fix For: 0.8.1.1
>
>
> This seems to have been caused by KAFKA-1315 - we now don't support relative 
> directories.
> Steps to reproduce:
> * Set a relative directory for log.dirs. E.g., {{log.dirs=data/kafka-logs}}
> * Bring up the broker and produce some messages: 
> {{./bin/kafka-producer-perf-test.sh --broker-list localhost:9092 --messages 
> 1000 --topic test}}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1323) log.dirs server property no longer supports relative directories

2014-03-24 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-1323:
-

 Summary: log.dirs server property no longer supports relative 
directories
 Key: KAFKA-1323
 URL: https://issues.apache.org/jira/browse/KAFKA-1323
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
 Fix For: 0.8.1.1


This seems to have been caused by KAFKA-1315 - we now don't support relative 
directories.

Steps to reproduce:
* Set a relative directory for log.dirs. E.g., {{log.dirs=data/kafka-logs}}
* Bring up the broker and produce some messages: 
{{./bin/kafka-producer-perf-test.sh --broker-list localhost:9092 --messages 
1000 --topic test}}




--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1315) log.dirs property in KafkaServer intolerant of trailing slash

2014-03-24 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13945460#comment-13945460
 ] 

Joel Koshy commented on KAFKA-1315:
---

This patch seems to have resulted in KAFKA-1323

> log.dirs property in KafkaServer intolerant of trailing slash
> -
>
> Key: KAFKA-1315
> URL: https://issues.apache.org/jira/browse/KAFKA-1315
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.0, 0.8.1
>Reporter: Brent Bradbury
>Assignee: Timothy Chen
>Priority: Blocker
>  Labels: newbie
> Fix For: 0.8.1.1
>
> Attachments: KAFKA-1315.patch, KAFKA-1315_2014-03-20_13:28:06.patch, 
> KAFKA-1315_2014-03-20_14:03:34.patch
>
>
> A trailing slash in log.dirs causes a java.util.NoSuchElementException on the 
> producer and a kafka.common.NotLeaderForPartitionException on the consumer. 
> Per this thread: 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201307.mbox/%3ccafbh0q18placokcby8+jg6ef3n8+ysgkfqghw4yb2up18h-...@mail.gmail.com%3E
> This is because we populate the key in
> ReplicaManager.highWatermarkCheckpoints using the "dirs" config, but look
> up the key using log.dir.getParent. So, if you have a trailing slash in the
> config, they won't match. This seems a bug that we should fix. Could you
> file a jira?
> Thanks,
> Jun
> Still occurring for me, using org.apache.kafka:kafka_2.10:0.8.1



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1317) KafkaServer 0.8.1 not responding to .shutdown() cleanly, possibly related to TopicDeletionManager or MetricsMeter state

2014-03-24 Thread Timothy Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timothy Chen updated KAFKA-1317:


Attachment: KAFKA-1317_2014-03-24_11:06:15.patch

> KafkaServer 0.8.1 not responding to .shutdown() cleanly, possibly related to 
> TopicDeletionManager or MetricsMeter state
> ---
>
> Key: KAFKA-1317
> URL: https://issues.apache.org/jira/browse/KAFKA-1317
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1
>Reporter: Brent Bradbury
>Assignee: Timothy Chen
>Priority: Blocker
>  Labels: newbie
> Fix For: 0.8.1.1
>
> Attachments: KAFKA-1317.patch, KAFKA-1317_2014-03-23_23:48:28.patch, 
> KAFKA-1317_2014-03-24_11:06:15.patch, threaddump.txt
>
>
> When I run an in-process instance of KafkaServer, send a message through it, 
> then call shutdown(), some threads never exit and the process hangs until the 
> process is killed manually. The same scenario does not result in a hang on 
> 0.8.0. The hang happens when calling both shutdown() by itself as well as 
> shutdown() and awaitShutdown() together. I have seen similar behavior 
> shutting down a deployed kafka server as well, but haven't had time to 
> diagnose whether or not it is the same symptom.
> I suspect either the metrics-meter-tick-thread-1 & 2 or delete-topics-thread
>  (waiting in 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$awaitTopicDeletionNotification(TopicDeletionManager.scala:178)
>  is to blame. Since the TopicDeletionManager is new, it seems more suspicious 
> to me. A complete thread dump is attached; the suspect threads are below.
> "delete-topics-thread" prio=5 tid=0x7fb3e31d2800 nid=0x6b03 waiting on 
> condition [0x00013c3b3000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x00012e6e6920> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>   at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$awaitTopicDeletionNotification(TopicDeletionManager.scala:178)
>   at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:334)
>   at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
>   at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
>   at kafka.utils.Utils$.inLock(Utils.scala:538)
>   at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:333)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>Locked ownable synchronizers:
>   - None
> "metrics-meter-tick-thread-2" daemon prio=5 tid=0x7fb3e31c1000 nid=0x5f03 
> runnable [0x00013ab8f000]
>java.lang.Thread.State: TIMED_WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x00012e7d05d8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>   at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
>   at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:724)
>Locked ownable synchronizers:
>   - None
> "metrics-meter-tick-thread-1" daemon prio=5 tid=0x7fb3e31ef800 nid=0x5e03 
> waiting on condition [0x00013a98c000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x00012e7d05d8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronize

[jira] [Commented] (KAFKA-1317) KafkaServer 0.8.1 not responding to .shutdown() cleanly, possibly related to TopicDeletionManager or MetricsMeter state

2014-03-24 Thread Timothy Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13945429#comment-13945429
 ] 

Timothy Chen commented on KAFKA-1317:
-

Updated reviewboard https://reviews.apache.org/r/19577/
 against branch origin/0.8.1

> KafkaServer 0.8.1 not responding to .shutdown() cleanly, possibly related to 
> TopicDeletionManager or MetricsMeter state
> ---
>
> Key: KAFKA-1317
> URL: https://issues.apache.org/jira/browse/KAFKA-1317
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1
>Reporter: Brent Bradbury
>Assignee: Timothy Chen
>Priority: Blocker
>  Labels: newbie
> Fix For: 0.8.1.1
>
> Attachments: KAFKA-1317.patch, KAFKA-1317_2014-03-23_23:48:28.patch, 
> KAFKA-1317_2014-03-24_11:06:15.patch, threaddump.txt
>
>
> When I run an in-process instance of KafkaServer, send a message through it, 
> then call shutdown(), some threads never exit and the process hangs until the 
> process is killed manually. The same scenario does not result in a hang on 
> 0.8.0. The hang happens when calling both shutdown() by itself as well as 
> shutdown() and awaitShutdown() together. I have seen similar behavior 
> shutting down a deployed kafka server as well, but haven't had time to 
> diagnose whether or not it is the same symptom.
> I suspect either the metrics-meter-tick-thread-1 & 2 or delete-topics-thread
>  (waiting in 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$awaitTopicDeletionNotification(TopicDeletionManager.scala:178)
>  is to blame. Since the TopicDeletionManager is new, it seems more suspicious 
> to me. A complete thread dump is attached; the suspect threads are below.
> "delete-topics-thread" prio=5 tid=0x7fb3e31d2800 nid=0x6b03 waiting on 
> condition [0x00013c3b3000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x00012e6e6920> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>   at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$awaitTopicDeletionNotification(TopicDeletionManager.scala:178)
>   at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:334)
>   at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
>   at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
>   at kafka.utils.Utils$.inLock(Utils.scala:538)
>   at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:333)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>Locked ownable synchronizers:
>   - None
> "metrics-meter-tick-thread-2" daemon prio=5 tid=0x7fb3e31c1000 nid=0x5f03 
> runnable [0x00013ab8f000]
>java.lang.Thread.State: TIMED_WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x00012e7d05d8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>   at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
>   at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:724)
>Locked ownable synchronizers:
>   - None
> "metrics-meter-tick-thread-1" daemon prio=5 tid=0x7fb3e31ef800 nid=0x5e03 
> waiting on condition [0x00013a98c000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x00012e7d05d8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>   at 
> java.util.co

Re: Review Request 19577: Patch for KAFKA-1317

2014-03-24 Thread Timothy Chen

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19577/
---

(Updated March 24, 2014, 6:06 p.m.)


Review request for kafka.


Bugs: KAFKA-1317
https://issues.apache.org/jira/browse/KAFKA-1317


Repository: kafka


Description
---

KAFKA-1317 Fix Kafka shutdown with delete topic on


Diffs (updated)
-

  core/src/main/scala/kafka/controller/KafkaController.scala 
f12ffc2af1f739f7d46058c97f0eeed6b55da14d 
  core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
8262e10ce4364f057e449172ace38320e44247db 

Diff: https://reviews.apache.org/r/19577/diff/


Testing
---


Thanks,

Timothy Chen



Re: Review Request 19577: Patch for KAFKA-1317

2014-03-24 Thread Timothy Chen


> On March 24, 2014, 4:20 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, line 74
> > 
> >
> > This is probably trying to fix the problem reported in KAFKA-1307? If 
> > yes, then this change might not be enough to fix the issue, since the 
> > await() and signal() calls would now have to lock on deleteLock 
> > instead of controllerLock.

This is not intended to fix KAFKA-1307. I actually got a deadlock when I just 
called deleteTopicThread.shutdown in controller shutdown, and it took me some 
time to figure out that I was holding the controllerLock while trying to shut 
down the deleteTopicThread, while that thread was awaiting on the same lock's 
condition. I don't see why the topic deletion manager needs to hold on to the 
same lock, as the signaling and awaiting seem to be independent of other 
operations (but I may very well be wrong).
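
(For illustration only — a minimal, self-contained Scala sketch of the lock/condition deadlock shape described above. The names and the shutdown mechanics are simplified and hypothetical, not the actual controller code. Run as written, it hangs: the main thread holds controllerLock while joining, and the worker cannot reacquire the lock to leave await().)

import java.util.concurrent.locks.ReentrantLock

object ControllerShutdownDeadlockSketch {
  val controllerLock = new ReentrantLock()
  val deleteTopicsCond = controllerLock.newCondition()

  // stand-in for the delete-topics thread: waits on a condition of controllerLock
  val deleteThread = new Thread(() => {
    controllerLock.lock()
    try deleteTopicsCond.await()          // parked; must reacquire controllerLock to wake up
    catch { case _: InterruptedException => }
    finally controllerLock.unlock()
  })

  def main(args: Array[String]): Unit = {
    deleteThread.start()
    Thread.sleep(100)
    controllerLock.lock()                 // "controller shutdown" grabs controllerLock ...
    try {
      deleteThread.interrupt()            // ... asks the worker to stop ...
      deleteThread.join()                 // ... and blocks forever: the worker cannot
    } finally controllerLock.unlock()     //     reacquire controllerLock to finish await()
  }
}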

Looking at KAFKA-1307, it seems a patch has already been submitted for that? Do 
we still see issues from it?


- Timothy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19577/#review38306
---


On March 24, 2014, 6:48 a.m., Timothy Chen wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/19577/
> ---
> 
> (Updated March 24, 2014, 6:48 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1317
> https://issues.apache.org/jira/browse/KAFKA-1317
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1317 Fix Kafka shutdown with delete topic on
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/controller/KafkaController.scala 
> f12ffc2af1f739f7d46058c97f0eeed6b55da14d 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
> 8262e10ce4364f057e449172ace38320e44247db 
> 
> Diff: https://reviews.apache.org/r/19577/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Timothy Chen
> 
>



[jira] [Commented] (KAFKA-1029) Zookeeper leader election stuck in ephemeral node retry loop

2014-03-24 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13945311#comment-13945311
 ] 

Neha Narkhede commented on KAFKA-1029:
--

[~jbrosenb...@gmail.com] It is a little difficult to say what's going on. It 
would help if you could tell us how to reproduce this. If you suspect this is 
due to a network outage, try doing a test where you disable the network 
interface and see if you can reproduce it. That will help us troubleshoot this.

> Zookeeper leader election stuck in ephemeral node retry loop
> 
>
> Key: KAFKA-1029
> URL: https://issues.apache.org/jira/browse/KAFKA-1029
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.0
>Reporter: Sam Meder
>Assignee: Sam Meder
>Priority: Blocker
> Fix For: 0.8.0
>
> Attachments: 
> 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3, 
> "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2, 
> "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ 
> "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a 
> while back in a different session, hence I will backoff for this node to be 
> deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> where the broker is essentially stuck in the loop that is trying to deal with 
> left-over ephemeral nodes. The code looks a bit racy to me. In particular:
> ZookeeperLeaderElector:
>   def elect: Boolean = {
> controllerContext.zkClient.subscribeDataChanges(electionPath, 
> leaderChangeListener)
> val timestamp = SystemTime.milliseconds.toString
> val electString = ...
> try {
>   
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, 
> electionPath, electString, leaderId,
> (controllerString : String, leaderId : Any) => 
> KafkaController.parseControllerId(controllerString) == 
> leaderId.asInstanceOf[Int],
> controllerContext.zkSessionTimeout)
> leaderChangeListener is registered before the create call (by the way, it 
> looks like a new registration will be added every elect call - shouldn't it 
> register in startup()?) so can update leaderId to the current leader before 
> the call to create. If that happens then we will continuously get node exists 
> exceptions and the checker function will always return true, i.e. we will 
> never get out of the while(true) loop.
> I think the right fix here is to pass brokerId instead of leaderId when 
> calling create, i.e.
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, 
> electionPath, electString, brokerId,
> (controllerString : String, leaderId : Any) => 
> KafkaController.parseControllerId(controllerString) == 
> leaderId.asInstanceOf[Int],
> controllerContext.zkSessionTimeout)
> The loop dealing with the ephemeral node bug is now only triggered for the 
> broker that owned the node previously, although I am still not 100% sure if 
> that is sufficient.
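
(Illustrative aside on the race described above: a simplified, hypothetical checker showing why passing brokerId rather than leaderId changes the outcome. This is not the actual elector code.)

object ElectionCheckerSketch {
  // simplified stand-in for KafkaController.parseControllerId
  def parseControllerId(controllerString: String): Int =
    """"brokerid":\s*(\d+)""".r.findFirstMatchIn(controllerString).map(_.group(1).toInt).getOrElse(-1)

  // the conflict checker discussed above: "is the stored ephemeral node ours?"
  def checker(controllerString: String, expectedId: Int): Boolean =
    parseControllerId(controllerString) == expectedId

  def main(args: Array[String]): Unit = {
    val stored   = """{ "brokerid":2, "timestamp":"1377587460904", "version":1 }"""
    val leaderId = 2   // the listener may already have updated this to the *current* leader
    val brokerId = 3   // this broker never wrote the node

    println(checker(stored, leaderId)) // true  -> endless backoff-and-retry loop
    println(checker(stored, brokerId)) // false -> treated as a genuine conflict and handled
  }
}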



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 19577: Patch for KAFKA-1317

2014-03-24 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19577/#review38306
---


I couldn't reproduce the shutdown problem where the DeleteTopicsThread fails to 
exit. Could you? 
We should definitely add a system test to cover that scenario.


core/src/main/scala/kafka/controller/KafkaController.scala


    It seems more natural to put the logic in onControllerResignation and 
    call that from shutdown(). That way we don't have to change the signature of 
    shutdown() to include a flag, which is slightly confusing.
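
(A minimal sketch of the shape being suggested here, with hypothetical method bodies, purely for illustration:)

class KafkaControllerSketch {
  def onControllerResignation(): Unit = {
    // resignation-specific cleanup (e.g. stopping the delete-topics thread)
    // lives here instead of behind a boolean flag on shutdown()
  }

  def shutdown(): Unit = {
    onControllerResignation()
    // remaining shutdown work
  }
}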



core/src/main/scala/kafka/controller/TopicDeletionManager.scala


    This is probably trying to fix the problem reported in KAFKA-1307? If yes, 
    then this change might not be enough to fix the issue, since the await() and 
    signal() calls would now have to lock on deleteLock instead of 
    controllerLock.
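
(A minimal sketch, again with simplified/hypothetical names, of the pairing this comment asks for: if the manager moves to its own deleteLock, both the waiter and the signaller must use that same lock and its condition.)

import java.util.concurrent.locks.ReentrantLock

class TopicDeletionSignalSketch {
  private val deleteLock = new ReentrantLock()
  private val deleteTopicsCond = deleteLock.newCondition()

  def awaitTopicDeletionNotification(): Unit = {
    deleteLock.lock()
    try deleteTopicsCond.await()        // wait under deleteLock ...
    finally deleteLock.unlock()
  }

  def resumeTopicDeletion(): Unit = {
    deleteLock.lock()
    try deleteTopicsCond.signal()       // ... and signal under the same deleteLock
    finally deleteLock.unlock()
  }
}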


- Neha Narkhede


On March 24, 2014, 6:48 a.m., Timothy Chen wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/19577/
> ---
> 
> (Updated March 24, 2014, 6:48 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1317
> https://issues.apache.org/jira/browse/KAFKA-1317
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1317 Fix Kafka shutdown with delete topic on
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/controller/KafkaController.scala 
> f12ffc2af1f739f7d46058c97f0eeed6b55da14d 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
> 8262e10ce4364f057e449172ace38320e44247db 
> 
> Diff: https://reviews.apache.org/r/19577/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Timothy Chen
> 
>



Re: Review Request 19388: Patch for KAFKA-1251

2014-03-24 Thread Jun Rao


> On March 23, 2014, 11:57 p.m., Jun Rao wrote:
> > Thanks for the patch. Some comments.
> > 
> > 1. For producer level stats (i.e., not node or topic level), for jmx beans, 
> > the package name becomes kafka.producer (since the bean name is 
> > kafka.producer:type=client-id). For the topic and node level stats, jmx 
> > beans have the package name kafka.producer.client-id. Perhaps we can name 
> > the producer level stats under kafka.producer.client-id.aggregate?
> > 
> > 2. Could we make the metrics.timeWindowNs and metrics.samples configurable?
> > 
> > 3. Also, in Metrics, what's the purpose of eventWindow? It's not being used 
> > and I am not sure if there is a case when we will roll the window by count, 
> > instead of size.
> > 
> > 4. When testing using console producer, I sent 1 message and expected the 
> > topic level message rate to be non-zero for at least 30 secs (on average, I 
> > expect the non-zero value to last 45 secs). However, in multiple tests, the 
> > number drops to 0 well before 30 secs.
> > 
> > 5. Metric: Our measurement returns a double. However, we don't synchronize 
> > between the reader and the writer of the double. Since double access is not 
> > guaranteed to be atomic in Java, we could see an incorrect value in the output.
> > 
> > 6. Not sure why we need to change the license header. Perhaps we can have a 
> > separate jira to standardize all headers if there is inconsistency?
> 
> Jay Kreps wrote:
> 1. Yes, that makes sense.
> 2. Yes, will do.
>    3. The windows are defined by a time AND an event count, whichever is 
> triggered first. I am not currently using this, but it allows for more 
> responsive metrics updates for metrics where data is plentiful. That is, 1000 
> measurements should be enough to get a reasonably low-variance measurement, so 
> you could make your updates more responsive by setting the window to be 1000 
> events or 30 seconds, so that frequently updated metrics would potentially 
> shrink the window to just a few seconds.
> 4. There was a bug that Guozhang caught which I think was causing this. 
> Will be fixed in the next patch.
> 5. Access to the metric value is synchronized by the sensor in 
> KafkaMetric.
> 6. The licenses weren't at the default comment length. I can file a patch 
> to change them all.

5. The synchronization in KafkaMetric only handles the reads of the metric. The 
updates don't seem to be synchronized on the same lock, right?
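
(To make point 5 concrete, a small sketch — not the actual KafkaMetric/Sensor code — of the reader and the writer guarding the same double with one lock; without that, an unsynchronized double read can observe a torn or stale value:)

class MetricValueSketch(lock: AnyRef) {
  private var value: Double = 0.0

  def record(v: Double): Unit = lock.synchronized { value = v }  // writer path
  def measure(): Double       = lock.synchronized { value }      // reader uses the same lock
}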

7. Also, I noticed that when calculating a reading, we use the elapsed time 
since the oldest update time, which may not align with the window boundary. Say 
we have a window of 30 secs and an update comes in at 20 secs. When calculating 
a rate, we will only account for 10 secs in that window, even though only 
1 event actually shows up in the whole 30-sec window.
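
(Point 7 with concrete numbers — an illustrative calculation only, not the actual Rate implementation:)

object RateWindowSketch {
  def main(args: Array[String]): Unit = {
    val windowSecs = 30.0   // configured sample window
    val events     = 1.0    // a single event, recorded 20s into the window
    val elapsedSinceOldestUpdate = 10.0   // reading taken at the window boundary (30s - 20s)

    println(events / elapsedSinceOldestUpdate)  // 0.1 events/sec  (elapsed-time math)
    println(events / windowSecs)                // ~0.033 events/sec (rate over the full window)
  }
}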


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19388/#review38261
---


On March 20, 2014, 12:30 a.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/19388/
> ---
> 
> (Updated March 20, 2014, 12:30 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1251
> https://issues.apache.org/jira/browse/KAFKA-1251
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1251: Add metrics to the producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1ac69436f117800815b8d50f042e9e2a29364b43 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
>  33d62a4b83fbab5b22b91b22f6b744af1c98d262 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  673b2962771c28ceb3c7a6c0fd6f69521bd7ed16 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> 565331dfb9cd1d65be37ed97830aa42e44d2e127 
>   
> clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 
> 3ebbb804242be6a001b3bae6524afccc85a87602 
>   clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
> 6db2dfbe94c940efa37463298f0b0b1893e646e1 
>   clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
> 7e4849b7a148009c8a878349d7f0239108ccad8c 
>   clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java 
> 3b0454f26490d1f4a2a80efb00165fc72587fbf8 
>   
> clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java 
> f8b413a8c273cdad56177fbc6971fece4feb86b3 
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
> 9305b61ddeaa2bb400cbbb6d3c99c8ecaad

[jira] [Commented] (KAFKA-1029) Zookeeper leader election stuck in ephemeral node retry loop

2014-03-24 Thread Jason Rosenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13945223#comment-13945223
 ] 

Jason Rosenberg commented on KAFKA-1029:


I'm starting to suspect my issue is actually due to network latency introduced 
between the consumers and the zk servers, which is causing zk connection 
timeouts and flapping. Thus, I think the issue is probably not related to the 
original bug in this issue per se (it is having trouble with zk timeouts, 
possibly resulting in consumer clients not seeing/updating a consistent view of 
these ephemeral nodes). Does that make sense?

That being said, it still doesn't seem right that an ephemeral node would never 
get removed eventually. Also, perhaps the somewhat folksy error log message in 
this case assumes rather specific scenarios that don't always apply, and could 
be simplified here?

> Zookeeper leader election stuck in ephemeral node retry loop
> 
>
> Key: KAFKA-1029
> URL: https://issues.apache.org/jira/browse/KAFKA-1029
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.0
>Reporter: Sam Meder
>Assignee: Sam Meder
>Priority: Blocker
> Fix For: 0.8.0
>
> Attachments: 
> 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3, 
> "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2, 
> "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ 
> "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a 
> while back in a different session, hence I will backoff for this node to be 
> deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> where the broker is essentially stuck in the loop that is trying to deal with 
> left-over ephemeral nodes. The code looks a bit racy to me. In particular:
> ZookeeperLeaderElector:
>   def elect: Boolean = {
> controllerContext.zkClient.subscribeDataChanges(electionPath, 
> leaderChangeListener)
> val timestamp = SystemTime.milliseconds.toString
> val electString = ...
> try {
>   
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, 
> electionPath, electString, leaderId,
> (controllerString : String, leaderId : Any) => 
> KafkaController.parseControllerId(controllerString) == 
> leaderId.asInstanceOf[Int],
> controllerContext.zkSessionTimeout)
> leaderChangeListener is registered before the create call (by the way, it 
> looks like a new registration will be added every elect call - shouldn't it 
> register in startup()?) so can update leaderId to the current leader before 
> the call to create. If that happens then we will continuously get node exists 
> exceptions and the checker function will always return true, i.e. we will 
> never get out of the while(true) loop.
> I think the right fix here is to pass brokerId instead of leaderId when 
> calling create, i.e.
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, 
> electionPath, electString, brokerId,
> (controllerString : String, leaderId : Any) => 
> KafkaController.parseControllerId(controllerString) == 
> leaderId.asInstanceOf[Int],
> controllerContext.zkSessionTimeout)
> The loop dealing with the ephemeral node bug is now only triggered for the 
> broker that owned the node previously, although I am still not 100% sure if 
> that is sufficient.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1322) slow startup after unclean shutdown

2014-03-24 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-1322:
-

Resolution: Won't Fix
Status: Resolved  (was: Patch Available)

ok

> slow startup after unclean shutdown
> ---
>
> Key: KAFKA-1322
> URL: https://issues.apache.org/jira/browse/KAFKA-1322
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1
>Reporter: Alexey Ozeritskiy
>Priority: Critical
> Attachments: kafka.patch
>
>
> Kafka 0.8.1 checks all segments on unclean shutdown. 
> 0.8.0 checks only the latest segment.



--
This message was sent by Atlassian JIRA
(v6.2#6252)