Also, I would recommend rebasing the 0.8 branch to the latest trunk now. It has been quite a while since the last time I did a rebase.
Joe, would you mind helping with that ? Thanks, Neha On Tue, Mar 6, 2012 at 9:15 AM, Jun Rao <jun...@gmail.com> wrote: > Joe, > > 0.8 now has compilation error after this check in. It seems that you forgot > to check in a new file. See my comment in kafka-240. Could you fix this? > > Thanks, > > Jun > > On Fri, Mar 2, 2012 at 9:46 PM, <joest...@apache.org> wrote: > >> Author: joestein >> Date: Sat Mar 3 05:46:43 2012 >> New Revision: 1296577 >> >> URL: http://svn.apache.org/viewvc?rev=1296577&view=rev >> Log: >> KAFKA-240 ProducerRequest wire format protocol update and related changes >> >> Removed: >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiProducerRequest.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala >> Modified: >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala >> >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala >> >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala >> >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala >> >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala >> >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala >> Sat Mar 3 05:46:43 2012 >> @@ -39,6 +39,15 @@ object PartitionData { >> >> case class PartitionData(partition: Int, error: Int = >> ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) { >> val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() >> + >> + def this(partition: Int, messages: MessageSet) = this(partition, >> ErrorMapping.NoError, 0L, messages) >> + >> + def getTranslatedPartition(topic: String, randomSelector: String => >> Int): Int = { >> + if (partition == ProducerRequest.RandomPartition) >> + return randomSelector(topic) >> + else >> + return partition >> + } >> } >> >> object TopicData { >> @@ -73,6 +82,15 @@ object TopicData { >> >> case class TopicData(topic: String, partitionData: Array[PartitionData]) { >> val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + >> _.sizeInBytes) >> + >> + override def equals(other: Any): Boolean = { >> + other match { >> + case that: TopicData => >> + ( topic == that.topic && >> + partitionData.toSeq == that.partitionData.toSeq ) >> + case _ => false >> + } >> + } >> } >> >> object FetchResponse { >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala >> Sat Mar 3 05:46:43 2012 >> @@ -24,60 +24,108 @@ import kafka.utils._ >> >> object ProducerRequest { >> val RandomPartition = -1 >> - >> + val versionId: Short = 0 >> + >> def readFrom(buffer: ByteBuffer): ProducerRequest = { >> - val topic = Utils.readShortString(buffer, "UTF-8") >> - val partition = buffer.getInt >> - val messageSetSize = buffer.getInt >> - val messageSetBuffer = buffer.slice() >> - messageSetBuffer.limit(messageSetSize) >> - buffer.position(buffer.position + messageSetSize) >> - new ProducerRequest(topic, partition, new >> ByteBufferMessageSet(messageSetBuffer)) >> + val versionId: Short = buffer.getShort >> + val correlationId: Int = buffer.getInt >> + val clientId: String = Utils.readShortString(buffer, "UTF-8") >> + val requiredAcks: Short = buffer.getShort >> + val ackTimeout: Int = buffer.getInt >> + //build the topic structure >> + val topicCount = buffer.getInt >> + val data = new Array[TopicData](topicCount) >> + for(i <- 0 until topicCount) { >> + val topic = Utils.readShortString(buffer, "UTF-8") >> + >> + val partitionCount = buffer.getInt >> + //build the partition structure within this topic >> + val partitionData = new Array[PartitionData](partitionCount) >> + for (j <- 0 until partitionCount) { >> + val partition = buffer.getInt >> + val messageSetSize = buffer.getInt >> + val messageSetBuffer = new Array[Byte](messageSetSize) >> + buffer.get(messageSetBuffer,0,messageSetSize) >> + partitionData(j) = new PartitionData(partition,new >> ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))) >> + } >> + data(i) = new TopicData(topic,partitionData) >> + } >> + new ProducerRequest(versionId, correlationId, clientId, requiredAcks, >> ackTimeout, data) >> } >> } >> >> -class ProducerRequest(val topic: String, >> - val partition: Int, >> - val messages: ByteBufferMessageSet) extends >> Request(RequestKeys.Produce) { >> +case class ProducerRequest(val versionId: Short, val correlationId: Int, >> + val clientId: String, >> + val requiredAcks: Short, >> + val ackTimeout: Int, >> + val data: Array[TopicData]) extends >> Request(RequestKeys.Produce) { >> + >> + def this(correlationId: Int, clientId: String, requiredAcks: Short, >> ackTimeout: Int, data: Array[TopicData]) = this(ProducerRequest.versionId, >> correlationId, clientId, requiredAcks, ackTimeout, data) >> >> def writeTo(buffer: ByteBuffer) { >> - Utils.writeShortString(buffer, topic) >> - buffer.putInt(partition) >> - buffer.putInt(messages.serialized.limit) >> - buffer.put(messages.serialized) >> - messages.serialized.rewind >> + buffer.putShort(versionId) >> + buffer.putInt(correlationId) >> + Utils.writeShortString(buffer, clientId, "UTF-8") >> + buffer.putShort(requiredAcks) >> + buffer.putInt(ackTimeout) >> + //save the topic structure >> + buffer.putInt(data.size) //the number of topics >> + data.foreach(d =>{ >> + Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic >> + buffer.putInt(d.partitionData.size) //the number of partitions >> + d.partitionData.foreach(p => { >> + buffer.putInt(p.partition) >> + buffer.putInt(p.messages.getSerialized().limit) >> + buffer.put(p.messages.getSerialized()) >> + p.messages.getSerialized().rewind >> + }) >> + }) >> } >> - >> - def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + >> messages.sizeInBytes.asInstanceOf[Int] >> >> - def getTranslatedPartition(randomSelector: String => Int): Int = { >> - if (partition == ProducerRequest.RandomPartition) >> - return randomSelector(topic) >> - else >> - return partition >> + def sizeInBytes(): Int = { >> + var size = 0 >> + //size, request_type_id, version_id, correlation_id, client_id, >> required_acks, ack_timeout, data.size >> + size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4; >> + data.foreach(d =>{ >> + size += 2 + d.topic.length + 4 >> + d.partitionData.foreach(p => { >> + size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int] >> + }) >> + }) >> + size >> } >> >> override def toString: String = { >> val builder = new StringBuilder() >> builder.append("ProducerRequest(") >> - builder.append(topic + ",") >> - builder.append(partition + ",") >> - builder.append(messages.sizeInBytes) >> + builder.append(versionId + ",") >> + builder.append(correlationId + ",") >> + builder.append(clientId + ",") >> + builder.append(requiredAcks + ",") >> + builder.append(ackTimeout) >> + data.foreach(d =>{ >> + builder.append(":[" + d.topic) >> + d.partitionData.foreach(p => { >> + builder.append(":[") >> + builder.append(p.partition + ",") >> + builder.append(p.messages.sizeInBytes) >> + builder.append("]") >> + }) >> + builder.append("]") >> + }) >> builder.append(")") >> builder.toString >> } >> >> override def equals(other: Any): Boolean = { >> - other match { >> + other match { >> case that: ProducerRequest => >> - (that canEqual this) && topic == that.topic && partition == >> that.partition && >> - messages.equals(that.messages) >> + ( correlationId == that.correlationId && >> + clientId == that.clientId && >> + requiredAcks == that.requiredAcks && >> + ackTimeout == that.ackTimeout && >> + data.toSeq == that.data.toSeq) >> case _ => false >> } >> } >> - >> - def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] >> - >> - override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + >> messages.hashCode >> - >> -} >> +} >> \ No newline at end of file >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala >> Sat Mar 3 05:46:43 2012 >> @@ -22,7 +22,7 @@ import kafka.api.TopicData >> >> class FetchResponse( val versionId: Short, >> val correlationId: Int, >> - val data: Array[TopicData] ) { >> + private val data: Array[TopicData] ) { >> >> private val underlying = new kafka.api.FetchResponse(versionId, >> correlationId, data) >> >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala >> Sat Mar 3 05:46:43 2012 >> @@ -17,36 +17,29 @@ >> package kafka.javaapi >> >> import kafka.network.Request >> -import kafka.api.RequestKeys >> +import kafka.api.{RequestKeys, TopicData} >> import java.nio.ByteBuffer >> >> -class ProducerRequest(val topic: String, >> - val partition: Int, >> - val messages: >> kafka.javaapi.message.ByteBufferMessageSet) extends >> Request(RequestKeys.Produce) { >> +class ProducerRequest(val correlationId: Int, >> + val clientId: String, >> + val requiredAcks: Short, >> + val ackTimeout: Int, >> + val data: Array[TopicData]) extends >> Request(RequestKeys.Produce) { >> + >> import Implicits._ >> - private val underlying = new kafka.api.ProducerRequest(topic, >> partition, messages) >> + val underlying = new kafka.api.ProducerRequest(correlationId, clientId, >> requiredAcks, ackTimeout, data) >> >> def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) } >> >> def sizeInBytes(): Int = underlying.sizeInBytes >> >> - def getTranslatedPartition(randomSelector: String => Int): Int = >> - underlying.getTranslatedPartition(randomSelector) >> - >> override def toString: String = >> underlying.toString >> >> - override def equals(other: Any): Boolean = { >> - other match { >> - case that: ProducerRequest => >> - (that canEqual this) && topic == that.topic && partition == >> that.partition && >> - messages.equals(that.messages) >> - case _ => false >> - } >> - } >> + override def equals(other: Any): Boolean = underlying.equals(other) >> >> def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] >> >> - override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + >> messages.hashCode >> + override def hashCode: Int = underlying.hashCode >> >> -} >> +} >> \ No newline at end of file >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala >> Sat Mar 3 05:46:43 2012 >> @@ -39,7 +39,7 @@ class ByteBufferMessageSet(private val b >> >> def validBytes: Long = underlying.validBytes >> >> - def serialized():ByteBuffer = underlying.serialized >> + def serialized():ByteBuffer = underlying.getSerialized() >> >> def getInitialOffset = initialOffset >> >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala >> Sat Mar 3 05:46:43 2012 >> @@ -18,6 +18,8 @@ package kafka.javaapi.producer >> >> import kafka.producer.SyncProducerConfig >> import kafka.javaapi.message.ByteBufferMessageSet >> +import kafka.javaapi.ProducerRequest >> +import kafka.api.{PartitionData, TopicData} >> >> class SyncProducer(syncProducer: kafka.producer.SyncProducer) { >> >> @@ -25,21 +27,17 @@ class SyncProducer(syncProducer: kafka.p >> >> val underlying = syncProducer >> >> - def send(topic: String, partition: Int, messages: ByteBufferMessageSet) >> { >> - import kafka.javaapi.Implicits._ >> - underlying.send(topic, partition, messages) >> + def send(producerRequest: kafka.javaapi.ProducerRequest) { >> + underlying.send(producerRequest.underlying) >> } >> >> - def send(topic: String, messages: ByteBufferMessageSet): Unit = >> send(topic, >> - >> kafka.api.ProducerRequest.RandomPartition, >> - >> messages) >> - >> - def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) { >> - import kafka.javaapi.Implicits._ >> - val produceRequests = new >> Array[kafka.api.ProducerRequest](produces.length) >> - for(i <- 0 until produces.length) >> - produceRequests(i) = new >> kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, >> produces(i).messages) >> - underlying.multiSend(produceRequests) >> + def send(topic: String, messages: ByteBufferMessageSet): Unit = { >> + var data = new Array[TopicData](1) >> + var partition_data = new Array[PartitionData](1) >> + partition_data(0) = new PartitionData(-1,messages.underlying) >> + data(0) = new TopicData(topic,partition_data) >> + val producerRequest = new kafka.api.ProducerRequest(-1, "", 0, 0, >> data) >> + underlying.send(producerRequest) >> } >> >> def close() { >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala >> Sat Mar 3 05:46:43 2012 >> @@ -53,7 +53,7 @@ class ByteBufferMessageSet(private val b >> >> def getErrorCode = errorCode >> >> - def serialized(): ByteBuffer = buffer >> + def getSerialized(): ByteBuffer = buffer >> >> def validBytes: Long = shallowValidBytes >> >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala >> Sat Mar 3 05:46:43 2012 >> @@ -40,6 +40,8 @@ class FileMessageSet private[kafka](priv >> private val setSize = new AtomicLong() >> private val setHighWaterMark = new AtomicLong() >> >> + def getSerialized(): ByteBuffer = throw new >> java.lang.UnsupportedOperationException() >> + >> if(mutable) { >> if(limit < Long.MaxValue || offset > 0) >> throw new IllegalArgumentException("Attempt to open a mutable >> message set with a view or offset, which is not allowed.") >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala >> Sat Mar 3 05:46:43 2012 >> @@ -111,4 +111,9 @@ abstract class MessageSet extends Iterab >> throw new InvalidMessageException >> } >> >> + /** >> + * Used to allow children to have serialization on implementation >> + */ >> + def getSerialized(): ByteBuffer >> + >> } >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala >> Sat Mar 3 05:46:43 2012 >> @@ -51,29 +51,10 @@ class SyncProducer(val config: SyncProdu >> if (logger.isTraceEnabled) { >> trace("verifying sendbuffer of size " + buffer.limit) >> val requestTypeId = buffer.getShort() >> - if (requestTypeId == RequestKeys.MultiProduce) { >> - try { >> - val request = MultiProducerRequest.readFrom(buffer) >> - for (produce <- request.produces) { >> - try { >> - for (messageAndOffset <- produce.messages) >> - if (!messageAndOffset.message.isValid) >> - trace("topic " + produce.topic + " is invalid") >> - } >> - catch { >> - case e: Throwable => >> - trace("error iterating messages ", e) >> - } >> - } >> - } >> - catch { >> - case e: Throwable => >> - trace("error verifying sendbuffer ", e) >> - } >> - } >> + val request = ProducerRequest.readFrom(buffer) >> + trace(request.toString) >> } >> } >> - >> /** >> * Common functionality for the public send methods >> */ >> @@ -108,21 +89,15 @@ class SyncProducer(val config: SyncProdu >> /** >> * Send a message >> */ >> - def send(topic: String, partition: Int, messages: ByteBufferMessageSet) >> { >> - verifyMessageSize(messages) >> - val setSize = messages.sizeInBytes.asInstanceOf[Int] >> - trace("Got message set with " + setSize + " bytes to send") >> - send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, >> messages))) >> - } >> - >> - def send(topic: String, messages: ByteBufferMessageSet): Unit = >> send(topic, ProducerRequest.RandomPartition, messages) >> - >> - def multiSend(produces: Array[ProducerRequest]) { >> - for (request <- produces) >> - verifyMessageSize(request.messages) >> - val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes) >> - trace("Got multi message sets with " + setSize + " bytes to send") >> - send(new BoundedByteBufferSend(new MultiProducerRequest(produces))) >> + def send(producerRequest: ProducerRequest) { >> + producerRequest.data.foreach(d => { >> + d.partitionData.foreach(p => { >> + verifyMessageSize(new >> ByteBufferMessageSet(p.messages.getSerialized())) >> + val setSize = p.messages.sizeInBytes.asInstanceOf[Int] >> + trace("Got message set with " + setSize + " bytes to send") >> + }) >> + }) >> + send(new BoundedByteBufferSend(producerRequest)) >> } >> >> def send(request: TopicMetadataRequest): Seq[TopicMetadata] = { >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala >> Sat Mar 3 05:46:43 2012 >> @@ -41,4 +41,23 @@ trait SyncProducerConfigShared { >> val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000) >> >> val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000) >> + >> + /* the client application sending the producer requests */ >> + val correlationId = >> Utils.getInt(props,"producer.request.correlation_id",-1) >> + >> + /* the client application sending the producer requests */ >> + val clientId = Utils.getString(props,"producer.request.client_id","") >> + >> + /* the required_acks of the producer requests */ >> + val requiredAcks = >> Utils.getShort(props,"producer.request.required_acks",0) >> + >> + /* the ack_timeout of the producer requests */ >> + val ackTimeout = Utils.getInt(props,"producer.request.ack_timeout",1) >> } >> + >> +object SyncProducerConfig { >> + val DefaultCorrelationId = -1 >> + val DefaultClientId = "" >> + val DefaultRequiredAcks : Short = 0 >> + val DefaultAckTimeoutMs = 1 >> +} >> \ No newline at end of file >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala >> Sat Mar 3 05:46:43 2012 >> @@ -17,7 +17,7 @@ >> >> package kafka.producer.async >> >> -import kafka.api.ProducerRequest >> +import kafka.api.{ProducerRequest, TopicData, PartitionData} >> import kafka.serializer.Encoder >> import kafka.producer._ >> import kafka.cluster.{Partition, Broker} >> @@ -147,9 +147,22 @@ class DefaultEventHandler[K,V](config: P >> >> private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), >> ByteBufferMessageSet]) { >> if(messagesPerTopic.size > 0) { >> - val requests = messagesPerTopic.map(f => new >> ProducerRequest(f._1._1, f._1._2, f._2)).toArray >> + val topics = new HashMap[String, ListBuffer[PartitionData]]() >> + val requests = messagesPerTopic.map(f => { >> + val topicName = f._1._1 >> + val partitionId = f._1._2 >> + val messagesSet= f._2 >> + val topic = topics.get(topicName) // checking to see if this >> topics exists >> + topic match { >> + case None => topics += topicName -> new >> ListBuffer[PartitionData]() //create a new listbuffer for this topic >> + case Some(x) => trace("found " + topicName) >> + } >> + topics(topicName).append(new PartitionData(partitionId, >> messagesSet)) >> + }) >> + val topicData = topics.map(kv => new TopicData(kv._1,kv._2.toArray)) >> + val producerRequest = new ProducerRequest(config.correlationId, >> config.clientId, config.requiredAcks, config.ackTimeout, topicData.toArray) >> //new kafka.javaapi.ProducerRequest(correlation_id, client_id, >> required_acks, ack_timeout, topic_data.toArray) >> val syncProducer = producerPool.getProducer(brokerId) >> - syncProducer.multiSend(requests) >> + syncProducer.send(producerRequest) >> trace("kafka producer sent messages for topics %s to broker %s:%d" >> .format(messagesPerTopic, syncProducer.config.host, >> syncProducer.config.port)) >> } >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala >> Sat Mar 3 05:46:43 2012 >> @@ -41,7 +41,6 @@ class KafkaApis(val logManager: LogManag >> apiId match { >> case RequestKeys.Produce => handleProducerRequest(receive) >> case RequestKeys.Fetch => handleFetchRequest(receive) >> - case RequestKeys.MultiProduce => handleMultiProducerRequest(receive) >> case RequestKeys.Offsets => handleOffsetRequest(receive) >> case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive) >> case _ => throw new IllegalStateException("No mapping found for >> handler id " + apiId) >> @@ -59,31 +58,38 @@ class KafkaApis(val logManager: LogManag >> None >> } >> >> - def handleMultiProducerRequest(receive: Receive): Option[Send] = { >> - val request = MultiProducerRequest.readFrom(receive.buffer) >> - if(requestLogger.isTraceEnabled) >> - requestLogger.trace("Multiproducer request " + request.toString) >> - request.produces.map(handleProducerRequest(_, "MultiProducerRequest")) >> - None >> - } >> - >> - private def handleProducerRequest(request: ProducerRequest, >> requestHandlerName: String) = { >> - val partition = >> request.getTranslatedPartition(logManager.chooseRandomPartition) >> - try { >> - logManager.getOrCreateLog(request.topic, >> partition).append(request.messages) >> - trace(request.messages.sizeInBytes + " bytes written to logs.") >> - } catch { >> - case e => >> - error("Error processing " + requestHandlerName + " on " + >> request.topic + ":" + partition, e) >> - e match { >> - case _: IOException => >> - fatal("Halting due to unrecoverable I/O error while handling >> producer request: " + e.getMessage, e) >> - System.exit(1) >> - case _ => >> + private def handleProducerRequest(request: ProducerRequest, >> requestHandlerName: String): Option[ProducerResponse] = { >> + val requestSize = request.data.size >> + val errors = new Array[Int](requestSize) >> + val offsets = new Array[Long](requestSize) >> + >> + request.data.foreach(d => { >> + d.partitionData.foreach(p => { >> + val partition = p.getTranslatedPartition(d.topic, >> logManager.chooseRandomPartition) >> + try { >> + logManager.getOrCreateLog(d.topic, partition).append(p.messages) >> + trace(p.messages.sizeInBytes + " bytes written to logs.") >> + p.messages.foreach(m => trace("wrote message %s to >> disk".format(m.message.checksum))) >> } >> - throw e >> - } >> - None >> + catch { >> + case e => >> + //TODO: handle response in ProducerResponse >> + error("Error processing " + requestHandlerName + " on " + >> d.topic + ":" + partition, e) >> + e match { >> + case _: IOException => >> + fatal("Halting due to unrecoverable I/O error while >> handling producer request: " + e.getMessage, e) >> + Runtime.getRuntime.halt(1) >> + case _ => >> + } >> + //throw e >> + } >> + }) >> + //None >> + }) >> + if (request.requiredAcks == 0) >> + None >> + else >> + None //TODO: send when KAFKA-49 can receive this Some(new >> ProducerResponse(request.versionId, request.correlationId, errors, offsets)) >> } >> >> def handleFetchRequest(request: Receive): Option[Send] = { >> >> Modified: >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala >> Sat Mar 3 05:46:43 2012 >> @@ -195,6 +195,9 @@ object Utils extends Logging { >> def getInt(props: Properties, name: String, default: Int): Int = >> getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue)) >> >> + def getShort(props: Properties, name: String, default: Short): Short = >> + getShortInRange(props, name, default, (Short.MinValue, >> Short.MaxValue)) >> + >> /** >> * Read an integer from the properties instance. Throw an exception >> * if the value is not in the given range (inclusive) >> @@ -217,6 +220,18 @@ object Utils extends Logging { >> v >> } >> >> + def getShortInRange(props: Properties, name: String, default: Short, >> range: (Short, Short)): Short = { >> + val v = >> + if(props.containsKey(name)) >> + props.getProperty(name).toShort >> + else >> + default >> + if(v < range._1 || v > range._2) >> + throw new IllegalArgumentException(name + " has value " + v + " >> which is not in the range " + range + ".") >> + else >> + v >> + } >> + >> def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): >> Int = { >> val value = buffer.getInt >> if(value < range._1 || value > range._2) >> @@ -777,4 +792,4 @@ class SnapshotStats(private val monitorD >> >> def durationMs: Double = (end.get - start) / (1000.0 * 1000.0) >> } >> -} >> +} >> \ No newline at end of file >> >> Modified: >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala >> Sat Mar 3 05:46:43 2012 >> @@ -33,7 +33,7 @@ class ByteBufferMessageSetTest extends B >> // create a ByteBufferMessageSet that doesn't contain a full message >> // iterating it should get an InvalidMessageSizeException >> val messages = new ByteBufferMessageSet(NoCompressionCodec, new >> Message("01234567890123456789".getBytes())) >> - val buffer = messages.serialized.slice >> + val buffer = messages.getSerialized().slice >> buffer.limit(10) >> val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = >> buffer, initialOffset = 1000) >> try { >> @@ -51,7 +51,7 @@ class ByteBufferMessageSetTest extends B >> { >> val messages = new ByteBufferMessageSet(NoCompressionCodec, new >> Message("hello".getBytes()), new Message("there".getBytes())) >> val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2) >> - buffer.put(messages.serialized) >> + buffer.put(messages.getSerialized()) >> buffer.putShort(4) >> val messagesPlus = new ByteBufferMessageSet(buffer) >> assertEquals("Adding invalid bytes shouldn't change byte count", >> messages.validBytes, messagesPlus.validBytes) >> @@ -93,7 +93,7 @@ class ByteBufferMessageSetTest extends B >> //make sure ByteBufferMessageSet is re-iterable. >> TestUtils.checkEquals[Message](messageList.iterator, >> TestUtils.getMessageIterator(messageSet.iterator)) >> //make sure the last offset after iteration is correct >> - assertEquals("offset of last message not expected", >> messageSet.last.offset, messageSet.serialized.limit) >> + assertEquals("offset of last message not expected", >> messageSet.last.offset, messageSet.getSerialized().limit) >> } >> >> // test for compressed regular messages >> @@ -103,7 +103,7 @@ class ByteBufferMessageSetTest extends B >> //make sure ByteBufferMessageSet is re-iterable. >> TestUtils.checkEquals[Message](messageList.iterator, >> TestUtils.getMessageIterator(messageSet.iterator)) >> //make sure the last offset after iteration is correct >> - assertEquals("offset of last message not expected", >> messageSet.last.offset, messageSet.serialized.limit) >> + assertEquals("offset of last message not expected", >> messageSet.last.offset, messageSet.getSerialized().limit) >> } >> >> // test for mixed empty and non-empty messagesets uncompressed >> @@ -111,16 +111,16 @@ class ByteBufferMessageSetTest extends B >> val emptyMessageList : List[Message] = Nil >> val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, >> emptyMessageList: _*) >> val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, >> messageList: _*) >> - val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + >> regularMessgeSet.serialized.limit) >> - buffer.put(emptyMessageSet.serialized) >> - buffer.put(regularMessgeSet.serialized) >> + val buffer = >> ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + >> regularMessgeSet.getSerialized().limit) >> + buffer.put(emptyMessageSet.getSerialized()) >> + buffer.put(regularMessgeSet.getSerialized()) >> buffer.rewind >> val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0) >> TestUtils.checkEquals[Message](messageList.iterator, >> TestUtils.getMessageIterator(mixedMessageSet.iterator)) >> //make sure ByteBufferMessageSet is re-iterable. >> TestUtils.checkEquals[Message](messageList.iterator, >> TestUtils.getMessageIterator(mixedMessageSet.iterator)) >> //make sure the last offset after iteration is correct >> - assertEquals("offset of last message not expected", >> mixedMessageSet.last.offset, mixedMessageSet.serialized.limit) >> + assertEquals("offset of last message not expected", >> mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit) >> } >> >> // test for mixed empty and non-empty messagesets compressed >> @@ -128,16 +128,16 @@ class ByteBufferMessageSetTest extends B >> val emptyMessageList : List[Message] = Nil >> val emptyMessageSet = new >> ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*) >> val regularMessgeSet = new >> ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*) >> - val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + >> regularMessgeSet.serialized.limit) >> - buffer.put(emptyMessageSet.serialized) >> - buffer.put(regularMessgeSet.serialized) >> + val buffer = >> ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + >> regularMessgeSet.getSerialized().limit) >> + buffer.put(emptyMessageSet.getSerialized()) >> + buffer.put(regularMessgeSet.getSerialized()) >> buffer.rewind >> val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0) >> TestUtils.checkEquals[Message](messageList.iterator, >> TestUtils.getMessageIterator(mixedMessageSet.iterator)) >> //make sure ByteBufferMessageSet is re-iterable. >> TestUtils.checkEquals[Message](messageList.iterator, >> TestUtils.getMessageIterator(mixedMessageSet.iterator)) >> //make sure the last offset after iteration is correct >> - assertEquals("offset of last message not expected", >> mixedMessageSet.last.offset, mixedMessageSet.serialized.limit) >> + assertEquals("offset of last message not expected", >> mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit) >> } >> } >> >> >> Modified: >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala >> Sat Mar 3 05:46:43 2012 >> @@ -381,11 +381,12 @@ class AsyncProducerTest extends JUnit3Su >> val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) >> mockSyncProducer.send(new TopicMetadataRequest(List(topic))) >> EasyMock.expectLastCall().andReturn(List(topic1Metadata)) >> - mockSyncProducer.multiSend(EasyMock.aryEq(Array(new >> ProducerRequest(topic, 0, messagesToSet(msgs.take(5)))))) >> + mockSyncProducer.send(TestUtils.produceRequest(topic, 0, >> + messagesToSet(msgs.take(5)))) >> EasyMock.expectLastCall >> - mockSyncProducer.multiSend(EasyMock.aryEq(Array(new >> ProducerRequest(topic, 0, messagesToSet(msgs.takeRight(5)))))) >> - EasyMock.expectLastCall >> - EasyMock.replay(mockSyncProducer) >> + mockSyncProducer.send(TestUtils.produceRequest(topic, 0, >> + messagesToSet(msgs.takeRight(5)))) >> + EasyMock.replay(mockSyncProducer) >> >> val producerPool = EasyMock.createMock(classOf[ProducerPool]) >> producerPool.getZkClient >> @@ -495,10 +496,7 @@ class AsyncProducerTest extends JUnit3Su >> } >> >> class MockProducer(override val config: SyncProducerConfig) extends >> SyncProducer(config) { >> - override def send(topic: String, messages: ByteBufferMessageSet): >> Unit = { >> - Thread.sleep(1000) >> - } >> - override def multiSend(produces: Array[ProducerRequest]) { >> + override def send(produceRequest: ProducerRequest): Unit = { >> Thread.sleep(1000) >> } >> } >> >> Modified: >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala >> Sat Mar 3 05:46:43 2012 >> @@ -44,7 +44,7 @@ class SyncProducerTest extends JUnit3Sui >> var failed = false >> val firstStart = SystemTime.milliseconds >> try { >> - producer.send("test", 0, new ByteBufferMessageSet(compressionCodec >> = NoCompressionCodec, messages = new Message(messageBytes))) >> + producer.send(TestUtils.produceRequest("test", 0, new >> ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new >> Message(messageBytes)))) >> }catch { >> case e: Exception => failed=true >> } >> @@ -54,7 +54,7 @@ class SyncProducerTest extends JUnit3Sui >> Assert.assertTrue((firstEnd-firstStart) < 500) >> val secondStart = SystemTime.milliseconds >> try { >> - producer.send("test", 0, new ByteBufferMessageSet(compressionCodec >> = NoCompressionCodec, messages = new Message(messageBytes))) >> + producer.send(TestUtils.produceRequest("test", 0, new >> ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new >> Message(messageBytes)))) >> }catch { >> case e: Exception => failed = true >> } >> @@ -63,7 +63,7 @@ class SyncProducerTest extends JUnit3Sui >> Assert.assertTrue((secondEnd-secondStart) < 500) >> >> try { >> - producer.multiSend(Array(new ProducerRequest("test", 0, new >> ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new >> Message(messageBytes))))) >> + producer.send(TestUtils.produceRequest("test", 0, new >> ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new >> Message(messageBytes)))) >> }catch { >> case e: Exception => failed=true >> } >> @@ -83,7 +83,7 @@ class SyncProducerTest extends JUnit3Sui >> val bytes = new Array[Byte](101) >> var failed = false >> try { >> - producer.send("test", 0, new ByteBufferMessageSet(compressionCodec >> = NoCompressionCodec, messages = new Message(bytes))) >> + producer.send(TestUtils.produceRequest("test", 0, new >> ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new >> Message(bytes)))) >> }catch { >> case e: MessageSizeTooLargeException => failed = true >> } >> >> Modified: >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala >> URL: >> http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1296577&r1=1296576&r2=1296577&view=diff >> >> ============================================================================== >> --- >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala >> (original) >> +++ >> incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala >> Sat Mar 3 05:46:43 2012 >> @@ -33,6 +33,7 @@ import collection.mutable.ListBuffer >> import kafka.consumer.{KafkaMessageStream, ConsumerConfig} >> import scala.collection.Map >> import kafka.serializer.Encoder >> +import kafka.api.{ProducerRequest, TopicData, PartitionData} >> >> /** >> * Utility functions to help with testing >> @@ -336,7 +337,47 @@ object TestUtils { >> buffer += ("msg" + i) >> buffer >> } >> + /** >> + * Create a wired format request based on simple basic information >> + */ >> + def produceRequest(topic: String, message: ByteBufferMessageSet): >> kafka.api.ProducerRequest = { >> + >> produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,ProducerRequest.RandomPartition,message) >> + } >> + def produceRequest(topic: String, partition: Int, message: >> ByteBufferMessageSet): kafka.api.ProducerRequest = { >> + >> produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message) >> + } >> + >> + def produceRequest(correlationId: Int, topic: String, partition: Int, >> message: ByteBufferMessageSet): kafka.api.ProducerRequest = { >> + val clientId = SyncProducerConfig.DefaultClientId >> + val requiredAcks: Short = SyncProducerConfig.DefaultRequiredAcks >> + val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs >> + var data = new Array[TopicData](1) >> + var partitionData = new Array[PartitionData](1) >> + partitionData(0) = new PartitionData(partition,message) >> + data(0) = new TopicData(topic,partitionData) >> + val pr = new kafka.api.ProducerRequest(correlationId, clientId, >> requiredAcks, ackTimeout, data) >> + pr >> + } >> >> + def produceJavaRequest(topic: String, message: >> kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest >> = { >> + produceJavaRequest(-1,topic,-1,message) >> + } >> + >> + def produceJavaRequest(topic: String, partition: Int, message: >> kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest >> = { >> + produceJavaRequest(-1,topic,partition,message) >> + } >> + >> + def produceJavaRequest(correlationId: Int, topic: String, partition: >> Int, message: kafka.javaapi.message.ByteBufferMessageSet): >> kafka.javaapi.ProducerRequest = { >> + val clientId = "test" >> + val requiredAcks: Short = 0 >> + val ackTimeout = 0 >> + var data = new Array[TopicData](1) >> + var partitionData = new Array[PartitionData](1) >> + partitionData(0) = new PartitionData(partition,message.underlying) >> + data(0) = new TopicData(topic,partitionData) >> + val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, >> requiredAcks, ackTimeout, data) >> + pr >> + } >> } >> >> object TestZKUtils { >> >> >>