Repository: kafka
Updated Branches:
  refs/heads/0.9.0 f22ea2970 -> e176fcc7f


http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-producer/README.md
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/README.md b/contrib/hadoop-producer/README.md
deleted file mode 100644
index a5bef73..0000000
--- a/contrib/hadoop-producer/README.md
+++ /dev/null
@@ -1,94 +0,0 @@
-Hadoop to Kafka Bridge
-======================
-
-What's new?
------------
-
-* Kafka 0.8 support
-  * No more ZK-based load balancing (backwards incompatible change)
-* Semantic partitioning is now supported in KafkaOutputFormat. Just specify a
-  key in the output committer of your job. The Pig StoreFunc doesn't support
-  semantic partitioning.
-* Config parameters are now the same as the Kafka producer, just prepended with
-  kafka.output (e.g., kafka.output.max.message.size). This is a backwards
-  incompatible change.
-
-What is it?
------------
-
-The Hadoop to Kafka bridge is a way to publish data from Hadoop to Kafka. There
-are two possible mechanisms, varying from easy to difficult: writing a Pig
-script and writing messages in Avro format, or rolling your own job using the
-Kafka `OutputFormat`. 
-
-Note that there are no write-once semantics: any client of the data must handle
-messages in an idempotent manner. That is, because of node failures and
-Hadoop's failure recovery, it's possible that the same message is published
-multiple times in the same push.
-
-How do I use it?
-----------------
-
-With this bridge, Kafka topics are URIs and are specified as URIs of the form
-`kafka://<kafka-server>/<kafka-topic>` to connect to a specific Kafka broker.
-
-### Pig ###
-
-Pig bridge writes data in binary Avro format with one message created per input
-row. To push data via Kafka, store to the Kafka URI using `AvroKafkaStorage`
-with the Avro schema as its first argument. You'll need to register the
-appropriate Kafka JARs. Here is what an example Pig script looks like:
-
-    REGISTER hadoop-producer_2.8.0-0.8.0.jar;
-    REGISTER avro-1.4.0.jar;
-    REGISTER piggybank.jar;
-    REGISTER kafka-0.8.0.jar;
-    REGISTER jackson-core-asl-1.5.5.jar;
-    REGISTER jackson-mapper-asl-1.5.5.jar;
-    REGISTER scala-library.jar;
-
-    member_info = LOAD 'member_info.tsv' AS (member_id : int, name : chararray);
-    names = FOREACH member_info GENERATE name;
-    STORE member_info INTO 'kafka://my-kafka:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
-
-That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert
-from Pig's data model to the specified Avro schema.
-
-Further, multi-store is possible with KafkaStorage, so you can easily write to
-multiple topics and brokers in the same job:
-
-    SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
-    STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
-    STORE others INTO 'kafka://my-broker2:9092/others' USING AvroKafkaStorage('$schema');
-
-### KafkaOutputFormat ###
-
-KafkaOutputFormat is a Hadoop OutputFormat for publishing data via Kafka. It
-uses the newer 0.20 mapreduce APIs and simply pushes bytes (i.e.,
-BytesWritable). This is a lower-level method of publishing data, as it allows
-you to precisely control output.
-
-Included is an example that publishes some input text line-by-line to a topic.
-With KafkaOutputFormat, the key can be a null, where it is ignored by the
-producer (random partitioning), or any object for semantic partitioning of the
-stream (with an appropriate Kafka partitioner set). Speculative execution is
-turned off by the OutputFormat.
-
-What can I tune?
-----------------
-
-* kafka.output.queue.bytes: Bytes to queue in memory before pushing to the Kafka
-  producer (i.e., the batch size). Default is 1,000,000 (1 million) bytes.
-
-Any of Kafka's producer parameters can be changed by prefixing them with
-"kafka.output" in one's job configuration. For example, to change the
-compression codec, one would add the "kafka.output.compression.codec" parameter
-(e.g., "SET kafka.output.compression.codec 0" in one's Pig script for no
-compression). 
-
-For easier debugging, the above values as well as the Kafka broker information
-(kafka.metadata.broker.list), the topic (kafka.output.topic), and the schema
-(kafka.output.schema) are injected into the job's configuration. By default,
-the Hadoop producer uses Kafka's sync producer as asynchronous operation
-doesn't make sense in the batch Hadoop case.
-
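A note on the tuning section above: any Kafka producer setting can be supplied through the job configuration by prefixing it with "kafka.output", and KafkaOutputFormat strips that prefix before building the producer. A minimal driver sketch follows; the class name, broker address, and topic are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    import kafka.bridge.hadoop.KafkaOutputFormat;

    public class BridgeConfigSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Producer overrides: "kafka.output." + <producer property>, e.g. disable compression
        conf.set("kafka.output.compression.codec", "0");
        // KafkaOutputFormat-specific knob: batch roughly 1 MB in memory before each send
        conf.setInt("kafka.output.queue.bytes", 1000000);

        Job job = new Job(conf);
        job.setOutputFormatClass(KafkaOutputFormat.class);

        // The broker list and topic come from the kafka:// URL
        KafkaOutputFormat.setOutputPath(job, new Path("kafka://my-kafka:9092/member_info"));
      }
    }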

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
deleted file mode 100644
index d447b1d..0000000
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.bridge.examples;
-
-import java.io.IOException;
-import kafka.bridge.hadoop.KafkaOutputFormat;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
-/**
- * Publish a text file line by line to a Kafka topic
- */
-public class TextPublisher
-{
-  public static void main(String[] args) throws Exception
-  {
-    if (args.length != 2) {
-      System.err.println("usage: <input path> <kafka output url>");
-      return;
-    }
-
-    Job job = new Job();
-
-    job.setJarByClass(TextPublisher.class);
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setOutputFormatClass(KafkaOutputFormat.class);
-
-    job.setMapperClass(TheMapper.class);
-    job.setNumReduceTasks(0);
-
-    FileInputFormat.addInputPath(job, new Path(args[0]));
-    KafkaOutputFormat.setOutputPath(job, new Path(args[1]));
-
-    if (!job.waitForCompletion(true)) {
-      throw new RuntimeException("Job failed!");
-    }
-  }
-
-  public static class TheMapper extends Mapper<Object, Text, Object, Object>
-  {
-    @Override
-    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException
-    {
-      context.write(null, value.getBytes());
-    }
-  }
-}
-
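TheMapper above writes a null key, so the producer spreads messages randomly across partitions. For semantic partitioning the mapper only needs to emit a non-null key, which KafkaRecordWriter passes to the producer's partitioner. A hypothetical variant (the tab-separated member_id field is made up) might look like:

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class KeyedTextMapper extends Mapper<Object, Text, Object, Object> {
      @Override
      protected void map(Object key, Text value, Context context)
          throws IOException, InterruptedException {
        String line = value.toString();
        String memberId = line.split("\t", 2)[0];          // hypothetical: first tab-separated field
        context.write(memberId, line.getBytes("UTF-8"));   // non-null key: the Kafka partitioner decides
      }
    }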

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
deleted file mode 100644
index 417b4b3..0000000
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.bridge.hadoop;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.*;
-
-import kafka.common.KafkaException;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.ProducerConfig;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.log4j.Logger;
-
-public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
-{
-  private Logger log = Logger.getLogger(KafkaOutputFormat.class);
-
-  public static final String KAFKA_URL = "kafka.output.url";
-  /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window):
-   *  We set the default to a million bytes so that the server will not reject the batch of messages
-   *  with a MessageSizeTooLargeException. The actual size will be smaller after compression.
-   */
-  public static final int KAFKA_QUEUE_BYTES = 1000000;
-
-  public static final String KAFKA_CONFIG_PREFIX = "kafka.output";
-  private static final Map<String, String> kafkaConfigMap;
-  static {
-    Map<String, String> cMap = new HashMap<String, String>();
-
-    // default Hadoop producer configs
-    cMap.put("producer.type", "sync");
-    cMap.put("compression.codec", Integer.toString(1));
-    cMap.put("request.required.acks", Integer.toString(1));
-
-    kafkaConfigMap = Collections.unmodifiableMap(cMap);
-  }
-
-  public KafkaOutputFormat()
-  {
-    super();
-  }
-
-  public static void setOutputPath(Job job, Path outputUrl)
-  {
-    job.getConfiguration().set(KafkaOutputFormat.KAFKA_URL, outputUrl.toString());
-
-    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
-    job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
-  }
-
-  public static Path getOutputPath(JobContext job)
-  {
-    String name = job.getConfiguration().get(KafkaOutputFormat.KAFKA_URL);
-    return name == null ? null : new Path(name);
-  }
-
-  @Override
-  public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException
-  {
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
-  {
-    // Is there a programmatic way to get the temp dir? I see it hardcoded everywhere in Hadoop, Hive, and Pig.
-    return new FileOutputCommitter(new Path("/tmp/" + taskAttemptContext.getTaskAttemptID().getJobID().toString()), taskAttemptContext);
-  }
-
-  @Override
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
-  {
-    Path outputPath = getOutputPath(context);
-    if (outputPath == null)
-      throw new KafkaException("no kafka output url specified");
-    URI uri = URI.create(outputPath.toString());
-    Configuration job = context.getConfiguration();
-
-    Properties props = new Properties();
-    String topic;
-
-    props.putAll(kafkaConfigMap);                       // inject default configuration
-    for (Map.Entry<String, String> m : job) {           // handle any overrides
-      if (!m.getKey().startsWith(KAFKA_CONFIG_PREFIX))
-        continue;
-      if (m.getKey().equals(KAFKA_URL))
-        continue;
-
-      String kafkaKeyName = m.getKey().substring(KAFKA_CONFIG_PREFIX.length()+1);
-      props.setProperty(kafkaKeyName, m.getValue());    // set Kafka producer property
-    }
-
-    // inject Kafka producer props back into jobconf for easier debugging
-    for (Map.Entry<Object, Object> m : props.entrySet()) {
-      job.set(KAFKA_CONFIG_PREFIX + "." + m.getKey().toString(), m.getValue().toString());
-    }
-
-    // KafkaOutputFormat specific parameters
-    final int queueBytes = job.getInt(KAFKA_CONFIG_PREFIX + ".queue.bytes", KAFKA_QUEUE_BYTES);
-
-    if (uri.getScheme().equals("kafka")) {
-      // using the direct broker list
-      // URL: kafka://<kafka host>/<topic>
-      // e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar
-      String brokerList = uri.getAuthority();
-      props.setProperty("metadata.broker.list", brokerList);
-      job.set(KAFKA_CONFIG_PREFIX + ".metadata.broker.list", brokerList);
-
-      if (uri.getPath() == null || uri.getPath().length() <= 1)
-        throw new KafkaException("no topic specified in kafka uri");
-
-      topic = uri.getPath().substring(1);               // ignore the initial '/' in the path
-      job.set(KAFKA_CONFIG_PREFIX + ".topic", topic);
-      log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic));
-    } else
-      throw new KafkaException("missing scheme from kafka uri (must be kafka://)");
-
-    Producer<Object, byte[]> producer = new Producer<Object, byte[]>(new ProducerConfig(props));
-    return new KafkaRecordWriter<K, V>(producer, topic, queueBytes);
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
deleted file mode 100644
index 72c088d..0000000
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.bridge.hadoop;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-public class KafkaRecordWriter<K,V> extends RecordWriter<K,V>
-{
-  protected Producer<Object, byte[]> producer;
-  protected String topic;
-
-  protected List<KeyedMessage<Object, byte[]>> msgList = new LinkedList<KeyedMessage<Object, byte[]>>();
-  protected int totalBytes = 0;
-  protected int queueBytes;
-
-  public KafkaRecordWriter(Producer<Object, byte[]> producer, String topic, int queueBytes)
-  {
-    this.producer = producer;
-    this.topic = topic;
-    this.queueBytes = queueBytes;
-  }
-
-  protected void sendMsgList() throws IOException
-  {
-    if (msgList.size() > 0) {
-      try {
-        producer.send(msgList);
-      }
-      catch (Exception e) {
-        throw new IOException(e);           // all Kafka exceptions become IOExceptions
-      }
-      msgList.clear();
-      totalBytes = 0;
-    }
-  }
-
-  @Override
-  public void write(K key, V value) throws IOException, InterruptedException
-  {
-    byte[] valBytes;
-    if (value instanceof byte[])
-      valBytes = (byte[]) value;
-    else if (value instanceof BytesWritable)
-      // BytesWritable.getBytes returns its internal buffer, so .length would refer to its capacity, not the
-      // intended size of the byte array contained.  We need to use BytesWritable.getLength for the true size.
-      valBytes = Arrays.copyOf(((BytesWritable) value).getBytes(), ((BytesWritable) value).getLength());
-    else
-      throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish");
-
-    // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch
-    // If the new message is going to make the message list tip over 1 million bytes, send the
-    // message list now.
-    if ((totalBytes + valBytes.length) > queueBytes || msgList.size() >= Short.MAX_VALUE)
-      sendMsgList();
-
-    msgList.add(new KeyedMessage<Object, byte[]>(this.topic, key, valBytes));
-    totalBytes += valBytes.length;
-  }
-
-  @Override
-  public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
-  {
-    sendMsgList();
-    producer.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
deleted file mode 100644
index d24a85a..0000000
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.bridge.pig;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import kafka.bridge.hadoop.KafkaOutputFormat;
-import kafka.bridge.hadoop.KafkaRecordWriter;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.Encoder;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.piggybank.storage.avro.PigAvroDatumWriter;
-import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
-
-public class AvroKafkaStorage extends StoreFunc
-{
-  protected KafkaRecordWriter<Object, byte[]> writer;
-  protected org.apache.avro.Schema avroSchema;
-  protected PigAvroDatumWriter datumWriter;
-  protected Encoder encoder;
-  protected ByteArrayOutputStream os;
-
-  public AvroKafkaStorage(String schema)
-  {
-    this.avroSchema = org.apache.avro.Schema.parse(schema);
-  }
-
-  @Override
-  public OutputFormat getOutputFormat() throws IOException
-  {
-    return new KafkaOutputFormat();
-  }
-
-  @Override
-  public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
-  {
-    return location;
-  }
-
-  @Override
-  public void setStoreLocation(String uri, Job job) throws IOException
-  {
-    KafkaOutputFormat.setOutputPath(job, new Path(uri));
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void prepareToWrite(RecordWriter writer) throws IOException
-  {
-    if (this.avroSchema == null)
-      throw new IllegalStateException("avroSchema shouldn't be null");
-
-    this.writer = (KafkaRecordWriter) writer;
-    this.datumWriter = new PigAvroDatumWriter(this.avroSchema);
-    this.os = new ByteArrayOutputStream();
-    this.encoder = new BinaryEncoder(this.os);
-  }
-
-  @Override
-  public void cleanupOnFailure(String location, Job job) throws IOException
-  {
-  }
-
-  @Override
-  public void setStoreFuncUDFContextSignature(String signature)
-  {
-  }
-
-  @Override
-  public void checkSchema(ResourceSchema schema) throws IOException
-  {
-    this.avroSchema = PigSchema2Avro.validateAndConvert(avroSchema, schema);
-  }
-
-  protected void writeEnvelope(OutputStream os, Encoder enc) throws IOException
-  {
-  }
-
-  @Override
-  public void putNext(Tuple tuple) throws IOException
-  {
-    os.reset();
-    writeEnvelope(os, this.encoder);
-    datumWriter.write(tuple, this.encoder);
-    this.encoder.flush();
-
-    try {
-      this.writer.write(null, this.os.toByteArray());
-    }
-    catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/docs/api.html
----------------------------------------------------------------------
diff --git a/docs/api.html b/docs/api.html
index 1787f06..835bdf2 100644
--- a/docs/api.html
+++ b/docs/api.html
@@ -5,9 +5,9 @@
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at
- 
+
     http://www.apache.org/licenses/LICENSE-2.0
- 
+
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,7 +28,7 @@ As of the 0.8.2 release we encourage all new development to use the new Java pro
        &lt;/dependency&gt;
 </pre>
 
-Examples showing how to use the producer are given in the 
+Examples showing how to use the producer are given in the
 <a href="http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html" title="Kafka 0.8.2 Javadoc">javadocs</a>.
 
 <p>
@@ -144,17 +144,7 @@ class kafka.javaapi.consumer.SimpleConsumer {
 For most applications, the high level consumer Api is good enough. Some applications want features not exposed to the high level consumer yet (e.g., set initial offset when restarting the consumer). They can instead use our low level SimpleConsumer Api. The logic will be a bit more complicated and you can follow the example in
 <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example" title="Kafka 0.8 SimpleConsumer example">here</a>.
 
-<h3><a id="kafkahadoopconsumerapi">2.4 Kafka Hadoop Consumer API</a></h3>
-<p>
-Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers).
-</p>
-
-<p>
-Usage information on the hadoop consumer can be found <a href="https://github.com/linkedin/camus/">here</a>.
-</p>
-
-
-<h3><a id="newconsumerapi">2.5 New Consumer API</a></h3>
+<h3><a id="newconsumerapi">2.4 New Consumer API</a></h3>
 As of the 0.9.0 release we have added a replacement for our existing simple and high-level consumers. This client is considered beta quality. You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):
 <pre>
        &lt;dependency&gt;
@@ -165,4 +155,4 @@ As of the 0.9.0 release we have added a replacement for our existing simple and
 </pre>
 
 Examples showing how to use the producer are given in the
-<a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaConsumer.html" title="Kafka 0.9.0 Javadoc">javadocs</a>.
\ No newline at end of file
+<a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaConsumer.html" title="Kafka 0.9.0 Javadoc">javadocs</a>.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/docs/documentation.html
----------------------------------------------------------------------
diff --git a/docs/documentation.html b/docs/documentation.html
index 795bd1e..a123721 100644
--- a/docs/documentation.html
+++ b/docs/documentation.html
@@ -35,7 +35,7 @@ Prior releases: <a href="/07/documentation.html">0.7.x</a>, <a href="/08/documen
               <li><a href="#producerapi">2.1 Producer API</a>
               <li><a href="#highlevelconsumerapi">2.2 High Level Consumer 
API</a>
               <li><a href="#simpleconsumerapi">2.3 Simple Consumer API</a>
-              <li><a href="#kafkahadoopconsumerapi">2.4 Kafka Hadoop Consumer API</a>
+              <li><a href="#newconsumerapi">2.4 New Consumer API</a>
           </ul>
     <li><a href="#configuration">3. Configuration</a>
         <ul>

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/docs/implementation.html
----------------------------------------------------------------------
diff --git a/docs/implementation.html b/docs/implementation.html
index d9ffa46..b95d36f 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -5,9 +5,9 @@
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at
- 
+
     http://www.apache.org/licenses/LICENSE-2.0
- 
+
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,10 +20,10 @@
 <h4>Producer APIs</h4>
 
 <p>
-The Producer API that wraps the 2 low-level producers - <code>kafka.producer.SyncProducer</code> and <code>kafka.producer.async.AsyncProducer</code>. 
+The Producer API that wraps the 2 low-level producers - <code>kafka.producer.SyncProducer</code> and <code>kafka.producer.async.AsyncProducer</code>.
 <pre>
 class Producer<T> {
-       
+
   /* Sends the data, partitioned by key to the topic using either the */
   /* synchronous or the asynchronous producer */
   public void send(kafka.javaapi.producer.ProducerData&lt;K,V&gt; producerData);
@@ -32,21 +32,21 @@ class Producer<T> {
   /* the synchronous or the asynchronous producer */
   public void send(java.util.List&lt;kafka.javaapi.producer.ProducerData&lt;K,V&gt;&gt; producerData);
 
-  /* Closes the producer and cleans up */      
+  /* Closes the producer and cleans up */
   public void close();
 
 }
 </pre>
 
-The goal is to expose all the producer functionality through a single API to the client.  
+The goal is to expose all the producer functionality through a single API to the client.
 
 The new producer -
 <ul>
-<li>can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data -        
+<li>can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data -
 <p><code>kafka.producer.Producer</code> provides the ability to batch multiple produce requests (<code>producer.type=async</code>), before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter a queue, they are buffered in a queue, until either <code>queue.time</code> or <code>batch.size</code> is reached. A background thread (<code>kafka.producer.async.ProducerSendThread</code>) dequeues the batch of data and lets the <code>kafka.producer.EventHandler</code> serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the <code>event.handler</code> config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the <code>kafka.producer.async.CallbackHandler</code> interface and setting <code>callback.handler</code> config parameter to that class.
 </p>
 </li>
-<li>handles the serialization of data through a user-specified <code>Encoder</code> - 
+<li>handles the serialization of data through a user-specified <code>Encoder</code> -
 <pre>
 interface Encoder&lt;T&gt; {
   public Message toMessage(T data);
@@ -54,15 +54,15 @@ interface Encoder&lt;T&gt; {
 </pre>
 <p>The default is the no-op <code>kafka.serializer.DefaultEncoder</code></p>
 </li>
-<li>provides software load balancing through an optionally user-specified <code>Partitioner</code> - 
+<li>provides software load balancing through an optionally user-specified <code>Partitioner</code> -
 <p>
-The routing decision is influenced by the <code>kafka.producer.Partitioner</code>. 
+The routing decision is influenced by the <code>kafka.producer.Partitioner</code>.
 <pre>
 interface Partitioner&lt;T&gt; {
    int partition(T key, int numPartitions);
 }
 </pre>
-The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is <code>hash(key)%numPartitions</code>. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the <code>partitioner.class</code> config parameter.      
+The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is <code>hash(key)%numPartitions</code>. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the <code>partitioner.class</code> config parameter.
 </p>
 </li>
 </ul>
@@ -79,11 +79,11 @@ The high-level API hides the details of brokers from the consumer and allows con
 <h5>Low-level API</h5>
 <pre>
 class SimpleConsumer {
-       
-  /* Send fetch request to a broker and get back a set of messages. */ 
+
+  /* Send fetch request to a broker and get back a set of messages. */
   public ByteBufferMessageSet fetch(FetchRequest request);
 
-  /* Send a list of fetch requests to a broker and get back a response set. */ 
+  /* Send a list of fetch requests to a broker and get back a response set. */
   public MultiFetchResponse multifetch(List&lt;FetchRequest&gt; fetches);
 
   /**
@@ -97,16 +97,16 @@ class SimpleConsumer {
 }
 </pre>
 
-The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers (such as the hadoop consumer) which have particular requirements around maintaining state.
+The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers which have particular requirements around maintaining state.
 
 <h5>High-level API</h5>
 <pre>
 
-/* create a connection to the cluster */ 
+/* create a connection to the cluster */
 ConsumerConnector connector = Consumer.create(consumerConfig);
 
 interface ConsumerConnector {
-       
+
   /**
    * This method is used to get a list of KafkaStreams, which are iterators over
    * MessageAndMetadata objects from which you can obtain messages and their
@@ -114,7 +114,7 @@ interface ConsumerConnector {
    *  Input: a map of &lt;topic, #streams&gt;
    *  Output: a map of &lt;topic, list of message streams&gt;
    */
-  public Map&lt;String,List&lt;KafkaStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt; topicCountMap); 
+  public Map&lt;String,List&lt;KafkaStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt; topicCountMap);
 
   /**
    * You can also obtain a list of KafkaStreams, that iterate over messages
@@ -126,7 +126,7 @@ interface ConsumerConnector {
 
   /* Commit the offsets of all messages consumed so far. */
   public commitOffsets()
-  
+
   /* Shut down the connector */
   public shutdown()
 }
@@ -149,27 +149,27 @@ Messages consist of a fixed-size header and variable length opaque byte array pa
 <h3><a id="messageformat">5.4 Message Format</a></h3>
 
 <pre>
-       /** 
-        * A message. The format of an N byte message is the following: 
-        * 
-        * If magic byte is 0 
-        * 
-        * 1. 1 byte "magic" identifier to allow format changes 
-        * 
-        * 2. 4 byte CRC32 of the payload 
-        * 
-        * 3. N - 5 byte payload 
-        * 
-        * If magic byte is 1 
-        * 
-        * 1. 1 byte "magic" identifier to allow format changes 
-        * 
-        * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) 
-        * 
-        * 3. 4 byte CRC32 of the payload 
-        * 
-        * 4. N - 6 byte payload 
-        * 
+       /**
+        * A message. The format of an N byte message is the following:
+        *
+        * If magic byte is 0
+        *
+        * 1. 1 byte "magic" identifier to allow format changes
+        *
+        * 2. 4 byte CRC32 of the payload
+        *
+        * 3. N - 5 byte payload
+        *
+        * If magic byte is 1
+        *
+        * 1. 1 byte "magic" identifier to allow format changes
+        *
+        * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
+        *
+        * 3. 4 byte CRC32 of the payload
+        *
+        * 4. N - 6 byte payload
+        *
         */
 </pre>
 </p>
@@ -183,7 +183,7 @@ The exact binary format for messages is versioned and maintained as a standard i
 <pre>
 On-disk format of a message
 
-message length : 4 bytes (value: 1+4+n) 
+message length : 4 bytes (value: 1+4+n)
 "magic" value  : 1 byte
 crc            : 4 bytes
 payload        : n bytes
@@ -289,7 +289,7 @@ When an element in a path is denoted [xyz], that means that the value of xyz is
 This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) is an error.
 </p>
 <p>
-Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available). 
+Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).
 </p>
 <h4>Broker Topic Registry</h4>
 <pre>
@@ -306,7 +306,7 @@ Consumers of topics also register themselves in ZooKeeper, in order to coordinat
 </p>
 
 <p>
-Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id. 
+Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id.
 For example if one consumer is your foobar process, which is run across three machines, then you might assign this group of consumers the id "foobar". This group id is provided in the configuration of the consumer, and is your way to tell the consumer which group it belongs to.
 </p>
 
@@ -371,7 +371,7 @@ The consumer rebalancing algorithms allows all the consumers in a group to come
 Each consumer does the following during rebalancing:
 </p>
 <pre>
-   1. For each topic T that C<sub>i</sub> subscribes to 
+   1. For each topic T that C<sub>i</sub> subscribes to
    2.   let P<sub>T</sub> be all partitions producing topic T
   3.   let C<sub>G</sub> be all consumers in the same group as C<sub>i</sub> that consume topic T
   4.   sort P<sub>T</sub> (so partitions on the same broker are clustered together)
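To make the default routing in the Partitioner discussion above concrete, here is a small sketch of the hash(key)%numPartitions strategy. The interface is copied from the doc text so the example is self-contained; it is an illustration, not a drop-in kafka.producer.Partitioner implementation.

    import java.util.Random;

    interface Partitioner<T> {
      int partition(T key, int numPartitions);
    }

    class HashPartitioner<T> implements Partitioner<T> {
      private final Random random = new Random();

      @Override
      public int partition(T key, int numPartitions) {
        if (key == null)
          return random.nextInt(numPartitions);               // null key: random broker partition
        return (key.hashCode() & 0x7fffffff) % numPartitions;  // hash(key) % numPartitions, kept non-negative
      }
    }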

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 0875611..3d69fac 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,5 +14,5 @@
 // limitations under the License.
 
 apply from: file('scala.gradle')
-include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
-        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
\ No newline at end of file
+include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
+        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
