[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-10-13 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168996#comment-14168996
 ] 

Sriram Subramanian commented on KAFKA-1555:
---

[~gwenshap] +1 on your suggestion. We can get the documentation ready and then 
do the linking during the release. 

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, 
 KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch, 
 KAFKA-1555.8.patch, KAFKA-1555.9.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.
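 A minimal sketch of the client-side half of what this ticket points toward (assuming the broker/topic-level min.insync.replicas setting proposed here lands in 0.8.2; names below other than request.required.acks are placeholders):

```java
import java.util.Properties;

public class DurableProducerConfig {

    // Sketch only: request.required.acks is the 0.8.x producer knob; -1 means
    // wait for all replicas currently in ISR. The broker/topic-level setting
    // min.insync.replicas=2 (the change discussed in this ticket) is what turns
    // "all of ISR" into "at least 2 replicas", closing the acks=-1 hole above.
    static Properties durableProducerProps(String brokerList) {
        Properties p = new Properties();
        p.setProperty("metadata.broker.list", brokerList); // placeholder broker list
        p.setProperty("request.required.acks", "-1");      // ack only after the full ISR has the message
        return p;
    }

    public static void main(String[] args) {
        System.out.println(durableProducerProps("broker1:9092"));
    }
}
```

 With both settings in place, an acknowledged message is on at least 2 replicas, so losing any single broker loses no acknowledged data.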



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26633: Patch for KAFKA-1305

2014-10-13 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26633/
---

(Updated Oct. 13, 2014, 2:30 p.m.)


Review request for kafka.


Bugs: KAFKA-1305
https://issues.apache.org/jira/browse/KAFKA-1305


Repository: kafka


Description
---

KAFKA-1305. Controller can hang on controlled shutdown with auto leader balance 
enabled.


Diffs (updated)
-

  core/src/main/scala/kafka/server/KafkaConfig.scala 
90af698b01ec82b6168e02b6af41887ef164ad51 

Diff: https://reviews.apache.org/r/26633/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

2014-10-13 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169343#comment-14169343
 ] 

Sriharsha Chintalapani commented on KAFKA-1305:
---

Updated reviewboard https://reviews.apache.org/r/26633/diff/
 against branch origin/trunk

 Controller can hang on controlled shutdown with auto leader balance enabled
 ---

 Key: KAFKA-1305
 URL: https://issues.apache.org/jira/browse/KAFKA-1305
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
 Fix For: 0.8.2, 0.9.0

 Attachments: KAFKA-1305.patch, KAFKA-1305.patch, 
 KAFKA-1305_2014-10-13_07:30:45.patch


 This is relatively easy to reproduce especially when doing a rolling bounce.
 What happened here is as follows:
 1. The previous controller was bounced and broker 265 became the new 
 controller.
 2. I went on to do a controlled shutdown of broker 265 (the new controller).
 3. In the meantime, the automatically scheduled preferred replica leader 
 election process started doing its thing and began sending 
 LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers).  
 (t@113 below).
 4. While that's happening, the controlled shutdown process on 265 succeeds 
 and proceeds to deregister itself from ZooKeeper and shuts down the socket 
 server.
 5. (ReplicaStateMachine actually removes deregistered brokers from the 
 controller channel manager's list of brokers to send requests to.  However, 
 that removal cannot take place (t@18 below) because preferred replica leader 
 election task owns the controller lock.)
 6. So the request thread to broker 265 gets into infinite retries.
 7. The entire broker shutdown process is blocked on controller shutdown for 
 the same reason (it needs to acquire the controller lock).
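 The lock interaction in steps 5-7 can be reproduced in miniature with a plain ReentrantLock (class and thread names below are illustrative, not Kafka's actual code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ControllerLockSketch {

    // Returns whether a "shutdown" thread could take the lock within the
    // timeout while a stuck "scheduler" thread holds it (it cannot).
    static boolean shutdownCanAcquire(long timeoutMs) {
        final ReentrantLock controllerLock = new ReentrantLock(); // stand-in for the controller lock
        final CountDownLatch lockHeld = new CountDownLatch(1);

        // "kafka-scheduler-0": the election task takes the lock and, as in
        // step 5 above, never gets to release it.
        Thread scheduler = new Thread(new Runnable() {
            public void run() {
                controllerLock.lock();
                lockHeld.countDown();
                try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException ignored) { }
            }
        });
        scheduler.setDaemon(true);
        scheduler.start();

        try {
            lockHeld.await();
            // "Thread-4": shutdown's plain lock() would park forever; a timed
            // tryLock makes the contention observable instead of hanging.
            return controllerLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("shutdown acquired controller lock: " + shutdownCanAcquire(200));
    }
}
```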
 Relevant portions from the thread-dump:
 Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
   at java.lang.Thread.sleep(Native Method)
   at 
 kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
   at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
   - locked java.lang.Object@6dbf14a7
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Locked ownable synchronizers:
   - None
 ...
 Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: 
 kafka-scheduler-0
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
   at 
 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
   at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
   at kafka.utils.Utils$.inLock(Utils.scala:536)
   at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
   at 
 kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
   at 
 kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
   at kafka.Kafka$$anon$1.run(Kafka.scala:42)
 ...
 kafka-scheduler-0 - Thread t@117
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)

[jira] [Updated] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

2014-10-13 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1305:
--
Attachment: KAFKA-1305_2014-10-13_07:30:45.patch

 Controller can hang on controlled shutdown with auto leader balance enabled
 ---

 Key: KAFKA-1305
 URL: https://issues.apache.org/jira/browse/KAFKA-1305
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
 Fix For: 0.8.2, 0.9.0

 Attachments: KAFKA-1305.patch, KAFKA-1305.patch, 
 KAFKA-1305_2014-10-13_07:30:45.patch


 This is relatively easy to reproduce especially when doing a rolling bounce.
 What happened here is as follows:
 1. The previous controller was bounced and broker 265 became the new 
 controller.
 2. I went on to do a controlled shutdown of broker 265 (the new controller).
 3. In the meantime, the automatically scheduled preferred replica leader 
 election process started doing its thing and began sending 
 LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers).  
 (t@113 below).
 4. While that's happening, the controlled shutdown process on 265 succeeds 
 and proceeds to deregister itself from ZooKeeper and shuts down the socket 
 server.
 5. (ReplicaStateMachine actually removes deregistered brokers from the 
 controller channel manager's list of brokers to send requests to.  However, 
 that removal cannot take place (t@18 below) because preferred replica leader 
 election task owns the controller lock.)
 6. So the request thread to broker 265 gets into infinite retries.
 7. The entire broker shutdown process is blocked on controller shutdown for 
 the same reason (it needs to acquire the controller lock).
 Relevant portions from the thread-dump:
 Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
   at java.lang.Thread.sleep(Native Method)
   at 
 kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
   at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
   - locked java.lang.Object@6dbf14a7
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Locked ownable synchronizers:
   - None
 ...
 Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: 
 kafka-scheduler-0
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
   at 
 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
   at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
   at kafka.utils.Utils$.inLock(Utils.scala:536)
   at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
   at 
 kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
   at 
 kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
   at kafka.Kafka$$anon$1.run(Kafka.scala:42)
 ...
 kafka-scheduler-0 - Thread t@117
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
   at 
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)

[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-13 Thread Vladimir Tretyakov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169396#comment-14169396
 ] 

Vladimir Tretyakov commented on KAFKA-1481:
---

Added another patch which introduces metric names as key=value pairs; 
examples:
{code}
kafka.consumer:type=ConsumerTopicMetrics,name=clientId=af_servers,AllTopics,BytesPerSec
kafka.cluster:type=Partition,name=topic=spm_alerts_topic,partitionId=0,UnderReplicated
kafka.network:type=SocketServer,name=NetworkProcessorNum=1,IdlePercent
kafka.server:type=BrokerTopicMetrics,name=topic=spm_topic,MessagesInPerSec

kafka.server:type=FetcherStats,name=clientId=af_servers,ConsumerFetcherThread,groupId=af_servers,consumerHostName=wawanawna,timestamp=1413199781501,uuid=58a5cc70,fetcherId=0,sourceBrokerId=0,brokerHost=wawanawna,brokerPort=9092,BytesPerSec
kafka.server:type=FetcherLagMetrics,name=clientId=af_servers,ConsumerFetcherThread,groupId=af_servers,consumerHostName=wawanawna,timestamp=1413199781501,uuid=58a5cc70,fetcherId=0,sourceBrokerId=0,brokerHost=wawanawna,brokerPort=9092,topic=spm_topic,partitionId=0,ConsumerLag
kafka.consumer:type=FetchRequestAndResponseMetrics,name=clientId=af_servers,ConsumerFetcherThread,groupId=af_servers,consumerHostName=wawanawna,timestamp=1413199781501,uuid=58a5cc70,fetcherId=0,sourceBrokerId=0,brokerHost=wawanawna,brokerPort=9092,FetchResponseSize
{code}
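One motivation for the key=value shape (sketched here with a hypothetical, well-formed name; the exact names the patch emits may differ): JMX ObjectName can parse the properties out directly, even when the embedded topic itself contains dashes and underscores.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class MBeanNameSketch {

    // With key=value properties, each component is addressable by key instead
    // of by guessing which dash or underscore is a separator.
    static String topicOf(String mbeanName) {
        try {
            return new ObjectName(mbeanName).getKeyProperty("topic");
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical name; dashes and underscores inside the topic stay unambiguous.
        String name = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=my_app-logs";
        System.out.println(topicOf(name));
    }
}
```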

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.2

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and they are embedded in MBean names, making it impossible to parse out 
 individual components from MBean names.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-13 Thread Vladimir Tretyakov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Tretyakov updated KAFKA-1481:
--
Attachment: KAFKA-1481_2014-10-13_18-23-35.patch

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.2

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and they are embedded in MBean names, making it impossible to parse out 
 individual components from MBean names.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-13 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169415#comment-14169415
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

{code}

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    public static void main(String[] args) throws IOException {

        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");

        String topic = "test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());
        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        for (int i = 0; i < msgLenth; i++)
            builder.append('a');

        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];

        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }

        Callback callback = new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("Msg dropped..!");
                    exception.printStackTrace();
                }
            }
        };

        ProducerRecord record = new ProducerRecord(topic, builder.toString().getBytes());
        while (true) {
            try {
                for (int i = 0; i < producer.length; i++) {
                    producer[i].send(record, callback);
                }
                Thread.sleep(10);
            } catch (Throwable th) {
                System.err.println("FATAL");
                th.printStackTrace();
            }
        }
    }
}

{code}

{code:title=kafkaproducer.properties}
# THIS IS FOR NEW PRODUCERS API TRUNK. Please see the configuration at
# https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker list
bootstrap.servers= BROKERS HERE...
# Data acks
acks=1
# 128MB of buffer for log lines (including all messages).
buffer.memory=134217728
compression.type=snappy
retries=3
# batch.size = buffer.memory / (number of partitions), so we can have an
# in-progress batch created for each partition.
batch.size=1048576
# 1MiB
max.request.size=1048576
# 2MiB
send.buffer.bytes=2097152
# We do not want to block when the buffer is full, so the application thread
# is never blocked; log lines will be dropped instead...
block.on.buffer.full=false

# wait...
linger.ms=5000
{code}
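The sizing comment in the properties above can be sanity-checked: with a 134217728-byte buffer and an assumed 128 partitions (the partition count is not stated in the file), the rule gives exactly the configured batch.size.

```java
public class BatchSizeCheck {

    // batch.size ~= buffer.memory / partition-count, so one in-progress batch
    // per partition fits in the producer buffer at the same time.
    static long batchSizeFor(long bufferMemoryBytes, int partitions) {
        return bufferMemoryBytes / partitions;
    }

    public static void main(String[] args) {
        System.out.println(batchSizeFor(134217728L, 128)); // matches batch.size=1048576
    }
}
```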

 [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
 connection is lost
 ---

 Key: KAFKA-1642
 URL: https://issues.apache.org/jira/browse/KAFKA-1642
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Jun Rao

 I see my CPU spike to 100% when the network connection is lost for a while.  It 
 seems the network I/O threads are very busy logging the following error message.  Is 
 this expected behavior?
 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
 producer I/O thread: 
 java.lang.IllegalStateException: No entry found for node -2
 at 
 org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
 at 
 org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
 at 
 org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
 at 
 org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 Thanks,
 Bhavesh
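 The spin comes from the I/O loop retrying the failed metadata connection with no pause. A generic way to cap that retry rate (illustrative only, not the actual Sender/NetworkClient fix) is exponential backoff:

```java
public class BackoffSketch {

    // Doubles the delay after each failed attempt, capped at maxMs, so a dead
    // connection costs a bounded number of wakeups instead of a busy loop.
    static long nextBackoffMs(long currentMs, long maxMs) {
        return Math.min(maxMs, Math.max(1, currentMs * 2));
    }

    public static void main(String[] args) {
        long backoff = 1;
        for (int attempt = 1; attempt <= 8; attempt++) {
            backoff = nextBackoffMs(backoff, 100);
            System.out.println("attempt " + attempt + ": sleep " + backoff + " ms before reconnecting");
        }
    }
}
```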



--
This message 

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-13 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169415#comment-14169415
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:08 PM:
-

{code:title=TestNetworkDownProducer.java}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {

        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");

        String topic = "logmon.test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());
        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        for (int i = 0; i < msgLenth; i++)
            builder.append('a');

        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];

        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }
        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh * 2));

        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();

        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {

        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}

{code:title=kafkaproducer.properties}
# THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at 
https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 64MB of 

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-13 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169415#comment-14169415
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:09 PM:
-

{code}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {

        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");

        String topic = "test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());
        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        for (int i = 0; i < msgLenth; i++)
            builder.append('a');

        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];

        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }
        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh * 2));

        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();

        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {

        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}

Property File
{code}
# THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at 
https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 64MB of Buffer for log lines (including all messages).


[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-13 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169415#comment-14169415
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:10 PM:
-

{code}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {

        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");

        String topic = "test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());
        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        for (int i = 0; i < msgLenth; i++)
            builder.append('a');

        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];

        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }
        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh * 2));

        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();

        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {

        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}

This is the property file used:
{code}
# THIS IS FOR NEW PRODUCERS API TRUNK. Please see the configuration at
# https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 64MB of Buffer for log lines (including all 

[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-13 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169453#comment-14169453
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

 [~jkreps]  Let me know if you need any other help !!

Thanks,
Bhavesh 

 [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
 connection is lost
 ---

 Key: KAFKA-1642
 URL: https://issues.apache.org/jira/browse/KAFKA-1642
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Jun Rao

 I see my CPU spike to 100% when the network connection is lost for a while. It 
 seems the network I/O threads are very busy logging the following error message. 
 Is this expected behavior?
 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
 producer I/O thread: 
 java.lang.IllegalStateException: No entry found for node -2
 at 
 org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
 at 
 org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
 at 
 org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
 at 
 org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 Thanks,
 Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1670) Corrupt log files for segment.bytes values close to Int.MaxInt

2014-10-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169493#comment-14169493
 ] 

Guozhang Wang commented on KAFKA-1670:
--

Verified that system test has recovered, thanks!

 Corrupt log files for segment.bytes values close to Int.MaxInt
 --

 Key: KAFKA-1670
 URL: https://issues.apache.org/jira/browse/KAFKA-1670
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Assignee: Sriharsha Chintalapani
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1670.patch, KAFKA-1670.patch, 
 KAFKA-1670_2014-10-04_20:17:46.patch, KAFKA-1670_2014-10-06_09:48:25.patch, 
 KAFKA-1670_2014-10-07_13:39:13.patch, KAFKA-1670_2014-10-07_13:49:10.patch, 
 KAFKA-1670_2014-10-07_18:39:31.patch


 The maximum value for the topic-level config {{segment.bytes}} is 
 {{Int.MaxInt}} (2147483647). *Using this value causes brokers to corrupt 
 their log files, leaving them unreadable.*
 We set {{segment.bytes}} to {{2122317824}} which is well below the maximum. 
 One by one, the ISR of all partitions shrunk to 1. Brokers would crash when 
 restarted, attempting to read from a negative offset in a log file. After 
 discovering that many segment files had grown to 4GB or more, we were forced 
 to shut down our *entire production Kafka cluster* for several hours while we 
 split all segment files into 1GB chunks.
 Looking into the {{kafka.log}} code, the {{segment.bytes}} parameter is used 
 inconsistently. It is treated as a *soft* maximum for the size of the segment 
 file 
 (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/LogConfig.scala#L26)
  with logs rolled only after 
 (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/Log.scala#L246)
  they exceed this value. However, much of the code that deals with log files 
 uses *ints* to store the size of the file and the position in the file. 
 Overflow of these ints leads the broker to append to the segments 
 indefinitely, and to fail to read these segments for consuming or recovery.
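The int overflow described above can be illustrated in isolation (a minimal sketch with made-up sizes, not Kafka's actual log code):

```java
public class SegmentOverflow {
    // Track the file size as an int, as parts of the 0.8.1 log code did.
    static int append(int position, int messageSize) {
        return position + messageSize; // silently wraps past Int.MaxValue
    }

    public static void main(String[] args) {
        int segmentBytes = 2147483647;            // segment.bytes = Int.MaxValue
        int position = append(2147483000, 1000);  // wraps to a negative value
        System.out.println("position after append: " + position);
        // A "roll when position > segmentBytes" check can now never fire,
        // so the broker keeps appending to the same segment indefinitely.
        System.out.println("should roll: " + (position > segmentBytes));
    }
}
```

Once the tracked position goes negative, both rolling and reading by offset break, matching the corruption described above.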
 This is trivial to reproduce:
 {code}
 $ bin/kafka-topics.sh --topic segment-bytes-test --create 
 --replication-factor 2 --partitions 1 --zookeeper zkhost:2181
 $ bin/kafka-topics.sh --topic segment-bytes-test --alter --config 
 segment.bytes=2147483647 --zookeeper zkhost:2181
 $ yes "Int.MaxValue is a ridiculous bound on file size in 2014" | 
 bin/kafka-console-producer.sh --broker-list localhost:6667 zkhost:2181 
 --topic segment-bytes-test
 {code}
 After running for a few minutes, the log file is corrupt:
 {code}
 $ ls -lh data/segment-bytes-test-0/
 total 9.7G
 -rw-r--r-- 1 root root  10M Oct  3 19:39 .index
 -rw-r--r-- 1 root root 9.7G Oct  3 19:39 .log
 {code}
 We recovered the data from the log files using a simple Python script: 
 https://gist.github.com/also/9f823d9eb9dc0a410796



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-13 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169616#comment-14169616
 ] 

James Oliver commented on KAFKA-1493:
-

Sure, I'll take a look at it now.

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: Ivan Lyutov
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1493.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-13 Thread Bhavesh Mistry
Hi Kafka Dev Team,

When I run the test that sends messages to a single partition for 3 minutes or
so, I encounter a deadlock (please see the attached screenshot) and thread
contention from YourKit profiling.

Use Case:

1)  Aggregating messages into the same partition for metric counting.
2)  Replicating the old producer's behavior of sticking to a partition for 3 minutes.


Here is output:

Frozen threads found (potential deadlock)

It seems that the following threads have not changed their stack for more
than 10 seconds.
These threads are possibly (but not necessarily!) in a deadlock or hung.

pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord,
Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run()
TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker)
ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run()
ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744



pool-1-thread-159 --- Frozen for at least 2m 1 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord,
Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run()
TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker)
ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run()
ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744



pool-1-thread-55 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord,
Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run()
TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker)
ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run()
ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744


[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation

2014-10-13 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169648#comment-14169648
 ] 

Joel Koshy commented on KAFKA-1634:
---

[~junrao] yes you are right. The OffsetFetchResponse returns offset, metadata 
and error - it does not include the timestamp.


 Improve semantics of timestamp in OffsetCommitRequests and update 
 documentation
 ---

 Key: KAFKA-1634
 URL: https://issues.apache.org/jira/browse/KAFKA-1634
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Blocker
 Fix For: 0.8.2


 From the mailing list -
 following up on this -- I think the online API docs for OffsetCommitRequest
 still incorrectly refer to client-side timestamps:
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest
 Wasn't that removed, and isn't it always handled server-side now? Would one of
 the devs mind updating the API spec wiki?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] Message Metadata

2014-10-13 Thread Jay Kreps
I need to take more time to think about this. Here are a few off-the-cuff
remarks:

- To date we have tried really, really hard to keep the data model for
message simple since after all you can always add whatever you like inside
the message body.

- For system tags, why not just make these fields first-class fields in the
message? Why have a bunch of key-value pairs versus first-class fields?

- You don't necessarily need application-level tags explicitly represented
in the message format for efficiency. The application can define their own
header (e.g. their message could be a size delimited header followed by a
size delimited body). But actually if you use Avro you don't even need this
I don't think. Avro has the ability to just deserialize the header fields
in your message. Avro has a notion of reader and writer schemas. The writer
schema is whatever the message was written with. If the reader schema is
just the header, avro will skip any fields it doesn't need and just
deserialize the fields it does need. This is actually a much more usable
and flexible way to define a header since you get all the types avro allows
instead of just bytes.
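The size-delimited header idea above can be sketched in plain Java (illustrative names only; this is an application-level convention, not a Kafka or Avro API):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SizeDelimitedHeader {
    // Encode as [headerLen][header][bodyLen][body] so a consumer can read
    // just the header and skip the body without deserializing it.
    static byte[] encode(byte[] header, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(8 + header.length + body.length);
        buf.putInt(header.length).put(header);
        buf.putInt(body.length).put(body);
        return buf.array();
    }

    static byte[] readHeader(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte[] header = new byte[buf.getInt()];
        buf.get(header);
        return header; // body bytes are never touched
    }

    public static void main(String[] args) {
        byte[] msg = encode("type=click".getBytes(StandardCharsets.UTF_8),
                            new byte[1024]); // 1 KB opaque body
        System.out.println(new String(readHeader(msg), StandardCharsets.UTF_8));
    }
}
```

An Avro reader schema containing only the header fields gives the same skip-the-body effect with richer types, as the email notes.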

- We will need to think carefully about what to do with timestamps if we
end up including them. There are actually several timestamps
  - The time the producer created the message
  - The time the leader received the message
  - The time the current broker received the message
The producer timestamps won't be at all increasing. The leader timestamp
will be mostly increasing except when the clock changes or leadership
moves. This somewhat complicates the use of these timestamps, though. From
the point of view of the producer the only time that matters is the time
the message was created. However since the producer sets this it can be
arbitrarily bad (remember all the ntp issues and 1970 timestamps we would
get). Say that the heuristic was to use the timestamp of the first message
in a file for retention, the problem would be that the timestamps for the
segments need not even be sequential and a single bad producer could send
data with time in the distant past or future causing data to be deleted or
retained forever. Using the broker timestamp at write time is better, though
obviously that would be overwritten when data is mirrored between
clusters (the mirror would then have a different time--and if the mirroring
ever stopped that gap could be large). One approach would be to use the
client timestamp but have the broker overwrite it if it is too bad (e.g.
off by more than a minute, say).
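That last heuristic can be sketched as follows (a hypothetical sanitizer, not an actual Kafka API; the one-minute bound is the example from the text):

```java
public class TimestampSanitizer {
    static final long MAX_SKEW_MS = 60_000; // "off by more than a minute"

    // Keep the client timestamp unless it is too far from broker time,
    // in which case fall back to the broker's clock.
    static long sanitize(long clientTs, long brokerTs) {
        return Math.abs(clientTs - brokerTs) > MAX_SKEW_MS ? brokerTs : clientTs;
    }

    public static void main(String[] args) {
        long broker = 1_413_216_000_000L;                    // broker clock (made up)
        System.out.println(sanitize(broker - 5_000, broker)); // small skew: kept
        System.out.println(sanitize(0L, broker));             // 1970 timestamp: replaced
    }
}
```

This bounds how badly a misconfigured producer can skew retention while still preferring the creation time when it is plausible.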

-Jay

On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Thanks Guozhang! This is an excellent write-up and the approach nicely
 consolidates a number of long-standing issues. It would be great if
 everyone can review this carefully and give feedback.

 Also, wrt discussion in the past we have used a mix of wiki comments
 and the mailing list. Personally, I think it is better to discuss on
 the mailing list (for more visibility) and just post a bold link to
 the (archived) mailing list thread on the wiki.

 Joel

 On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
  Hello all,
 
  I put some thoughts on enhancing our current message metadata format to
  solve a bunch of existing issues:
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
 
  This wiki page is for kicking off some discussions about the feasibility
 of
  adding more info into the message header, and if possible how we would
 add
  them.
 
  -- Guozhang




Re: Review Request 26633: Patch for KAFKA-1305

2014-10-13 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26633/#review56427
---

Ship it!



core/src/main/scala/kafka/server/KafkaConfig.scala
https://reviews.apache.org/r/26633/#comment96710

this looks good now.


- Neha Narkhede


On Oct. 13, 2014, 2:30 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26633/
 ---
 
 (Updated Oct. 13, 2014, 2:30 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1305
 https://issues.apache.org/jira/browse/KAFKA-1305
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1305. Controller can hang on controlled shutdown with auto leader 
 balance enabled.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 90af698b01ec82b6168e02b6af41887ef164ad51 
 
 Diff: https://reviews.apache.org/r/26633/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




[jira] [Updated] (KAFKA-1631) ReplicationFactor and under-replicated partitions incorrect during reassignment

2014-10-13 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1631:
-
Assignee: Ewen Cheslack-Postava

 ReplicationFactor and under-replicated partitions incorrect during 
 reassignment
 ---

 Key: KAFKA-1631
 URL: https://issues.apache.org/jira/browse/KAFKA-1631
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Assignee: Ewen Cheslack-Postava
  Labels: newbie
 Attachments: KAFKA-1631-v1.patch


 We have a topic with a replication factor of 3. We monitor 
 UnderReplicatedPartitions as recommended by the documentation.
 During a partition reassignment, partitions being reassigned are reported as 
 under-replicated. Running a describe shows:
 {code}
 Topic:activity-wal-1PartitionCount:15   ReplicationFactor:5 
 Configs:
 Topic: activity-wal-1   Partition: 0Leader: 14  Replicas: 
 14,13,12,11,15Isr: 14,12,11,13
 Topic: activity-wal-1   Partition: 1Leader: 14  Replicas: 
 15,14,11  Isr: 14,11
 Topic: activity-wal-1   Partition: 2Leader: 11  Replicas: 
 11,15,12  Isr: 12,11,15
 ...
 {code}
 It looks like the displayed replication factor, 5, is simply the number of 
 replicas listed for the first partition, which includes both brokers in the 
 current list and those onto which the partition is being reassigned. 
 Partition 0 is also included in the list when using the 
 `--under-replicated-partitions` option, even though it is replicated to more 
 partitions than the true replication factor.
 During a reassignment, the under-replicated partitions metric is not usable, 
 meaning that actual under-replicated partitions can go unnoticed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-10-13 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1196:
-
Assignee: (was: Neha Narkhede)

 java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
 ---

 Key: KAFKA-1196
 URL: https://issues.apache.org/jira/browse/KAFKA-1196
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.0
 Environment: running java 1.7, linux and kafka compiled against scala 
 2.9.2
Reporter: Gerrit Jansen van Vuuren
Priority: Blocker
  Labels: newbie
 Fix For: 0.9.0


 I have 6 topics each with 8 partitions spread over 4 kafka servers.
 the servers are 24 core 72 gig ram.
 While consuming from the topics I get an IlegalArgumentException and all 
 consumption stops, the error keeps on throwing.
 I've tracked it down to FetchResponse.scala line 33
 The error happens when the FetchResponsePartitionData object's readFrom 
 method calls:
 messageSetBuffer.limit(messageSetSize)
 I put in some debug code; the messageSetSize is 671758648, while 
 buffer.capacity() gives 155733313, so for some reason the buffer is smaller than 
 the required message size.
 I don't know the consumer code enough to debug this. It doesn't matter if 
 compression is used or not.
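The reported exception can be reproduced in isolation with `ByteBuffer` (sizes scaled down; this demonstrates only the JDK behavior, not the consumer code path):

```java
import java.nio.ByteBuffer;

public class BufferLimitDemo {
    // Returns whether limit(newLimit) succeeds on a buffer of this capacity.
    // A limit larger than the capacity throws IllegalArgumentException,
    // which is exactly the failure in FetchResponse's readFrom.
    static boolean limitAllowed(int capacity, int newLimit) {
        try {
            ByteBuffer.allocate(capacity).limit(newLimit);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The report's sizes (671758648 vs 155733313), scaled down 1e6x.
        System.out.println("limit 671 on capacity 155 allowed: "
                + limitAllowed(155, 671));
    }
}
```

So whenever the declared messageSetSize exceeds the fetch buffer's capacity, the limit call itself throws before any bytes are read.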



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2014-10-13 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169717#comment-14169717
 ] 

Jay Kreps commented on KAFKA-1646:
--

Hey [~xueqiang] what is the latency impact for that preallocation for a large 
file? I.e. if your segment size is 1GB will there be a long pause when the new 
log segment is rolled? How long does that take?

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch


 This patch is for the Windows platform only. On Windows, if there is more than 
 one replica writing to disk, the segment log files will not be laid out 
 consistently on disk and consumer read performance drops 
 greatly. This fix allocates more disk space when rolling a new segment, 
 which improves consumer read performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, as it only 
 adds statements like 'if(Os.iswindow)' or adds methods used on Windows.
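The preallocation idea can be sketched with the standard library (an illustrative sketch under stated assumptions, not the patch's actual code):

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class PreallocateSegment {
    // Create a temp segment file preallocated to the target size and return
    // its on-disk length. Extending the file up front with setLength gives
    // NTFS the chance to allocate it contiguously instead of fragmenting it
    // as appends trickle in.
    static long preallocateTemp(long bytes) {
        try {
            File f = File.createTempFile("segment", ".log");
            f.deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
                raf.setLength(bytes);
            }
            return f.length();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // 1 MB here as a small stand-in for segment.bytes.
        System.out.println("preallocated: " + preallocateTemp(1 << 20) + " bytes");
    }
}
```

A broker using this approach must also truncate the unused tail on clean shutdown, which is what the second attached patch addresses.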



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1701) Improve controller and broker message handling.

2014-10-13 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1701:

Description: 
This ticket is a memo for future controller refactoring.
It is related to KAFKA-1647. Ideally, the broker should only follow instructions 
from the controller rather than handle them smartly. For KAFKA-1647, the controller 
should filter out the partitions whose leader is not up yet before sending a 
LeaderAndIsrRequest to the broker. 
The idea is that the controller should handle all the edge cases instead of 
letting the broker do it.

  was:
This ticket is a memo for future controller refactoring.
It is related to KAFKA-1547. Ideally, the broker should only follow instruction 
from controller but not handle it smartly. For KAFKA-1547, the controller 
should filter out the partitions whose leader is not up yet before send 
LeaderAndIsrRequest to broker. 
The idea is controller should handle all the edge cases instead of letting 
broker do it.


 Improve controller and broker message handling.
 ---

 Key: KAFKA-1701
 URL: https://issues.apache.org/jira/browse/KAFKA-1701
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin

 This ticket is a memo for future controller refactoring.
 It is related to KAFKA-1647. Ideally, the broker should only follow 
 instruction from controller but not handle it smartly. For KAFKA-1647, the 
 controller should filter out the partitions whose leader is not up yet before 
 send LeaderAndIsrRequest to broker. 
 The idea is controller should handle all the edge cases instead of letting 
 broker do it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : Kafka-trunk #300

2014-10-13 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/300/changes



[jira] [Updated] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

2014-10-13 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1305:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

 Controller can hang on controlled shutdown with auto leader balance enabled
 ---

 Key: KAFKA-1305
 URL: https://issues.apache.org/jira/browse/KAFKA-1305
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
 Fix For: 0.8.2, 0.9.0

 Attachments: KAFKA-1305.patch, KAFKA-1305.patch, 
 KAFKA-1305_2014-10-13_07:30:45.patch


 This is relatively easy to reproduce especially when doing a rolling bounce.
 What happened here is as follows:
 1. The previous controller was bounced and broker 265 became the new 
 controller.
 2. I went on to do a controlled shutdown of broker 265 (the new controller).
 3. In the mean time the automatically scheduled preferred replica leader 
 election process started doing its thing and starts sending 
 LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers).  
 (t@113 below).
 4. While that's happening, the controlled shutdown process on 265 succeeds 
 and proceeds to deregister itself from ZooKeeper and shuts down the socket 
 server.
 5. (ReplicaStateMachine actually removes deregistered brokers from the 
 controller channel manager's list of brokers to send requests to.  However, 
 that removal cannot take place (t@18 below) because preferred replica leader 
 election task owns the controller lock.)
 6. So the request thread to broker 265 gets into infinite retries.
 7. The entire broker shutdown process is blocked on controller shutdown for 
 the same reason (it needs to acquire the controller lock).
 Relevant portions from the thread-dump:
 Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
   at java.lang.Thread.sleep(Native Method)
   at 
 kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
   at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
   - locked java.lang.Object@6dbf14a7
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Locked ownable synchronizers:
   - None
 ...
 Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: 
 kafka-scheduler-0
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
   at 
 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
   at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
   at kafka.utils.Utils$.inLock(Utils.scala:536)
   at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
   at 
 kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
   at 
 kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
   at kafka.Kafka$$anon$1.run(Kafka.scala:42)
 ...
 kafka-scheduler-0 - Thread t@117
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
   at 
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
   at 
 

[jira] [Comment Edited] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-13 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169616#comment-14169616
 ] 

James Oliver edited comment on KAFKA-1493 at 10/13/14 7:41 PM:
---

The I/O streams provided in the patch are nearly identical to the I/O streams 
in lz4-java. As it stands, I don't think it buys us much. Other than that it 
looks good; just a few spots still refer to lz4hc (I think the KAFKA-1471 
changes were applied to trunk after this patch was created).

The formal specification is described here: 
http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html

Here's an example C++ implementation of that format:
https://github.com/t-mat/lz4mt/blob/master/src/lz4mt.cpp

We still need a Java implementation.


was (Author: joliver):
Sure, I'll take a look at it now.

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: Ivan Lyutov
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1493.patch








Review Request 26658: Patch for KAFKA-1493

2014-10-13 Thread James Oliver

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26658/
---

Review request for kafka.


Bugs: KAFKA-1493
https://issues.apache.org/jira/browse/KAFKA-1493


Repository: kafka


Description
---

KAFKA-1493


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
bf4ed66791b9a502aae6cb2ec7681f42732d9a43 
  
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
5227b2d7ab803389d1794f48c8232350c05b14fd 
  clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
0323f5f7032dceb49d820c17a41b78c56591ffc4 
  config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98 
  core/src/main/scala/kafka/message/CompressionCodec.scala 
de0a0fade5387db63299c6b112b3c9a5e41d82ec 
  core/src/main/scala/kafka/message/CompressionFactory.scala 
8420e13d0d8680648df78f22ada4a0d4e3ab8758 
  core/src/main/scala/kafka/tools/ConsoleProducer.scala 
b024a693c23cb21f1efe405ed414bf23f3974f31 
  core/src/main/scala/kafka/tools/PerfConfig.scala 
c72002976d90416559090a665f6494072a6c2dec 
  core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
c95485170fd8b4f5faad740f049e5d09aca8829d 
  core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 
6f0addcea64f1e78a4de50ec8135f4d02cebd305 
  core/src/test/scala/unit/kafka/message/MessageTest.scala 
958c1a60069ad85ae20f5c58e74679cd9fa6f70e 

Diff: https://reviews.apache.org/r/26658/diff/


Testing
---


Thanks,

James Oliver



[jira] [Updated] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-13 Thread James Oliver (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Oliver updated KAFKA-1493:

Attachment: KAFKA-1493.patch



[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-10-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169987#comment-14169987
 ] 

Guozhang Wang commented on KAFKA-1555:
--

One question I have while trying to rebase KAFKA-1583 on KAFKA-1555: for the 
NotEnoughReplicasAfterAppend exception, what should be the behavior of the 
client? Currently the producer client retries on all exceptions, but is 
that ideal for NotEnoughReplicasAfterAppend, given that the message has already 
been appended and retrying would produce duplicates?

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, 
 KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch, 
 KAFKA-1555.8.patch, KAFKA-1555.9.patch


 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as two brokers down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these 
 requirements due to three behaviors:
 1. When choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. Even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. The ISR can contain only 1 broker, so acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.
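
The acks=2 case in point 2 can be made concrete with a tiny model. This is an illustration only; the replica lists and the election step are simplified assumptions, not Kafka code:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the acks=2 loss scenario: three replicas, one lagging
// follower that is still in the ISR, and leader election after A dies.
public class AcksTwoScenario {
    /** Returns true if the acks=2-acknowledged message m survives failover to C. */
    public static boolean messageSurvives() {
        List<String> a = new ArrayList<>();  // leader A
        List<String> b = new ArrayList<>();  // follower B
        List<String> c = new ArrayList<>();  // follower C: lagging, but still in ISR

        a.add("m");                          // producer sends m; A appends it...
        b.add("m");                          // ...and B replicates it: acks=2 satisfied

        List<String> newLeader = c;          // A is killed; C (in ISR) is elected
        return newLeader.contains("m");      // m never reached C
    }

    public static void main(String[] args) {
        System.out.println("message survives failover: " + messageSurvives());
    }
}
```

Running it shows an acknowledged message that the new leader never saw, which is exactly the loss mode the reporter describes.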





[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-10-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170002#comment-14170002
 ] 

Guozhang Wang commented on KAFKA-1555:
--

Another thought about the min.isr config: we put it into the log configs since 
we do not have partition configs, only the global server configs and log 
configs. Doing so requires passing requiredAcks from the replica manager to 
the partition, and then to the log, in order to check it against the min 
ISR. It may be better to add partition configs and move min.isr into that 
config set instead of making it part of the log configs. Thoughts?
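
The check under discussion can be sketched as follows (class and method names are illustrative, not the actual Kafka code paths): with acks=-1, an append is rejected up front when the ISR has shrunk below min.insync.replicas.

```java
// Sketch of a min.isr check: reject an acks=-1 append when the current ISR
// is smaller than the configured minimum. Names here are hypothetical.
public class MinIsrCheck {
    public static class NotEnoughReplicasException extends RuntimeException {
        public NotEnoughReplicasException(String msg) { super(msg); }
    }

    public static void checkBeforeAppend(int requiredAcks, int isrSize, int minIsr) {
        if (requiredAcks == -1 && isrSize < minIsr) {
            throw new NotEnoughReplicasException(
                "ISR size " + isrSize + " is below min.insync.replicas=" + minIsr);
        }
    }

    public static void main(String[] args) {
        checkBeforeAppend(-1, 2, 2);      // ISR meets the minimum: append proceeds
        try {
            checkBeforeAppend(-1, 1, 2);  // ISR shrank to 1: append rejected
        } catch (NotEnoughReplicasException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```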



[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-13 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170071#comment-14170071
 ] 

Jun Rao commented on KAFKA-1493:


James,

Thanks for the patch. A couple more questions.

1. The following header frame used in the patch doesn't seem to match exactly 
what's described in 
http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html. 
So, we are inventing our own header? Is that ok?
/*
* Message format:
* HEADER which consists of:
* 1) magic byte sequence (8 bytes)
* 2) compression method token (1 byte)
* 3) compressed length (4 bytes)
* 4) original message length (4 bytes)
* and compressed message itself
* Block size: 64 Kb
* */
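
For reference, the header layout quoted above can be serialized with a short sketch. The magic byte values below are placeholders, not the patch's actual constants:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch serializing the header frame described in the quoted comment.
public class Lz4BlockHeader {
    // 8-byte magic sequence; the actual value used by the patch is assumed here.
    static final byte[] MAGIC = "KAFKALZ4".getBytes(StandardCharsets.US_ASCII);

    public static byte[] write(byte method, int compressedLen, int originalLen) {
        ByteBuffer buf = ByteBuffer.allocate(8 + 1 + 4 + 4);
        buf.put(MAGIC);              // 1) magic byte sequence (8 bytes)
        buf.put(method);             // 2) compression method token (1 byte)
        buf.putInt(compressedLen);   // 3) compressed length (4 bytes)
        buf.putInt(originalLen);     // 4) original message length (4 bytes)
        return buf.array();
    }
}
```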

2. If the io stream code in this patch is identical to that in lz4-java, could 
we just use lz4-java instead?

Thanks,



[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-10-13 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170095#comment-14170095
 ] 

Jun Rao commented on KAFKA-1555:


When a client receives a NotEnoughReplicasAfterAppendException, it can choose 
to stop producing or wait and then retry. So, we can probably make 
NotEnoughReplicasAfterAppend a non-retriable exception and let the client 
decide what to do.
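
That policy can be sketched roughly like this (the names are illustrative, not the actual client classes): retry retriable errors, but surface NotEnoughReplicasAfterAppend to the caller, since the message was already appended and a blind retry would introduce duplicates.

```java
// Sketch of a retriable/non-retriable split in the producer's error handling.
public class ProducerRetryPolicy {
    public enum SendError {
        REQUEST_TIMED_OUT(true),
        NOT_ENOUGH_REPLICAS_AFTER_APPEND(false); // already appended on the leader

        public final boolean retriable;
        SendError(boolean retriable) { this.retriable = retriable; }
    }

    public static String handle(SendError error, int retriesLeft) {
        if (error.retriable && retriesLeft > 0) return "retry";
        if (error == SendError.NOT_ENOUGH_REPLICAS_AFTER_APPEND) return "surface to caller";
        return "fail";
    }
}
```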

All the topic configs are essentially per partition. However, some of those 
configs are applicable to the log and some others are only applicable to the 
logical partition. So, we can probably split topic configs into a logConfig and 
a partitionConfig and pass them to Log and Partition, respectively. This can be 
handled in a separate jira though.



[jira] [Updated] (KAFKA-979) Add jitter for time based rolling

2014-10-13 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-979:

Assignee: Ewen Cheslack-Postava  (was: Sriram Subramanian)
  Status: Patch Available  (was: Open)

 Add jitter for time based rolling
 -

 Key: KAFKA-979
 URL: https://issues.apache.org/jira/browse/KAFKA-979
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Ewen Cheslack-Postava
  Labels: newbie
 Attachments: KAFKA-979.patch


 Currently, for low-volume topics, time-based rolling happens at the same time 
 across segments. This causes a lot of I/O on a typical cluster and creates 
 back pressure. We need to add jitter to prevent the rolls from coinciding.
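
One way to implement this is to offset each segment's roll deadline by a per-segment random jitter bounded by the roll interval. This is a sketch under assumed config names (e.g. segment.jitter.ms); the actual patch may differ:

```java
import java.util.Random;

// Sketch: give each segment a random jitter so that segments created at the
// same time do not all roll at the same instant.
public class RollJitter {
    /** Random jitter in [0, min(segmentJitterMs, segmentMs)), or 0 if disabled. */
    public static long jitterMs(long segmentMs, long segmentJitterMs, Random rng) {
        long bound = Math.min(segmentJitterMs, segmentMs);
        if (bound <= 0) return 0L;
        return Math.floorMod(rng.nextLong(), bound);
    }

    /** A segment rolls up to `jitter` ms early, never later than segmentMs. */
    public static boolean shouldRoll(long ageMs, long segmentMs, long jitter) {
        return ageMs > segmentMs - jitter;
    }
}
```

Bounding the jitter by the roll interval itself keeps a large jitter setting from effectively disabling time-based rolling.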





[jira] [Updated] (KAFKA-979) Add jitter for time based rolling

2014-10-13 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-979:

Attachment: KAFKA-979.patch



Review Request 26663: Patch for KAFKA-979

2014-10-13 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26663/
---

Review request for kafka.


Bugs: KAFKA-979
https://issues.apache.org/jira/browse/KAFKA-979


Repository: kafka


Description
---

KAFKA-979 Add optional random jitter for time based log rolling.


Diffs
-

  core/src/main/scala/kafka/log/Log.scala 
a123cdc52f341a802b3e4bfeb29a6154332e5f73 
  core/src/main/scala/kafka/log/LogCleaner.scala 
c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/main/scala/kafka/log/LogConfig.scala 
d2cc9e3d6b7a4fd24516d164eb3673e6ce052129 
  core/src/main/scala/kafka/log/LogSegment.scala 
7597d309f37a0b3756381f9500100ef763d466ba 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
7fcbc16da898623b03659c803e2a20c7d1bd1011 
  core/src/main/scala/kafka/server/KafkaServer.scala 
3e9e91f2b456bbdeb3055d571e18ffea8675b4bf 
  core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
7b97e6a80753a770ac094e101c653193dec67e68 
  core/src/test/scala/unit/kafka/log/LogTest.scala 
a0cbd3bbbeeabae12caa6b41aec31a8f5dfd034b 

Diff: https://reviews.apache.org/r/26663/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Commented] (KAFKA-979) Add jitter for time based rolling

2014-10-13 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170196#comment-14170196
 ] 

Ewen Cheslack-Postava commented on KAFKA-979:
-

Created reviewboard https://reviews.apache.org/r/26663/diff/
 against branch origin/trunk



[jira] [Updated] (KAFKA-1702) Messages silently Lost by producer

2014-10-13 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1702:
---
   Resolution: Fixed
Fix Version/s: 0.8.3
 Assignee: Alexis Midon  (was: Jun Rao)
   Status: Resolved  (was: Patch Available)

Got it. +1 for the patch. Committed to trunk.

 Messages silently Lost by producer
 --

 Key: KAFKA-1702
 URL: https://issues.apache.org/jira/browse/KAFKA-1702
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.1.1
Reporter: Alexis Midon
Assignee: Alexis Midon
 Fix For: 0.8.3

 Attachments: KAFKA-1702.0.patch


 Hello,
 we lost millions of messages because of this {{try/catch}} in  the producer 
 {{DefaultEventHandler}}:
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala#L114-L116
 If a Throwable is caught by this {{try/catch}}, the retry policy will have no 
 effect and all yet-to-be-sent messages are lost (the error will break the 
 loop over the broker list).
 This issue is very hard to detect because the producer (async or sync) 
 cannot even catch the error, and *all* the metrics are updated as if 
 everything were fine.
 Only an abnormal drop in the producer's network I/O, or in the incoming message 
 rate on the brokers, or alerting on errors in the producer logs could have 
 revealed the issue. 
 This behavior was introduced by KAFKA-300. I can't see a good reason for it, 
 so here is a patch that will let the retry-policy do its job when such a 
 {{Throwable}} occurs.
 Thanks in advance for your help.
 Alexis
 ps: you might wonder how could this {{try/catch}} ever caught something? 
 {{DefaultEventHandler#groupMessagesToSet}} looks so harmless. 
 Here are the details:
 We use Snappy compression. When the native snappy library is not installed on 
 the host, Snappy, during the initialization of class 
 {{org.xerial.snappy.Snappy}}  will [write a C 
 library|https://github.com/xerial/snappy-java/blob/1.1.0/src/main/java/org/xerial/snappy/SnappyLoader.java#L312]
  in the JVM temp directory {{java.io.tmpdir}}.
 In our scenario, {{java.io.tmpdir}} was a subdirectory of {{/tmp}}. After an 
 instance reboot (thank you 
 [AWS|https://twitter.com/hashtag/AWSReboot?src=hash]!), the JVM temp 
 directory was removed. The JVM was then running with a non-existent temp dir. 
 The Snappy class could not be initialized, and the following message 
 was silently logged:
 {code}
 ERROR [2014-10-07 22:23:56,530] kafka.producer.async.DefaultEventHandler: 
 Failed to send messages
 ! java.lang.NoClassDefFoundError: Could not initialize class 
 org.xerial.snappy.Snappy
 {code}





Re: Review Request 26373: Patch for KAFKA-1647

2014-10-13 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/
---

(Updated Oct. 13, 2014, 11:38 p.m.)


Review request for kafka.


Bugs: KAFKA-1647
https://issues.apache.org/jira/browse/KAFKA-1647


Repository: kafka


Description (updated)
---

Addressed Joel's comments.


Diffs (updated)
-

  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 

Diff: https://reviews.apache.org/r/26373/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts

2014-10-13 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1647:

Attachment: KAFKA-1647_2014-10-13_16:38:39.patch

 Replication offset checkpoints (high water marks) can be lost on hard kills 
 and restarts
 

 Key: KAFKA-1647
 URL: https://issues.apache.org/jira/browse/KAFKA-1647
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Joel Koshy
Assignee: Jiangjie Qin
Priority: Critical
  Labels: newbie++
 Attachments: KAFKA-1647.patch, KAFKA-1647_2014-10-13_16:38:39.patch


 We ran into this scenario recently in a production environment. This can 
 happen when enough brokers in a cluster are taken down. i.e., a rolling 
 bounce done properly should not cause this issue. It can occur if all 
 replicas for any partition are taken down.
 Here is a sample scenario:
 * Cluster of three brokers: b0, b1, b2
 * Two partitions (of some topic) with replication factor two: p0, p1
 * Initial state:
 p0: leader = b0, ISR = {b0, b1}
 p1: leader = b1, ISR = {b0, b1}
 * Do a parallel hard-kill of all brokers
 * Bring up b2, so it is the new controller
 * b2 initializes its controller context and populates its leader/ISR cache 
 (i.e., controllerContext.partitionLeadershipInfo) from ZooKeeper. The last 
 known leaders are b0 (for p0) and b1 (for p1)
 * Bring up b1
 * The controller's onBrokerStartup procedure initiates a replica state change 
 for all replicas on b1 to become online. As part of this replica state change 
 it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 
 (for p0 and p1). This LeaderAndIsr request contains: {{p0: leader=b0; p1: 
 leader=b1}}; leaders={b1}. b0 is indicated as the leader of p0 but it is not 
 included in the leaders field because b0 is down.
 * On receiving the LeaderAndIsrRequest, b1's replica manager will 
 successfully make itself (b1) the leader for p1 (and create the local replica 
 object corresponding to p1). It will however abort the become follower 
 transition for p0 because the designated leader b0 is offline. So it will not 
 create the local replica object for p0.
 * It will then start the high water mark checkpoint thread. Since only p1 has 
 a local replica object, only p1's high water mark will be checkpointed to 
 disk. p0's previously written checkpoint, if any, will be lost.
 So in summary it seems we should always create the local replica object even 
 if the online transition does not happen.
 Possible symptoms of the above bug could be one or more of the following (we 
 saw 2 and 3):
 # Data loss; yes, on a hard kill data loss is expected, but this can actually 
 cause loss of nearly all data if the broker becomes follower, truncates, and 
 soon after happens to become leader.
 # High IO on brokers that lose their high water mark then subsequently (on a 
 successful become follower transition) truncate their log to zero and start 
 catching up from the beginning.
 # If the offsets topic is affected, then offsets can get reset. This is 
 because during an offset load we don't read past the high water mark. So if a 
 water mark is missing then we don't load anything (even if the offsets are 
 there in the log).





Re: including KAFKA-1555 in 0.8.2?

2014-10-13 Thread Jun Rao
Great. Pushed to 0.8.2 too.

Jun

On Fri, Oct 10, 2014 at 2:52 PM, Joel Koshy jjkosh...@gmail.com wrote:

 +1

 On Fri, Oct 10, 2014 at 04:53:45PM +, Sriram Subramanian wrote:
  +1
 
  On 10/10/14 9:39 AM, Gwen Shapira gshap...@cloudera.com wrote:
 
  +1 :)
  
  On Fri, Oct 10, 2014 at 9:34 AM, Joe Stein joe.st...@stealth.ly
 wrote:
   +1
On Oct 10, 2014 12:08 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
  
   +1.
  
   On Thu, Oct 9, 2014 at 6:41 PM, Jun Rao jun...@gmail.com wrote:
  
Hi, Everyone,
   
I just committed KAFKA-1555 (min.isr support) to trunk. I felt that
  it's
probably useful to include it in the 0.8.2 release. Any objections?
   
Thanks,
   
Jun
   
  
 




[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-10-13 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170256#comment-14170256
 ] 

Jun Rao commented on KAFKA-1555:


Committed to 0.8.2 too.



Review Request 26666: Patch for KAFKA-1653

2014-10-13 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26666/
---

Review request for kafka.


Bugs: KAFKA-1653
https://issues.apache.org/jira/browse/KAFKA-1653


Repository: kafka


Description
---

KAFKA-1653 Disallow duplicate broker IDs in user input for admin commands. This 
covers a few cases besides the one identified in the bug. Aside from a major 
refactoring to use Sets for broker/replica lists, sanitizing user input seems 
to be the best solution here. I chose to generate errors instead of just using 
toSet since a duplicate entry may indicate that a different broker id was 
accidentally omitted.
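The sanitization described above can be sketched as follows (a hedged illustration with hypothetical names, not the actual patch): duplicates in a user-supplied broker list raise an error instead of being silently collapsed with toSet.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative duplicate check for a user-supplied replica list such as "1,2,2".
// Raising an error (rather than deduplicating) surfaces likely typos, e.g. a
// broker id that was repeated in place of the one the user actually meant.
public class ReplicaListCheck {
    public static List<Integer> parseBrokerList(String input) {
        List<Integer> ids = new ArrayList<>();
        Set<Integer> seen = new HashSet<>();
        for (String s : input.split(",")) {
            int id = Integer.parseInt(s.trim());
            if (!seen.add(id))
                throw new IllegalArgumentException(
                    "Duplicate broker id " + id + " in replica list: " + input);
            ids.add(id);
        }
        return ids;
    }
}
```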


Diffs
-

  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
691d69a49a240f38883d2025afaec26fd61281b5 
  core/src/main/scala/kafka/admin/TopicCommand.scala 
7672c5aab4fba8c23b1bb5cd4785c332d300a3fa 
  core/src/main/scala/kafka/tools/StateChangeLogMerger.scala 
d298e7e81acc7427c6cf4796b445966267ca54eb 

Diff: https://reviews.apache.org/r/26666/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Commented] (KAFKA-1653) Duplicate broker ids allowed in replica assignment

2014-10-13 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170264#comment-14170264
 ] 

Ewen Cheslack-Postava commented on KAFKA-1653:
--

Created reviewboard https://reviews.apache.org/r/26666/diff/
 against branch origin/trunk

 Duplicate broker ids allowed in replica assignment
 --

 Key: KAFKA-1653
 URL: https://issues.apache.org/jira/browse/KAFKA-1653
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
  Labels: newbie
 Attachments: KAFKA-1653.patch


 The reassign partitions command and the controller do not ensure that all 
 replicas for a partition are on different brokers. For example, you could set 
 1,2,2 as the list of brokers for the replicas.
 kafka-topics.sh --describe --under-replicated will list these partitions as 
 under-replicated, but I can't see a reason why the controller should allow 
 this state.





[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation

2014-10-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170265#comment-14170265
 ] 

Guozhang Wang commented on KAFKA-1634:
--

I will work on this after KAFKA-1583.

 Improve semantics of timestamp in OffsetCommitRequests and update 
 documentation
 ---

 Key: KAFKA-1634
 URL: https://issues.apache.org/jira/browse/KAFKA-1634
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Blocker
 Fix For: 0.8.2


 From the mailing list -
 following up on this -- I think the online API docs for OffsetCommitRequest
 still incorrectly refer to client-side timestamps:
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest
 Wasn't that removed and now always handled server-side now?  Would one of
 the devs mind updating the API spec wiki?





[jira] [Created] (KAFKA-1704) Add PartitionConfig besides LogConfig

2014-10-13 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-1704:


 Summary: Add PartitionConfig besides LogConfig
 Key: KAFKA-1704
 URL: https://issues.apache.org/jira/browse/KAFKA-1704
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0


Today we have only two places to store configs: server configs, which store 
server-side global configs, and log configs, which store the rest. However, 
many topic / partition level configs would be better stored in a partition 
config so that reading them does not require accessing the underlying logs, 
for example:

1. uncleanLeaderElectionEnable
2. minInSyncReplicas
3. compact [? this is defined per-topic / partition but maybe ok to store as 
log configs]





[jira] [Updated] (KAFKA-1697) remove code related to ack>1 on the broker

2014-10-13 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1697:
-
Description: 
We removed the ack>1 support from the producer client in KAFKA-1555. We can 
completely remove the code in the broker that supports ack>1.

Also, we probably want to make NotEnoughReplicasAfterAppend a non-retriable 
exception and let the client decide what to do.

  was:We removed the ack>1 support from the producer client in KAFKA-1555. We 
can completely remove the code in the broker that supports ack>1.


 remove code related to ack>1 on the broker
 --

 Key: KAFKA-1697
 URL: https://issues.apache.org/jira/browse/KAFKA-1697
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao
Assignee: Gwen Shapira
 Fix For: 0.8.3


 We removed the ack>1 support from the producer client in KAFKA-1555. We can 
 completely remove the code in the broker that supports ack>1.
 Also, we probably want to make NotEnoughReplicasAfterAppend a non-retriable 
 exception and let the client decide what to do.





[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-10-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170272#comment-14170272
 ] 

Guozhang Wang commented on KAFKA-1555:
--

Created KAFKA-1704 for partition configs.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, 
 KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch, 
 KAFKA-1555.8.patch, KAFKA-1555.9.patch


 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, 
 but no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these 
 requirements due to three of its behaviors:
 1. when choosing a new leader from 2 followers in the ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in the ISR when 
 it has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.





[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-13 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170275#comment-14170275
 ] 

Jun Rao commented on KAFKA-1481:


Vladimir,

Thanks for the patch. My suggestion is actually slightly different from what 
you did. Instead of using

kafka.consumer:type=ConsumerTopicMetrics,name=clientId=af_servers,AllTopics,BytesPerSec

I was suggesting

kafka.consumer:type=ConsumerTopicMetrics,clientId=af_servers,topic=AllTopics,name=BytesPerSec

This is probably the more standard mbean name.

We can do that by using the following method to create MetricName and pass in 
the mBeanName that we want.
public MetricName(String group, String type, String name, String scope, 
String mBeanName).

We also need to extend KafkaMetricsGroup by adding new helper functions that 
take a MetricName explicitly.
  def newMeter(name: MetricName, eventType: String, timeUnit: TimeUnit)

Also, your patch doesn't seem to apply to latest trunk.
git apply ~/Downloads/KAFKA-1481_2014-10-13_18-23-35.patch 
error: core/src/main/scala/kafka/common/ClientIdTopic.scala: No such file or 
directory
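As a sanity check on the suggested scheme, the standard javax.management API parses the proposed form into distinct, individually queryable key properties (the clientId/topic values below are taken from the example names above; this is an illustration, not Kafka code):

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Shows that the proposed mbean name is a well-formed ObjectName whose
// clientId, topic, and name are separate key properties, unlike the
// dash/underscore-delimited form that must be parsed by hand.
public class MBeanNameDemo {
    public static ObjectName proposed() {
        try {
            return new ObjectName(
                "kafka.consumer:type=ConsumerTopicMetrics,clientId=af_servers,topic=AllTopics,name=BytesPerSec");
        } catch (MalformedObjectNameException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        ObjectName n = proposed();
        System.out.println(n.getKeyProperty("clientId")); // af_servers
        System.out.println(n.getKeyProperty("topic"));    // AllTopics
        System.out.println(n.getKeyProperty("name"));     // BytesPerSec
    }
}
```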





 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.2

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and these are embedded in MBeans names making it impossible to parse out 
 individual bits from MBeans.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW





Review Request 26667: Patch for KAFKA-1698

2014-10-13 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26667/
---

Review request for kafka.


Bugs: KAFKA-1698
https://issues.apache.org/jira/browse/KAFKA-1698


Repository: kafka


Description
---

KAFKA-1698 Validate parsed ConfigDef values in addition to the default values.
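A minimal sketch of the idea (illustrative names, not the real ConfigDef API): run each key's validator against the parsed user-supplied value, not only against the declared default.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical mini version of a config definition that validates both the
// default (at definition time) and the parsed value (at parse time).
public class MiniConfigDef {
    static class Key {
        final Object def;
        final Consumer<Object> validator;
        Key(Object def, Consumer<Object> validator) { this.def = def; this.validator = validator; }
    }

    private final Map<String, Key> keys = new HashMap<>();

    public void define(String name, Object def, Consumer<Object> validator) {
        validator.accept(def);              // validate the default value
        keys.put(name, new Key(def, validator));
    }

    public Map<String, Object> parse(Map<String, Object> props) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, Key> e : keys.entrySet()) {
            Object v = props.getOrDefault(e.getKey(), e.getValue().def);
            e.getValue().validator.accept(v); // the fix: validate parsed values too
            out.put(e.getKey(), v);
        }
        return out;
    }
}
```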


Diffs
-

  clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
227309e8c62f9fc435722f28f2deff4a48e30853 
  clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java 
09a82feeb7cae95209e54d3554224915a1498ebd 

Diff: https://reviews.apache.org/r/26667/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Updated] (KAFKA-1698) Validator.ensureValid() only validates default config value

2014-10-13 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1698:
-
Assignee: Ewen Cheslack-Postava
  Status: Patch Available  (was: Open)

 Validator.ensureValid() only validates default config value
 ---

 Key: KAFKA-1698
 URL: https://issues.apache.org/jira/browse/KAFKA-1698
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jun Rao
Assignee: Ewen Cheslack-Postava
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1698.patch


 We should use it to validate the actual configured value.





[jira] [Updated] (KAFKA-1698) Validator.ensureValid() only validates default config value

2014-10-13 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1698:
-
Attachment: KAFKA-1698.patch

 Validator.ensureValid() only validates default config value
 ---

 Key: KAFKA-1698
 URL: https://issues.apache.org/jira/browse/KAFKA-1698
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jun Rao
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1698.patch


 We should use it to validate the actual configured value.





[jira] [Commented] (KAFKA-1698) Validator.ensureValid() only validates default config value

2014-10-13 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170280#comment-14170280
 ] 

Ewen Cheslack-Postava commented on KAFKA-1698:
--

Created reviewboard https://reviews.apache.org/r/26667/diff/
 against branch origin/trunk

 Validator.ensureValid() only validates default config value
 ---

 Key: KAFKA-1698
 URL: https://issues.apache.org/jira/browse/KAFKA-1698
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jun Rao
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1698.patch


 We should use it to validate the actual configured value.





Re: [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-13 Thread Bhavesh Mistry
Hi Kafka Dev Team,

Let me know whether the new producer supports sending data to a single
partition (without blocking).  We have a use-case for aggregating events.


Thanks,

Bhavesh

On Mon, Oct 13, 2014 at 10:54 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com
 wrote:

 Hi Kafka Dev Team,

 When I run a test that sends messages to a single partition for about 3
 minutes, I encounter a deadlock (please see the attached screenshot) and
 thread contention in YourKit profiling.

 Use Case:

 1)  Aggregating messages into the same partition for metric counting.
 2)  Replicating the old producer's behavior of sticking to a partition for
 3 minutes.


 Here is output:

 Frozen threads found (potential deadlock)

 It seems that the following threads have not changed their stack for more
 than 10 seconds.
 These threads are possibly (but not necessarily!) in a deadlock or hung.

 pool-1-thread-128 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
 byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord,
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run()
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker)
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run()
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744



 pool-1-thread-159 --- Frozen for at least 2m 1 sec
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
 byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord,
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run()
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker)
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run()
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744



 pool-1-thread-55 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
 byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord,
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run()
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker)
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run()
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
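The frozen stacks above show producer threads parked inside RecordAccumulator.append() once the bounded buffer fills faster than it drains. A self-contained sketch of the underlying pattern (not Kafka code; names are illustrative), plus a timeout-based alternative that fails fast instead of hanging indefinitely:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// A bounded buffer that is filled faster than it drains makes every producer
// thread block inside the equivalent of append(). offer() with a timeout
// gives the caller a chance to back off instead of freezing.
public class BoundedBufferDemo {
    public static boolean sendNonBlocking(BlockingQueue<byte[]> buffer, byte[] msg, long timeoutMs) {
        try {
            // Returns false after the timeout instead of blocking forever.
            return buffer.offer(msg, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(2);
        buffer.offer(new byte[8]);
        buffer.offer(new byte[8]);          // buffer is now full; a blocking put() would hang
        boolean accepted = sendNonBlocking(buffer, new byte[8], 50);
        System.out.println("accepted: " + accepted); // false: timed out instead of freezing
    }
}
```

This only illustrates the blocking behavior in the report; whether the producer should expose such a timeout is exactly the design question being raised.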







Re: Review Request 24676: Rebase KAFKA-1583

2014-10-13 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Oct. 14, 2014, 2:42 a.m.)


Review request for kafka.


Summary (updated)
-

Rebase KAFKA-1583


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor 
changes


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 
8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala 
a123cdc52f341a802b3e4bfeb29a6154332e5f73 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
67f2833804cb15976680e42b9dc49e275c89d266 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



[jira] [Commented] (KAFKA-1583) Kafka API Refactoring

2014-10-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170405#comment-14170405
 ] 

Guozhang Wang commented on KAFKA-1583:
--

Updated reviewboard https://reviews.apache.org/r/24676/diff/
 against branch origin/trunk

 Kafka API Refactoring
 -

 Key: KAFKA-1583
 URL: https://issues.apache.org/jira/browse/KAFKA-1583
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, 
 KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, 
 KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, 
 KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, 
 KAFKA-1583_2014-10-13_19:41:58.patch


 This is the next step of KAFKA-1430. Details can be found at this page:
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring





[jira] [Updated] (KAFKA-1583) Kafka API Refactoring

2014-10-13 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1583:
-
Attachment: KAFKA-1583_2014-10-13_19:41:58.patch

 Kafka API Refactoring
 -

 Key: KAFKA-1583
 URL: https://issues.apache.org/jira/browse/KAFKA-1583
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, 
 KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, 
 KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, 
 KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, 
 KAFKA-1583_2014-10-13_19:41:58.patch


 This is the next step of KAFKA-1430. Details can be found at this page:
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring





Re: [DISCUSSION] Message Metadata

2014-10-13 Thread Guozhang Wang
Hi Jay,

Thanks for the comments. Replied inline.

Guozhang

On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps jay.kr...@gmail.com wrote:

 I need to take more time to think about this. Here are a few off-the-cuff
 remarks:

 - To date we have tried really, really hard to keep the data model for
 messages simple since, after all, you can always add whatever you like inside
 the message body.

 - For system tags, why not just make these fields first-class fields in the
 message? Why have a bunch of key-value pairs versus first-class fields?


Yes, we can alternatively make the system tags first-class fields in the
message header to make the format / processing logic simpler.

The main reasons I put them in as system tags are: 1) of the possible system
tags, some apply to all types of messages (e.g. timestamps), while others
apply only to a specific type of message (compressed, control message), and
not all of them are necessarily required all the time, so encoding them as
compact tags may save some space when only a few are present; 2) with tags we
do not need to bump the protocol version every time we change them, which
would mean keeping the logic to handle all versions on the broker until the
old ones are officially discarded; instead, the broker can simply ignore a
tag whose id it does not recognize (the client is on a newer version), or use
a default value / throw an exception if a required tag is missing (the client
is on an older version).



 - You don't necessarily need application-level tags explicitly represented
 in the message format for efficiency. The application can define their own
 header (e.g. their message could be a size delimited header followed by a
 size delimited body). But actually if you use Avro you don't even need this
 I don't think. Avro has the ability to just deserialize the header fields
 in your message. Avro has a notion of reader and writer schemas. The writer
 schema is whatever the message was written with. If the reader schema is
 just the header, avro will skip any fields it doesn't need and just
 deserialize the fields it does need. This is actually a much more usable
 and flexible way to define a header since you get all the types avro allows
 instead of just bytes.


I agree that we can use a reader schema to read out just the header without
de-serializing the full message, and for compressed messages we can probably
add an Avro (or similar) header to the compressed wrapper message as well.
But that would force these applications (MM, auditor, clients) to be
schema-aware, which usually requires the people who manage this data pipeline
to also manage the schemas, whereas ideally Kafka itself should just consider
bytes-in and bytes-out (and maybe a little bit more, like timestamps). The
purpose here is to avoid introducing an extra dependency while still allowing
applications to do some simple processing based on metadata only, without
fully de-serializing / de-compressing the message.



 - We will need to think carefully about what to do with timestamps if we
 end up including them. There are actually several timestamps
   - The time the producer created the message
   - The time the leader received the message
   - The time the current broker received the message
 The producer timestamps won't be at all increasing. The leader timestamp
 will be mostly increasing except when the clock changes or leadership
 moves. This somewhat complicates the use of these timestamps, though. From
 the point of view of the producer the only time that matters is the time
 the message was created. However since the producer sets this it can be
 arbitrarily bad (remember all the ntp issues and 1970 timestamps we would
 get). Say that the heuristic was to use the timestamp of the first message
 in a file for retention, the problem would be that the timestamps for the
 segments need not even be sequential and a single bad producer could send
 data with time in the distant past or future causing data to be deleted or
 retained forever. Using the broker timestamp at write time is better,
 though obviously that would be overwritten when data is mirrored between
 clusters (the mirror would then have a different time--and if the mirroring
 ever stopped that gap could be large). One approach would be to use the
 client timestamp but have the broker overwrite it if it is too bad (e.g.
 off by more than a minute, say).
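Jay's clamping heuristic ("have the broker overwrite it if it is too bad") can be sketched as follows (illustrative, not Kafka code; the one-minute tolerance is taken from the example in the text):

```java
// Accept the client timestamp unless it deviates from the broker clock by
// more than a tolerance; otherwise the broker's own time wins. This bounds
// the damage a misconfigured producer (ntp drift, 1970 timestamps) can do
// to time-based retention.
public class TimestampClamp {
    public static final long MAX_SKEW_MS = 60_000; // "off by more than a minute, say"

    public static long effectiveTimestamp(long clientTs, long brokerNowMs) {
        return Math.abs(clientTs - brokerNowMs) > MAX_SKEW_MS ? brokerNowMs : clientTs;
    }

    public static void main(String[] args) {
        long now = 1_000_000_000L;
        System.out.println(effectiveTimestamp(now - 5_000, now)); // within tolerance: kept
        System.out.println(effectiveTimestamp(0L, now));          // 1970-style value: overwritten
    }
}
```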


We would need the reception timestamp (i.e. the third one) for log
cleaning, and as for the first / second ones, I originally put them as app
tags since they are likely to be used not by the brokers itself (e.g.
auditor, etc).



 -Jay

 On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy jjkosh...@gmail.com wrote:

  Thanks Guozhang! This is an excellent write-up and the approach nicely
  consolidates a number of long-standing issues. It would be great if
  everyone can review this carefully 

[jira] [Updated] (KAFKA-1703) The bat script failed to start on windows

2014-10-13 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1703:
-
Labels: newbie  (was: )

 The bat script failed to start on windows
 -

 Key: KAFKA-1703
 URL: https://issues.apache.org/jira/browse/KAFKA-1703
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.1.1
Reporter: ChengRen
  Labels: newbie

 The bat scripts in bin\windows cannot start ZooKeeper and Kafka correctly 
 (on a freshly installed OS with only the JDK ready). I modified 
 kafka-run-class.bat to add the jars in the libs folder to the classpath.
 for %%i in (%BASE_DIR%\core\lib\*.jar) do (
   call :concat %%i
 )
  added  begin
 for %%i in (%BASE_DIR%\..\libs\*.jar) do (
   call :concat %%i
 )
  added  end
 for %%i in (%BASE_DIR%\perf\target\scala-2.8.0/kafka*.jar) do (
   call :concat %%i
 )
 Now it runs correctly.
 Under bin\windows:
 zookeeper-server-start.bat ..\..\config\zookeeper.properties
kafka-server-start.bat ..\..\config\kafka.properties





Re: Review Request 26663: Patch for KAFKA-979

2014-10-13 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26663/#review56490
---


Looks good. Just one minor comment. Could you get rid of this build warning?

/Users/nnarkhed/Projects/kafka/core/src/main/scala/kafka/log/LogConfig.scala:103:
 object Math is deprecated: use the scala.math package object instead.
(Example package object usage: scala.math.Pi )
if (segmentJitterMs == 0) 0 else scala.util.Random.nextLong() % 
Math.min(segmentJitterMs, segmentMs)
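For reference, a self-contained Java rendering of the jitter expression in the warning (illustrative only; the actual fix is simply to use the scala.math package object instead of the deprecated Math object):

```java
import java.util.Random;

// Mirrors the Scala expression flagged above: a random offset bounded by
// both segmentJitterMs and segmentMs, zero when jitter is disabled. Note
// nextLong() % n can be negative, so the result lies in (-bound, bound).
public class SegmentJitter {
    public static long jitter(long segmentJitterMs, long segmentMs, Random rnd) {
        if (segmentJitterMs == 0) return 0;
        return rnd.nextLong() % Math.min(segmentJitterMs, segmentMs);
    }

    public static void main(String[] args) {
        System.out.println(jitter(0, 100, new Random()));  // 0: jitter disabled
        System.out.println(jitter(50, 100, new Random())); // some value in (-50, 50)
    }
}
```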

- Neha Narkhede


On Oct. 13, 2014, 11:16 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26663/
 ---
 
 (Updated Oct. 13, 2014, 11:16 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-979
 https://issues.apache.org/jira/browse/KAFKA-979
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-979 Add optional random jitter for time based log rolling.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/log/Log.scala 
 a123cdc52f341a802b3e4bfeb29a6154332e5f73 
   core/src/main/scala/kafka/log/LogCleaner.scala 
 c20de4ad4734c0bd83c5954fdb29464a27b91dff 
   core/src/main/scala/kafka/log/LogConfig.scala 
 d2cc9e3d6b7a4fd24516d164eb3673e6ce052129 
   core/src/main/scala/kafka/log/LogSegment.scala 
 7597d309f37a0b3756381f9500100ef763d466ba 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 7fcbc16da898623b03659c803e2a20c7d1bd1011 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 3e9e91f2b456bbdeb3055d571e18ffea8675b4bf 
   core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
 7b97e6a80753a770ac094e101c653193dec67e68 
   core/src/test/scala/unit/kafka/log/LogTest.scala 
 a0cbd3bbbeeabae12caa6b41aec31a8f5dfd034b 
 
 Diff: https://reviews.apache.org/r/26663/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




[jira] [Updated] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation

2014-10-13 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1634:
-
Assignee: Guozhang Wang  (was: Joel Koshy)

 Improve semantics of timestamp in OffsetCommitRequests and update 
 documentation
 ---

 Key: KAFKA-1634
 URL: https://issues.apache.org/jira/browse/KAFKA-1634
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Guozhang Wang
Priority: Blocker
 Fix For: 0.8.2


 From the mailing list -
 following up on this -- I think the online API docs for OffsetCommitRequest
 still incorrectly refer to client-side timestamps:
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest
 Wasn't that removed and now always handled server-side now?  Would one of
 the devs mind updating the API spec wiki?


