Repository: kafka Updated Branches: refs/heads/0.9.0 f22ea2970 -> e176fcc7f
http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-producer/README.md ---------------------------------------------------------------------- diff --git a/contrib/hadoop-producer/README.md b/contrib/hadoop-producer/README.md deleted file mode 100644 index a5bef73..0000000 --- a/contrib/hadoop-producer/README.md +++ /dev/null @@ -1,94 +0,0 @@ -Hadoop to Kafka Bridge -====================== - -What's new? ------------ - -* Kafka 0.8 support - * No more ZK-based load balancing (backwards incompatible change) -* Semantic partitioning is now supported in KafkaOutputFormat. Just specify a - key in the output committer of your job. The Pig StoreFunc doesn't support - semantic partitioning. -* Config parameters are now the same as the Kafka producer, just prepended with - kafka.output (e.g., kafka.output.max.message.size). This is a backwards - incompatible change. - -What is it? ------------ - -The Hadoop to Kafka bridge is a way to publish data from Hadoop to Kafka. There -are two possible mechanisms, varying from easy to difficult: writing a Pig -script and writing messages in Avro format, or rolling your own job using the -Kafka `OutputFormat`. - -Note that there are no write-once semantics: any client of the data must handle -messages in an idempotent manner. That is, because of node failures and -Hadoop's failure recovery, it's possible that the same message is published -multiple times in the same push. - -How do I use it? ----------------- - -With this bridge, Kafka topics are URIs and are specified as URIs of the form -`kafka://<kafka-server>/<kafka-topic>` to connect to a specific Kafka broker. - -### Pig ### - -Pig bridge writes data in binary Avro format with one message created per input -row. To push data via Kafka, store to the Kafka URI using `AvroKafkaStorage` -with the Avro schema as its first argument. You'll need to register the -appropriate Kafka JARs. 
Here is what an example Pig script looks like: - - REGISTER hadoop-producer_2.8.0-0.8.0.jar; - REGISTER avro-1.4.0.jar; - REGISTER piggybank.jar; - REGISTER kafka-0.8.0.jar; - REGISTER jackson-core-asl-1.5.5.jar; - REGISTER jackson-mapper-asl-1.5.5.jar; - REGISTER scala-library.jar; - - member_info = LOAD 'member_info.tsv' AS (member_id : int, name : chararray); - names = FOREACH member_info GENERATE name; - STORE member_info INTO 'kafka://my-kafka:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"'); - -That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert -from Pig's data model to the specified Avro schema. - -Further, multi-store is possible with KafkaStorage, so you can easily write to -multiple topics and brokers in the same job: - - SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000; - STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema'); - STORE others INTO 'kafka://my-broker2:9092/others' USING AvroKafkaStorage('$schema'); - -### KafkaOutputFormat ### - -KafkaOutputFormat is a Hadoop OutputFormat for publishing data via Kafka. It -uses the newer 0.20 mapreduce APIs and simply pushes bytes (i.e., -BytesWritable). This is a lower-level method of publishing data, as it allows -you to precisely control output. - -Included is an example that publishes some input text line-by-line to a topic. -With KafkaOutputFormat, the key can be a null, where it is ignored by the -producer (random partitioning), or any object for semantic partitioning of the -stream (with an appropriate Kafka partitioner set). Speculative execution is -turned off by the OutputFormat. - -What can I tune? ----------------- - -* kafka.output.queue.bytes: Bytes to queue in memory before pushing to the Kafka - producer (i.e., the batch size). Default is 1,000,000 (1 million) bytes. 
- -Any of Kafka's producer parameters can be changed by prefixing them with -"kafka.output" in one's job configuration. For example, to change the -compression codec, one would add the "kafka.output.compression.codec" parameter -(e.g., "SET kafka.output.compression.codec 0" in one's Pig script for no -compression). - -For easier debugging, the above values as well as the Kafka broker information -(kafka.metadata.broker.list), the topic (kafka.output.topic), and the schema -(kafka.output.schema) are injected into the job's configuration. By default, -the Hadoop producer uses Kafka's sync producer as asynchronous operation -doesn't make sense in the batch Hadoop case. - http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java deleted file mode 100644 index d447b1d..0000000 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.bridge.examples; - -import java.io.IOException; -import kafka.bridge.hadoop.KafkaOutputFormat; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; - -/** - * Publish a text file line by line to a Kafka topic - */ -public class TextPublisher -{ - public static void main(String[] args) throws Exception - { - if (args.length != 2) { - System.err.println("usage: <input path> <kafka output url>"); - return; - } - - Job job = new Job(); - - job.setJarByClass(TextPublisher.class); - job.setInputFormatClass(TextInputFormat.class); - job.setOutputFormatClass(KafkaOutputFormat.class); - - job.setMapperClass(TheMapper.class); - job.setNumReduceTasks(0); - - FileInputFormat.addInputPath(job, new Path(args[0])); - KafkaOutputFormat.setOutputPath(job, new Path(args[1])); - - if (!job.waitForCompletion(true)) { - throw new RuntimeException("Job failed!"); - } - } - - public static class TheMapper extends Mapper<Object, Text, Object, Object> - { - @Override - protected void map(Object key, Text value, Context context) throws IOException, InterruptedException - { - context.write(null, value.getBytes()); - } - } -} - http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java deleted file mode 100644 index 417b4b3..0000000 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java +++ 
/dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.bridge.hadoop; - -import java.io.IOException; -import java.net.URI; -import java.util.*; - -import kafka.common.KafkaException; -import kafka.javaapi.producer.Producer; -import kafka.producer.ProducerConfig; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.log4j.Logger; - -public class KafkaOutputFormat<K, V> extends OutputFormat<K, V> -{ - private Logger log = Logger.getLogger(KafkaOutputFormat.class); - - public static final String KAFKA_URL = "kafka.output.url"; - /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window): - * We set the default to a million bytes so that the server will not reject the batch of messages - * with a MessageSizeTooLargeException. 
The actual size will be smaller after compression. - */ - public static final int KAFKA_QUEUE_BYTES = 1000000; - - public static final String KAFKA_CONFIG_PREFIX = "kafka.output"; - private static final Map<String, String> kafkaConfigMap; - static { - Map<String, String> cMap = new HashMap<String, String>(); - - // default Hadoop producer configs - cMap.put("producer.type", "sync"); - cMap.put("compression.codec", Integer.toString(1)); - cMap.put("request.required.acks", Integer.toString(1)); - - kafkaConfigMap = Collections.unmodifiableMap(cMap); - } - - public KafkaOutputFormat() - { - super(); - } - - public static void setOutputPath(Job job, Path outputUrl) - { - job.getConfiguration().set(KafkaOutputFormat.KAFKA_URL, outputUrl.toString()); - - job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false); - job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false); - } - - public static Path getOutputPath(JobContext job) - { - String name = job.getConfiguration().get(KafkaOutputFormat.KAFKA_URL); - return name == null ? null : new Path(name); - } - - @Override - public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException - { - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException - { - // Is there a programmatic way to get the temp dir? I see it hardcoded everywhere in Hadoop, Hive, and Pig. 
- return new FileOutputCommitter(new Path("/tmp/" + taskAttemptContext.getTaskAttemptID().getJobID().toString()), taskAttemptContext); - } - - @Override - public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException - { - Path outputPath = getOutputPath(context); - if (outputPath == null) - throw new KafkaException("no kafka output url specified"); - URI uri = URI.create(outputPath.toString()); - Configuration job = context.getConfiguration(); - - Properties props = new Properties(); - String topic; - - props.putAll(kafkaConfigMap); // inject default configuration - for (Map.Entry<String, String> m : job) { // handle any overrides - if (!m.getKey().startsWith(KAFKA_CONFIG_PREFIX)) - continue; - if (m.getKey().equals(KAFKA_URL)) - continue; - - String kafkaKeyName = m.getKey().substring(KAFKA_CONFIG_PREFIX.length()+1); - props.setProperty(kafkaKeyName, m.getValue()); // set Kafka producer property - } - - // inject Kafka producer props back into jobconf for easier debugging - for (Map.Entry<Object, Object> m : props.entrySet()) { - job.set(KAFKA_CONFIG_PREFIX + "." + m.getKey().toString(), m.getValue().toString()); - } - - // KafkaOutputFormat specific parameters - final int queueBytes = job.getInt(KAFKA_CONFIG_PREFIX + ".queue.bytes", KAFKA_QUEUE_BYTES); - - if (uri.getScheme().equals("kafka")) { - // using the direct broker list - // URL: kafka://<kafka host>/<topic> - // e.g. 
kafka://kafka-server:9000,kafka-server2:9000/foobar - String brokerList = uri.getAuthority(); - props.setProperty("metadata.broker.list", brokerList); - job.set(KAFKA_CONFIG_PREFIX + ".metadata.broker.list", brokerList); - - if (uri.getPath() == null || uri.getPath().length() <= 1) - throw new KafkaException("no topic specified in kafka uri"); - - topic = uri.getPath().substring(1); // ignore the initial '/' in the path - job.set(KAFKA_CONFIG_PREFIX + ".topic", topic); - log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic)); - } else - throw new KafkaException("missing scheme from kafka uri (must be kafka://)"); - - Producer<Object, byte[]> producer = new Producer<Object, byte[]>(new ProducerConfig(props)); - return new KafkaRecordWriter<K, V>(producer, topic, queueBytes); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java deleted file mode 100644 index 72c088d..0000000 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.bridge.hadoop; - -import java.io.IOException; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -public class KafkaRecordWriter<K,V> extends RecordWriter<K,V> -{ - protected Producer<Object, byte[]> producer; - protected String topic; - - protected List<KeyedMessage<Object, byte[]>> msgList = new LinkedList<KeyedMessage<Object, byte[]>>(); - protected int totalBytes = 0; - protected int queueBytes; - - public KafkaRecordWriter(Producer<Object, byte[]> producer, String topic, int queueBytes) - { - this.producer = producer; - this.topic = topic; - this.queueBytes = queueBytes; - } - - protected void sendMsgList() throws IOException - { - if (msgList.size() > 0) { - try { - producer.send(msgList); - } - catch (Exception e) { - throw new IOException(e); // all Kafka exceptions become IOExceptions - } - msgList.clear(); - totalBytes = 0; - } - } - - @Override - public void write(K key, V value) throws IOException, InterruptedException - { - byte[] valBytes; - if (value instanceof byte[]) - valBytes = (byte[]) value; - else if (value instanceof BytesWritable) - // BytesWritable.getBytes returns its internal buffer, so .length would refer to its capacity, not the - // intended size of the byte array contained. We need to use BytesWritable.getLength for the true size. 
- valBytes = Arrays.copyOf(((BytesWritable) value).getBytes(), ((BytesWritable) value).getLength()); - else - throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish"); - - // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch - // If the new message is going to make the message list tip over 1 million bytes, send the - // message list now. - if ((totalBytes + valBytes.length) > queueBytes || msgList.size() >= Short.MAX_VALUE) - sendMsgList(); - - msgList.add(new KeyedMessage<Object, byte[]>(this.topic, key, valBytes)); - totalBytes += valBytes.length; - } - - @Override - public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException - { - sendMsgList(); - producer.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java deleted file mode 100644 index d24a85a..0000000 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.bridge.pig; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import kafka.bridge.hadoop.KafkaOutputFormat; -import kafka.bridge.hadoop.KafkaRecordWriter; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.Encoder; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.pig.ResourceSchema; -import org.apache.pig.StoreFunc; -import org.apache.pig.data.Tuple; -import org.apache.pig.piggybank.storage.avro.PigAvroDatumWriter; -import org.apache.pig.piggybank.storage.avro.PigSchema2Avro; - -public class AvroKafkaStorage extends StoreFunc -{ - protected KafkaRecordWriter<Object, byte[]> writer; - protected org.apache.avro.Schema avroSchema; - protected PigAvroDatumWriter datumWriter; - protected Encoder encoder; - protected ByteArrayOutputStream os; - - public AvroKafkaStorage(String schema) - { - this.avroSchema = org.apache.avro.Schema.parse(schema); - } - - @Override - public OutputFormat getOutputFormat() throws IOException - { - return new KafkaOutputFormat(); - } - - @Override - public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException - { - return location; - } - - @Override - public void setStoreLocation(String uri, Job job) throws IOException - { - KafkaOutputFormat.setOutputPath(job, new Path(uri)); - } - - @Override - @SuppressWarnings("unchecked") - public void prepareToWrite(RecordWriter 
writer) throws IOException - { - if (this.avroSchema == null) - throw new IllegalStateException("avroSchema shouldn't be null"); - - this.writer = (KafkaRecordWriter) writer; - this.datumWriter = new PigAvroDatumWriter(this.avroSchema); - this.os = new ByteArrayOutputStream(); - this.encoder = new BinaryEncoder(this.os); - } - - @Override - public void cleanupOnFailure(String location, Job job) throws IOException - { - } - - @Override - public void setStoreFuncUDFContextSignature(String signature) - { - } - - @Override - public void checkSchema(ResourceSchema schema) throws IOException - { - this.avroSchema = PigSchema2Avro.validateAndConvert(avroSchema, schema); - } - - protected void writeEnvelope(OutputStream os, Encoder enc) throws IOException - { - } - - @Override - public void putNext(Tuple tuple) throws IOException - { - os.reset(); - writeEnvelope(os, this.encoder); - datumWriter.write(tuple, this.encoder); - this.encoder.flush(); - - try { - this.writer.write(null, this.os.toByteArray()); - } - catch (InterruptedException e) { - throw new IOException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/docs/api.html ---------------------------------------------------------------------- diff --git a/docs/api.html b/docs/api.html index 1787f06..835bdf2 100644 --- a/docs/api.html +++ b/docs/api.html @@ -5,9 +5,9 @@ The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -28,7 +28,7 @@ As of the 0.8.2 release we encourage all new development to use the new Java pro </dependency> </pre> -Examples showing how to use the producer are given in the +Examples showing how to use the producer are given in the <a href="http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html" title="Kafka 0.8.2 Javadoc">javadocs</a>. <p> @@ -144,17 +144,7 @@ class kafka.javaapi.consumer.SimpleConsumer { For most applications, the high level consumer Api is good enough. Some applications want features not exposed to the high level consumer yet (e.g., set initial offset when restarting the consumer). They can instead use our low level SimpleConsumer Api. The logic will be a bit more complicated and you can follow the example in <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example" title="Kafka 0.8 SimpleConsumer example">here</a>. -<h3><a id="kafkahadoopconsumerapi">2.4 Kafka Hadoop Consumer API</a></h3> -<p> -Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers). -</p> - -<p> -Usage information on the hadoop consumer can be found <a href="https://github.com/linkedin/camus/">here</a>. -</p> - - -<h3><a id="newconsumerapi">2.5 New Consumer API</a></h3> +<h3><a id="newconsumerapi">2.4 New Consumer API</a></h3> As of the 0.9.0 release we have added a replacement for our existing simple and high-level consumers. This client is considered beta quality. 
You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases): <pre> <dependency> @@ -165,4 +155,4 @@ As of the 0.9.0 release we have added a replacement for our existing simple and </pre> Examples showing how to use the producer are given in the -<a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaConsumer.html" title="Kafka 0.9.0 Javadoc">javadocs</a>. \ No newline at end of file +<a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaConsumer.html" title="Kafka 0.9.0 Javadoc">javadocs</a>. http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/docs/documentation.html ---------------------------------------------------------------------- diff --git a/docs/documentation.html b/docs/documentation.html index 795bd1e..a123721 100644 --- a/docs/documentation.html +++ b/docs/documentation.html @@ -35,7 +35,7 @@ Prior releases: <a href="/07/documentation.html">0.7.x</a>, <a href="/08/documen <li><a href="#producerapi">2.1 Producer API</a> <li><a href="#highlevelconsumerapi">2.2 High Level Consumer API</a> <li><a href="#simpleconsumerapi">2.3 Simple Consumer API</a> - <li><a href="#kafkahadoopconsumerapi">2.4 Kafka Hadoop Consumer API</a> + <li><a href="#newconsumerapi">2.4 New Consumer API</a> </ul> <li><a href="#configuration">3. Configuration</a> <ul> http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/docs/implementation.html ---------------------------------------------------------------------- diff --git a/docs/implementation.html b/docs/implementation.html index d9ffa46..b95d36f 100644 --- a/docs/implementation.html +++ b/docs/implementation.html @@ -5,9 +5,9 @@ The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,10 +20,10 @@ <h4>Producer APIs</h4> <p> -The Producer API that wraps the 2 low-level producers - <code>kafka.producer.SyncProducer</code> and <code>kafka.producer.async.AsyncProducer</code>. +The Producer API that wraps the 2 low-level producers - <code>kafka.producer.SyncProducer</code> and <code>kafka.producer.async.AsyncProducer</code>. <pre> class Producer<T> { - + /* Sends the data, partitioned by key to the topic using either the */ /* synchronous or the asynchronous producer */ public void send(kafka.javaapi.producer.ProducerData<K,V> producerData); @@ -32,21 +32,21 @@ class Producer<T> { /* the synchronous or the asynchronous producer */ public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData); - /* Closes the producer and cleans up */ + /* Closes the producer and cleans up */ public void close(); } </pre> -The goal is to expose all the producer functionality through a single API to the client. +The goal is to expose all the producer functionality through a single API to the client. The new producer - <ul> -<li>can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data - +<li>can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data - <p><code>kafka.producer.Producer</code> provides the ability to batch multiple produce requests (<code>producer.type=async</code>), before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. 
As events enter a queue, they are buffered in a queue, until either <code>queue.time</code> or <code>batch.size</code> is reached. A background thread (<code>kafka.producer.async.ProducerSendThread</code>) dequeues the batch of data and lets the <code>kafka.producer.EventHandler</code> serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the <code>event.handler</code> config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the <code>kafka.producer.async.CallbackHandler</code> interface and setting <code>callback.handler</code> config parameter to that class. </p> </li> -<li>handles the serialization of data through a user-specified <code>Encoder</code> - +<li>handles the serialization of data through a user-specified <code>Encoder</code> - <pre> interface Encoder<T> { public Message toMessage(T data); @@ -54,15 +54,15 @@ interface Encoder<T> { </pre> <p>The default is the no-op <code>kafka.serializer.DefaultEncoder</code></p> </li> -<li>provides software load balancing through an optionally user-specified <code>Partitioner</code> - +<li>provides software load balancing through an optionally user-specified <code>Partitioner</code> - <p> -The routing decision is influenced by the <code>kafka.producer.Partitioner</code>. +The routing decision is influenced by the <code>kafka.producer.Partitioner</code>. <pre> interface Partitioner<T> { int partition(T key, int numPartitions); } </pre> -The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is <code>hash(key)%numPartitions</code>. 
If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the <code>partitioner.class</code> config parameter. +The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is <code>hash(key)%numPartitions</code>. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the <code>partitioner.class</code> config parameter. </p> </li> </ul> @@ -79,11 +79,11 @@ The high-level API hides the details of brokers from the consumer and allows con <h5>Low-level API</h5> <pre> class SimpleConsumer { - - /* Send fetch request to a broker and get back a set of messages. */ + + /* Send fetch request to a broker and get back a set of messages. */ public ByteBufferMessageSet fetch(FetchRequest request); - /* Send a list of fetch requests to a broker and get back a response set. */ + /* Send a list of fetch requests to a broker and get back a response set. */ public MultiFetchResponse multifetch(List<FetchRequest> fetches); /** @@ -97,16 +97,16 @@ class SimpleConsumer { } </pre> -The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers (such as the hadoop consumer) which have particular requirements around maintaining state. +The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers which have particular requirements around maintaining state. 
 <h5>High-level API</h5>
 <pre>
-/* create a connection to the cluster */
+/* create a connection to the cluster */
 ConsumerConnector connector = Consumer.create(consumerConfig);
 interface ConsumerConnector {
-
+
   /**
    * This method is used to get a list of KafkaStreams, which are iterators over
    * MessageAndMetadata objects from which you can obtain messages and their
@@ -114,7 +114,7 @@ interface ConsumerConnector {
    * Input: a map of <topic, #streams>
    * Output: a map of <topic, list of message streams>
    */
-  public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
+  public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
   /**
    * You can also obtain a list of KafkaStreams, that iterate over messages
@@ -126,7 +126,7 @@ interface ConsumerConnector {
   /* Commit the offsets of all messages consumed so far. */
   public commitOffsets()
-
+
   /* Shut down the connector */
   public shutdown()
 }
@@ -149,27 +149,27 @@ Messages consist of a fixed-size header and variable length opaque byte array pa
 <h3><a id="messageformat">5.4 Message Format</a></h3>
 <pre>
-    /**
-     * A message. The format of an N byte message is the following:
-     *
-     * If magic byte is 0
-     *
-     * 1. 1 byte "magic" identifier to allow format changes
-     *
-     * 2. 4 byte CRC32 of the payload
-     *
-     * 3. N - 5 byte payload
-     *
-     * If magic byte is 1
-     *
-     * 1. 1 byte "magic" identifier to allow format changes
-     *
-     * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
-     *
-     * 3. 4 byte CRC32 of the payload
-     *
-     * 4. N - 6 byte payload
-     *
+    /**
+     * A message. The format of an N byte message is the following:
+     *
+     * If magic byte is 0
+     *
+     * 1. 1 byte "magic" identifier to allow format changes
+     *
+     * 2. 4 byte CRC32 of the payload
+     *
+     * 3. N - 5 byte payload
+     *
+     * If magic byte is 1
+     *
+     * 1. 1 byte "magic" identifier to allow format changes
+     *
+     * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
+     *
+     * 3. 4 byte CRC32 of the payload
+     *
+     * 4. N - 6 byte payload
+     *
      */
 </pre>
 </p>
@@ -183,7 +183,7 @@ The exact binary format for messages is versioned and maintained as a standard i
 <pre>
 On-disk format of a message
-message length : 4 bytes (value: 1+4+n)
+message length : 4 bytes (value: 1+4+n)
 "magic" value : 1 byte
 crc : 4 bytes
 payload : n bytes
@@ -289,7 +289,7 @@ When an element in a path is denoted [xyz], that means that the value of xyz is
 This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) is an error.
 </p>
 <p>
-Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).
+Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).
 </p>
 <h4>Broker Topic Registry</h4>
 <pre>
@@ -306,7 +306,7 @@ Consumers of topics also register themselves in ZooKeeper, in order to coordinat
 </p>
 <p>
-Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id.
+Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id.
 For example if one consumer is your foobar process, which is run across three machines, then you might assign this group of consumers the id "foobar". This group id is provided in the configuration of the consumer, and is your way to tell the consumer which group it belongs to.
 </p>
@@ -371,7 +371,7 @@ The consumer rebalancing algorithms allows all the consumers in a group to come
 Each consumer does the following during rebalancing:
 </p>
 <pre>
-   1. For each topic T that C<sub>i</sub> subscribes to
+   1. For each topic T that C<sub>i</sub> subscribes to
    2.   let P<sub>T</sub> be all partitions producing topic T
    3.   let C<sub>G</sub> be all consumers in the same group as C<sub>i</sub> that consume topic T
    4.   sort P<sub>T</sub> (so partitions on the same broker are clustered together)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 0875611..3d69fac 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,5 +14,5 @@
 // limitations under the License.
 apply from: file('scala.gradle')
-include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
-        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
\ No newline at end of file
+include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
+        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
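
For readers of the producer documentation hunk in this commit, the default partitioning rule it describes (`hash(key)%numPartitions`, with a random partition when the key is null) can be sketched roughly as below. The class name `DefaultPartitionSketch`, the method `partition`, and the use of `Math.floorMod` to keep the result non-negative are assumptions made for illustration only; this is not the partitioner code shipped with Kafka.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration of the documented default strategy; not Kafka's own class.
final class DefaultPartitionSketch {

    /**
     * Returns a partition id in the range [0, numPartitions).
     * A null key picks a random partition; otherwise the key's hash decides.
     */
    static int partition(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            // No key: any broker partition is acceptable, so choose one at random.
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // floorMod (an assumption of this sketch) avoids a negative index
        // when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println("keyed:   " + partition("member-42", 8));
        System.out.println("no key:  " + partition(null, 8));
    }
}
```

Because the keyed branch depends only on the key and the partition count, messages with the same key land on the same partition as long as the number of partitions does not change.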