[jira] [Updated] (KAFKA-965) merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk

2013-07-09 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-965:
--

Attachment: kafka-965.patch

Attaching a patch. The following files have to be merged manually.

# Unmerged paths:
#   (use "git add/rm <file>..." as appropriate to mark resolution)
#
#   both modified:  core/src/main/scala/kafka/admin/AdminUtils.scala
#   both modified:  core/src/main/scala/kafka/api/RequestKeys.scala
#   both modified:  core/src/main/scala/kafka/log/Log.scala
#   both modified:  core/src/main/scala/kafka/log/LogSegment.scala
#   deleted by us:  core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
#   both modified:  core/src/main/scala/kafka/server/KafkaApis.scala
#   both modified:  core/src/main/scala/kafka/server/KafkaConfig.scala
#   both modified:  core/src/main/scala/kafka/server/KafkaServer.scala
#   both modified:  core/src/main/scala/kafka/server/ReplicaManager.scala
#   both modified:  core/src/main/scala/kafka/utils/ZkUtils.scala
#   both modified:  core/src/test/scala/unit/kafka/admin/AdminTest.scala
#   both modified:  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
#   both modified:  core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
#   both modified:  core/src/test/scala/unit/kafka/producer/ProducerTest.scala
#   both modified:  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
#   both modified:  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
#   both modified:  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
#
# Changed but not updated:
#   (use "git add <file>..." to update what will be committed)
#   (use "git checkout -- <file>..." to discard changes in working directory)
#
#   modified:   core/src/main/scala/kafka/server/OffsetCheckpoint.scala


 merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk
 

 Key: KAFKA-965
 URL: https://issues.apache.org/jira/browse/KAFKA-965
 Project: Kafka
  Issue Type: Task
  Components: core
Affects Versions: 0.8.1
Reporter: Jun Rao
Assignee: Jun Rao
 Attachments: kafka-965.patch




--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-965) merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk

2013-07-09 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-965:
--

Status: Patch Available  (was: In Progress)






Re: beta2 = Maven Repository Signed and Staged

2013-07-09 Thread Joe Stein
I agree it is very low touch, since it is only a change when building for
publishing and does not affect the jar. I don't have objections, and unless
anyone else does, I will publish and see whether we get more errors/issues,
or 0.8.0-beta1 will be available from Maven. I should be able to do this
later tonight or in the morning unless someone has an issue with proceeding.


On Sun, Jul 7, 2013 at 5:40 PM, Jay Kreps jay.kr...@gmail.com wrote:

 That's awesome!

 Personally I think it is okay if there is a slight cosmetic difference as
 long as we fix it before the proper 0.8 release.

 -Jay


 On Sun, Jul 7, 2013 at 6:55 AM, Joe Stein crypt...@gmail.com wrote:

  So, I think we have everything now for a successful publish to the Maven
  repo for the 0.8.0-betaX Kafka build on Scala 2.8.0, 2.8.2, 2.9.1, and 2.9.2.
 
  Since I had to make a slight code change
  (https://issues.apache.org/jira/browse/KAFKA-963), I am not sure it is
  proper to officially publish the artifacts, since, technically speaking,
  they are not what we voted on.
 
  I don't know for sure whether, when I hit close on the staging release to
  promote it to a public release, any other errors will come up through the
  process.
 
  My thinking is that we should roll a beta2 release and give the publish to
  public release another go.
 
  Thoughts?
 
  /*
  Joe Stein
  http://www.linkedin.com/in/charmalloc
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  */
 




-- 

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
*/


[jira] [Commented] (KAFKA-966) Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message

2013-07-09 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13703755#comment-13703755
 ] 

Joel Koshy commented on KAFKA-966:
--

One way to accomplish this is to turn off autocommit and checkpoint offsets 
only after a message (or batch of messages) has been written to the DB.

One caveat, though, is that rebalances (e.g., if a new consumer instance shows 
up) will also cause offsets to be committed, so there would be an issue if the 
DB is unavailable, a rebalance occurs at the same time, and there are 
unprocessed messages that have already been pulled out of the iterator.
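The commit-after-write pattern can be sketched without any Kafka dependency. The class below is a hypothetical stand-in (names like CommitAfterWrite and committedOffset are invented for illustration); the real 0.8 high-level consumer would set auto.commit.enable=false and call ConsumerConnector.commitOffsets() after a successful DB write.

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free sketch of "commit offsets only after the DB write succeeds".
public class CommitAfterWrite {
    // Stand-in for the offset checkpoint (ZooKeeper in Kafka 0.8).
    static long committedOffset = 0;

    // Consumes messages from the committed offset onward; advances the
    // checkpoint only after each simulated DB write has succeeded.
    static List<String> run(List<String> partition, boolean dbAvailable) {
        List<String> db = new ArrayList<String>();
        for (long off = committedOffset; off < partition.size(); off++) {
            if (!dbAvailable) {
                break;                          // write would fail: stop WITHOUT committing
            }
            db.add(partition.get((int) off));   // the "DB write"
            committedOffset = off + 1;          // checkpoint after the write
        }
        return db;
    }

    public static void main(String[] args) {
        List<String> partition = new ArrayList<String>();
        partition.add("m0");
        partition.add("m1");
        run(partition, true);
        System.out.println(committedOffset);    // advanced only past written messages
    }
}
```

On restart the consumer resumes from the committed offset, so a message whose write failed is re-delivered. This gives at-least-once delivery, not the transactional behavior the ticket asks for.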


 Allow high level consumer to 'nak' a message and force Kafka to close the 
 KafkaStream without losing that message
 -

 Key: KAFKA-966
 URL: https://issues.apache.org/jira/browse/KAFKA-966
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Affects Versions: 0.8
Reporter: Chris Curtin
Assignee: Neha Narkhede
Priority: Minor

 Enhancement request.
 The high level consumer is very close to handling a lot of situations a 
 'typical' client would need, except for when the message received from Kafka 
 is valid but the business logic that wants to consume it has a problem.
 For example, suppose I want to write the value to a MongoDB or Cassandra 
 database and the database is not available. I won't know until I go to do the 
 write that the database isn't available, but by then it is too late to NOT 
 read the message from Kafka. Thus if I call shutdown() to stop reading, that 
 message is lost, since the offset Kafka writes to ZooKeeper is the next offset.
 Ideally I'd like to be able to tell Kafka: close the KafkaStream but set the 
 next offset to read for this partition to this message when I start up again. 
 And if there are any messages in the BlockingQueue for other partitions, find 
 the lowest # and use it for that partition's offset, since I haven't consumed 
 them yet.
 Thus I can cleanly shut down my processing, resolve whatever the issue is, and 
 restart the process.
 Another idea might be to allow a 'peek' into the next message and, if I 
 succeed in writing to the database, call 'next' to remove it from the queue. 
 I understand this won't deal with a 'kill -9' or hard failure of the JVM 
 leading to the latest offsets not being written to ZooKeeper, but it addresses 
 a likely common scenario for consumers. Nor will it add true transactional 
 support, since the ZK update could fail.



[jira] [Updated] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-07-09 Thread John Fung (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Fung updated KAFKA-915:


Status: Open  (was: Patch Available)

 System Test - Mirror Maker testcase_5001 failed
 ---

 Key: KAFKA-915
 URL: https://issues.apache.org/jira/browse/KAFKA-915
 Project: Kafka
  Issue Type: Bug
Reporter: John Fung
Assignee: Joel Koshy
Priority: Critical
  Labels: kafka-0.8, replication-testing
 Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz


 This case passes if brokers are set to partition = 1, replicas = 1.
 It fails if brokers are set to partition = 5, replicas = 3 (consistently 
 reproducible).
 This test case is set up as shown below.
 1. Start 2 ZK as a cluster in Source
 2. Start 2 ZK as a cluster in Target
 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
 5. Start 1 MM
 6. Start ProducerPerformance to send some data
 7. After Producer is done, start ConsoleConsumer to consume data
 8. Stop all processes and validate whether there is any data loss
 9. No failure is introduced to any process in this test
 Attached is a tar file which contains the logs and system test output for both 
 cases.



[jira] [Commented] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-07-09 Thread John Fung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13703827#comment-13703827
 ] 

John Fung commented on KAFKA-915:
-

Hi Joel,

After applying kafka-915-v1.patch (which creates the topic manually before 
starting mirror maker), testcase_5001 passes. However, testcase_5003 and 
testcase_5005 are failing due to data loss.

Thanks,
John




Re: having problem with 0.8 gzip compression

2013-07-09 Thread Scott Wang
Another piece of information: the snappy compression also does not work.

Thanks,
Scott


On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang 
scott.w...@rumbleentertainment.com wrote:

 I just tried it and it is still not showing up; thanks for looking into this.

 Thanks,
 Scott


 On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao jun...@gmail.com wrote:

 Could you try starting the consumer first (and enable gzip in the
 producer)?

 Thanks,

 Jun


 On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang 
 scott.w...@rumbleentertainment.com wrote:

  No, I did not start the consumer before the producer.  I actually started
  the producer first, and nothing showed up in the consumer unless I commented
  out this line -- props.put("compression.codec", "gzip").  If I commented
  out the compression codec, everything just works.
 
 
  On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao jun...@gmail.com wrote:
 
   Did you start the consumer before the producer? By default, the consumer
   gets only the new data.
  
   Thanks,
  
   Jun
  
  
   On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang 
   scott.w...@rumbleentertainment.com wrote:
  
    I am testing with Kafka 0.8 beta and having problems receiving messages
    in the consumer.  There is no error, so does anyone have any insights?
    When I commented out the compression.codec, everything works fine.
   
    My producer:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class TestKafka08Prod {

        public static void main(String[] args) {
            Producer<Integer, String> producer = null;
            try {
                Properties props = new Properties();
                props.put("metadata.broker.list", "localhost:9092");
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                props.put("producer.type", "sync");
                props.put("request.required.acks", "1");
                props.put("compression.codec", "gzip");
                ProducerConfig config = new ProducerConfig(props);
                producer = new Producer<Integer, String>(config);
                for (int i = 0; i < 10; i++) {
                    KeyedMessage<Integer, String> data =
                        new KeyedMessage<Integer, String>("test-topic",
                            "test-message: " + i + " " + System.currentTimeMillis());
                    producer.send(data);
                }
            } catch (Exception e) {
                System.out.println("Error happened: ");
                e.printStackTrace();
            } finally {
                // was "if (null != null)", which never closed the producer
                if (producer != null) {
                    producer.close();
                }
                System.out.println("End of Sending");
            }
            System.exit(0);
        }
    }
   
   
    My consumer:

    import java.net.SocketException;
    import java.net.UnknownHostException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class TestKafka08Consumer {
        public static void main(String[] args) throws UnknownHostException,
                SocketException {

            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
            props.put("group.id", "test08ConsumerId");
            props.put("zk.sessiontimeout.ms", "4000");
            props.put("zk.synctime.ms", "2000");
            props.put("autocommit.interval.ms", "1000");

            ConsumerConfig consumerConfig = new ConsumerConfig(props);

            ConsumerConnector consumerConnector =
                kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

            String topic = "test-topic";
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            while (it.hasNext()) {
                try {
                    String fromPlatform = new String(it.next().message());
                    System.out.println("The messages: " + fromPlatform);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println("SystemOut");
        }
    }
   
   
Thanks
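One way to narrow this down, independent of Kafka, is to confirm that the payload itself survives a gzip round trip using the JDK's own streams. If this check passes but the consumer still sees nothing, the loss is in the 0.8 beta's handling of compressed message sets rather than in gzip. A minimal sketch (the class name is hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTrip {
    // Compresses data with the same gzip format Kafka's gzip codec uses.
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gz = new GZIPOutputStream(bos);
        gz.write(data);
        gz.close();   // flushes the gzip trailer
        return bos.toByteArray();
    }

    // Decompresses a gzip byte array back to the original payload.
    static byte[] gunzip(byte[] data) throws IOException {
        GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data));
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = gz.read(buf)) != -1) {
            bos.write(buf, 0, n);
        }
        gz.close();
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] msg = ("test-message: " + System.currentTimeMillis()).getBytes();
        byte[] back = gunzip(gzip(msg));
        System.out.println(java.util.Arrays.equals(msg, back));  // prints "true"
    }
}
```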
   
  
 





[jira] [Created] (KAFKA-967) Use key range in ProducerPerformance

2013-07-09 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-967:
---

 Summary: Use key range in ProducerPerformance
 Key: KAFKA-967
 URL: https://issues.apache.org/jira/browse/KAFKA-967
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang
Assignee: Guozhang Wang


Currently in ProducerPerformance, the key of the message is set to MessageID. 
It would be better to set it to a specific key within a key range (Integer type) 
so that we can test the semantic partitioning case. This is related to 
KAFKA-957.



[jira] [Updated] (KAFKA-967) Use key range in ProducerPerformance

2013-07-09 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-967:


Attachment: KAFKA-967.v1.patch

1. Add messageKeyRange to ProducerPerfConfig

2. In generateProducerData, select the key in a round robin manner within the 
key range.

3. Set the key in generateMessageWithSeqId and add the key value to the 
payload. 
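As a rough illustration of point 2, a round-robin selector over a fixed key range might look like the sketch below. The class and method names are hypothetical, not the ones in KAFKA-967.v1.patch.

```java
// Round-robin key selection within [0, keyRange): every keyRange-th message
// gets the same key, so repeated keys exercise semantic partitioning.
public class KeyRangeSelector {
    private final int keyRange;
    private long counter = 0;

    public KeyRangeSelector(int keyRange) {
        this.keyRange = keyRange;
    }

    // Returns keys 0, 1, ..., keyRange-1, then wraps back to 0.
    public int nextKey() {
        return (int) (counter++ % keyRange);
    }

    public static void main(String[] args) {
        KeyRangeSelector s = new KeyRangeSelector(3);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            sb.append(s.nextKey()).append(' ');
        }
        System.out.println(sb.toString().trim());  // prints "0 1 2 0 1 2 0"
    }
}
```

Messages produced with equal keys land on the same partition under a hash partitioner, which is what makes the semantic-partitioning case testable.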

 Use key range in ProducerPerformance
 

 Key: KAFKA-967
 URL: https://issues.apache.org/jira/browse/KAFKA-967
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Attachments: KAFKA-967.v1.patch


 Currently in ProducerPerformance, the key of the message is set to MessageID. 
 It would be better to set it to a specific key within a key range (Integer type) 
 so that we can test the semantic partitioning case. This is related to 
 KAFKA-957.



[jira] [Issue Comment Deleted] (KAFKA-967) Use key range in ProducerPerformance

2013-07-09 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-967:


Comment: was deleted

(was: 1. Add messageKeyRange to ProducerPerfConfig

2. In generateProducerData, select the key in a round robin manner within the 
key range.

3. Set the key in generateMessageWithSeqId and add the key value to the 
payload.)




[jira] [Updated] (KAFKA-967) Use key range in ProducerPerformance

2013-07-09 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-967:


Status: Patch Available  (was: Open)

1. Add messageKeyRange to ProducerPerfConfig

2. In generateProducerData, select the key in a round robin manner within the 
key range.

3. Set the key in generateMessageWithSeqId and add the key value to the payload.

