APEXMALHAR-2242 Additions and updates to the documentation.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/352e2d92 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/352e2d92 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/352e2d92 Branch: refs/heads/master Commit: 352e2d92cb551c0fa4489f729379c32ad47b08a2 Parents: bd502e7 Author: Thomas Weise <t...@apache.org> Authored: Wed Oct 12 20:56:39 2016 -0700 Committer: Thomas Weise <t...@apache.org> Committed: Thu Oct 13 10:24:45 2016 -0700 ---------------------------------------------------------------------- docs/operators/kafkaInputOperator.md | 160 +++++++++++++++--------------- 1 file changed, 82 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/352e2d92/docs/operators/kafkaInputOperator.md ---------------------------------------------------------------------- diff --git a/docs/operators/kafkaInputOperator.md b/docs/operators/kafkaInputOperator.md index cb29e5d..2886b4b 100644 --- a/docs/operators/kafkaInputOperator.md +++ b/docs/operators/kafkaInputOperator.md @@ -1,19 +1,30 @@ KAFKA INPUT OPERATOR ===================== -### Introduction: About Kafka Input Operator +### Introduction -This is an input operator that consumes data from Kafka messaging system for further processing in Apex. Kafka Input Operator is an fault-tolerant and scalable Malhar Operator. +[Apache Kafka](http://kafka.apache.org) is a pull-based and distributed publish subscribe messaging system, +topics are partitioned and replicated across nodes. -### Why is it needed ? +The Kafka input operator consumes data from the partitions of a Kafka topic for processing in Apex. +The operator has the ability to automatically scale with the Kafka partitioning for high throughput. 
+It is fault-tolerant (consumer offset checkpointing) and guarantees idempotency to allow exactly-once results in the downstream pipeline. -Kafka is a pull-based and distributed publish subscribe messaging system, topics are partitioned and replicated across -nodes. Kafka input operator is needed when you want to read data from multiple -partitions of a Kafka topic in parallel in an Apex application. +For more information about the operator design see this [presentation](http://www.slideshare.net/ApacheApex/apache-apex-kafka-input-operator) +and for processing guarantees this [blog](https://www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/). -### 0.8 Version of Kafka Input Operator +There are two separate implementations of the input operator, +one built against Kafka 0.8 client and a newer version for the +Kafka 0.9 consumer API that also works with MapR Streams. +These reside in different packages and are described separately below. -### AbstractKafkaInputOperator (Package: com.datatorrent.contrib.kafka) +### Kafka Input Operator for Kafka 0.8.x + +Package: `com.datatorrent.contrib.kafka` + +Maven artifact: [malhar-contrib](https://mvnrepository.com/artifact/org.apache.apex/malhar-contrib) + +### AbstractKafkaInputOperator This is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn’t have any ports. @@ -77,8 +88,7 @@ Default Value = ONE_TO_ONE</p></td> #### Abstract Methods -void emitTuple(Message message): Abstract method that emits tuples -extracted from Kafka message. +`void emitTuple(Message message)`: Abstract method that emits tuples extracted from Kafka message. ### KafkaConsumer @@ -92,8 +102,8 @@ functionality of High Level Consumer API. ### Pre-requisites -This operator referred the Kafka Consumer API of version -0.8.1.1. So, this operator will work with any 0.8.x and 0.7.x version of Apache Kafka. 
+This operator uses the Kafka 0.8.2.1 client consumer API +and will work with 0.8.x and 0.7.x versions of Kafka broker. #### Configuration Parameters @@ -197,9 +207,9 @@ public interface OffsetManager ``` #### Abstract Methods                 -Map <KafkaPartition, Long> loadInitialOffsets(): Specifies the initial offset for consuming messages; called at the activation stage. +`Map <KafkaPartition, Long> loadInitialOffsets()`: Specifies the initial offset for consuming messages; called at the activation stage. -updateOffsets(Map <KafkaPartition, Long> offsetsOfPartitions):  This +`updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions)`:  This method is called at every repartitionCheckInterval to update offsets. ### Partitioning @@ -228,25 +238,20 @@ parameter repartitionInterval value to a negative value. ### AbstractSinglePortKafkaInputOperator -This class extends AbstractKafkaInputOperator and having single output -port, will emit the messages through this port. +This class extends AbstractKafkaInputOperator to emit messages through single output port. #### Ports -outputPort <T>: Tuples extracted from Kafka messages are emitted through -this port. +`outputPort <T>`: Tuples extracted from Kafka messages are emitted through this port. #### Abstract Methods -T getTuple(Message msg) : Converts the Kafka message to tuple. +`T getTuple(Message msg)`: Converts the Kafka message to tuple. ### Concrete Classes -1. KafkaSinglePortStringInputOperator : -This class extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts string from Kafka message. - -2. KafkaSinglePortByteArrayInputOperator: -This class extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts byte array from Kafka message. +1. KafkaSinglePortStringInputOperator: extends `AbstractSinglePortKafkaInputOperator`, extracts string from Kafka message. +2. 
KafkaSinglePortByteArrayInputOperator: extends `AbstractSinglePortKafkaInputOperator`, extracts byte array from Kafka message. ### Application Example @@ -257,15 +262,13 @@ Below is the code snippet: @ApplicationAnnotation(name = "KafkaApp") public class ExampleKafkaApplication implements StreamingApplication { -@Override -public void populateDAG(DAG dag, Configuration entries) -{ - KafkaSinglePortByteArrayInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortByteArrayInputOperator()); - - ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator()); - - dag.addStream("MessageData", input.outputPort, output.input); -} + @Override + public void populateDAG(DAG dag, Configuration entries) + { + KafkaSinglePortByteArrayInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortByteArrayInputOperator()); + ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator()); + dag.addStream("MessageData", input.outputPort, output.input); + } } ``` Below is the configuration for “test” Kafka topic name and @@ -273,27 +276,32 @@ Below is the configuration for “test” Kafka topic name and ```xml <property> -<name>dt.operator.MessageReader.prop.topic</name> -<value>test</value> + <name>dt.operator.MessageReader.prop.topic</name> + <value>test</value> </property> <property> -<name>dt.operator.KafkaInputOperator.prop.zookeeper</nam> -<value>localhost:2181</value> + <name>dt.operator.KafkaInputOperator.prop.zookeeper</nam> + <value>localhost:2181</value> </property> ``` -### 0.9 Version of Kafka Input Operator +### Kafka Input Operator for Kafka 0.9.x + +Package: `org.apache.apex.malhar.kafka` -### AbstractKafkaInputOperator (Package: org.apache.apex.malhar.kafka) +Maven Artifact: [malhar-kafka](https://mvnrepository.com/artifact/org.apache.apex/malhar-kafka) -This version uses the new 0.9 version of consumer API and features of this version are described here. 
This operator is fault-tolerant, scalable, multi-cluster and multi-topic support. +This version uses the new 0.9 version of consumer API and works with Kafka broker version 0.9 and later. +The operator is fault-tolerant, scalable and supports input from multiple clusters and multiple topics in a single operator instance. #### Pre-requisites This operator requires version 0.9.0 or later of the Kafka Consumer API. +### AbstractKafkaInputOperator + #### Ports ---------- @@ -311,11 +319,11 @@ This abstract class doesn't have any ports. - Specified the Kafka topics that you want to consume messages from. If you want multi-topic support, then specify the topics separated by ",". - ***strategy*** - PartitionStrategy - - Operator supports two types of partitioning strategies, ONE_TO_ONE and ONE_TO_MANY. + - Operator supports two types of partitioning strategies, `ONE_TO_ONE` and `ONE_TO_MANY`. - ONE_TO_ONE: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances. - ONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances. - Default Value = PartitionStrategy.ONE_TO_ONE. + `ONE_TO_ONE`: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances. 
+ `ONE_TO_MANY`: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances. + Default Value = `PartitionStrategy.ONE_TO_ONE`. - ***initialPartitionCount*** - Integer - When the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances. @@ -331,23 +339,22 @@ This abstract class doesn't have any ports. - ***maxTuplesPerWindow*** - Integer - Controls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1. - Default value = MAX_VALUE + Default value = `MAX_VALUE` - ***initialOffset*** - InitialOffset - - Indicates the type of offset i.e, “EARLIEST or LATEST or APPLICATION_OR_EARLIEST or APPLICATION_OR_LATEST”. - LATEST => Operator consume messages from latest point of Kafka queue. - EARLIEST => Operator consume messages starting from message queue. - APPLICATION_OR_EARLIEST => Operator consume messages from committed position from last run. If there is no committed offset, then it starts consuming from beginning of kafka queue. - APPLICATION_OR_LATEST => Operator consume messages from committed position from last run. If there is not committed offset, then it starts consuming from latest position of queue. - Default value = InitialOffset.APPLICATION_OR_LATEST + - Indicates the type of offset i.e, `EARLIEST` or `LATEST` or `APPLICATION_OR_EARLIEST` or `APPLICATION_OR_LATEST`. + `LATEST` => Consume new messages from latest offset in the topic. + `EARLIEST` => Consume all messages available in the topic. 
+ `APPLICATION_OR_EARLIEST` => Consume messages from committed position from last run. If there is no committed offset, then start consuming from beginning. + `APPLICATION_OR_LATEST` => Consumes messages from committed position from last run. If a committed offset is unavailable, then start consuming from latest position. + Default value = `InitialOffset.APPLICATION_OR_LATEST` - ***metricsRefreshInterval*** - Long - Interval specified in milliseconds. This value specifies the minimum interval between two metric stat updates. Default value = 5 Seconds. - ***consumerTimeout*** - Long - - Indicates the time waiting in poll data if data is not available. Please refer the below link: - http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll + - Indicates the [time waiting in poll](http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll) when data is not available. Default value = 5 Seconds. - ***holdingBufferSize*** - Long @@ -355,25 +362,24 @@ This abstract class doesn't have any ports. Default value = 1024. - ***consumerProps*** - Properties - - Specify the consumer properties which are not yet set to the operator. Please refer the below link for consumer properties: - http://kafka.apache.org/090/documentation.html#newconsumerconfigs + - Specify the [consumer properties[(http://kafka.apache.org/090/documentation.html#newconsumerconfigs) which are not yet set to the operator. - ***windowDataManager*** - WindowDataManager - - Specifies that the operator will process the same set of messages in a window before and after a failure. This is important but it comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window. - Default value = WindowDataManager.NoopWindowDataManager. 
+ - If set to a value other than the default, such as `FSWindowDataManager`, specifies that the operator will process the same set of messages in a window before and after a failure. This is important but it comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window. + Default value = `WindowDataManager.NoopWindowDataManager`. #### Abstract Methods -void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message): Abstract method that emits tuples +`void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message)`: Abstract method that emits tuples extracted from Kafka message. ### Concrete Classes #### KafkaSinglePortInputOperator -This class extends from AbstractKafkaInputOperator and define the getTuple() method which extracts byte array from Kafka message. +This class extends from AbstractKafkaInputOperator and defines the `getTuple()` method which extracts byte array from Kafka message. #### Ports -outputPort <byte[]>: Tuples extracted from Kafka messages are emitted through this port. +`outputPort <byte[]>`: Tuples extracted from Kafka messages are emitted through this port. ### Application Example This section builds an Apex application using Kafka input operator. 
@@ -383,15 +389,13 @@ Below is the code snippet: @ApplicationAnnotation(name = "KafkaApp") public class ExampleKafkaApplication implements StreamingApplication { -@Override -public void populateDAG(DAG dag, Configuration entries) -{ - KafkaSinglePortInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortInputOperator()); - - ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator()); - - dag.addStream("MessageData", input.outputPort, output.input); -} + @Override + public void populateDAG(DAG dag, Configuration entries) + { + KafkaSinglePortInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortInputOperator()); + ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator()); + dag.addStream("MessageData", input.outputPort, output.input); + } } ``` Below is the configuration for “test” Kafka topic name and @@ -399,13 +403,13 @@ Below is the configuration for “test” Kafka topic name and ```xml <property> -<name>dt.operator.MessageReader.prop.topics</name> -<value>test</value> + <name>dt.operator.MessageReader.prop.topics</name> + <value>test</value> </property> <property> -<name>dt.operator.KafkaInputOperator.prop.clusters</nam> -<value>localhost:9092</value> + <name>dt.operator.KafkaInputOperator.prop.clusters</nam> + <value>localhost:9092</value> </property> ``` @@ -413,14 +417,14 @@ By adding following lines to properties file, Kafka Input Operator supports mult ```xml <property> -<name>dt.operator.MessageReader.prop.topics</name> -<value>test1, test2</value> + <name>dt.operator.MessageReader.prop.topics</name> + <value>test1, test2</value> </property> <property> -<name>dt.operator.KafkaInputOperator.prop.clusters</nam> -<value>localhost:9092; localhost:9093; localhost:9094</value> + <name>dt.operator.KafkaInputOperator.prop.clusters</nam> + <value>localhost:9092; localhost:9093; localhost:9094</value> </property> ``` -For more details about example application, Please refer 
to https://github.com/DataTorrent/examples/tree/master/tutorials/kafka. +For a full example application project, refer to https://github.com/DataTorrent/examples/tree/master/tutorials/kafka
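
Note on the `ONE_TO_MANY` strategy documented in this change: the rule K = min(initialPartitionCount, N) with round-robin assignment of the N topic partitions can be illustrated with a small standalone Java sketch. This is only an assumption-laden illustration of the arithmetic described in the doc text, not the operator's actual partitioning code; the class `OneToManyAssignmentSketch` and its `assign` method are hypothetical names and do not exist in Apex Malhar.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of Apex Malhar.
public class OneToManyAssignmentSketch
{
  /**
   * Returns, for each operator instance, the Kafka partition ids it would read,
   * following K = min(initialPartitionCount, N) and round-robin assignment.
   */
  static List<List<Integer>> assign(int initialPartitionCount, int kafkaPartitions)
  {
    if (initialPartitionCount < 1 || kafkaPartitions < 0) {
      throw new IllegalArgumentException("counts must be positive");
    }
    int k = Math.min(initialPartitionCount, kafkaPartitions);
    List<List<Integer>> instances = new ArrayList<>();
    for (int i = 0; i < k; i++) {
      instances.add(new ArrayList<Integer>());
    }
    // Partition p goes to instance p mod K, so extra partitions wrap around.
    for (int p = 0; p < kafkaPartitions; p++) {
      instances.get(p % k).add(p);
    }
    return instances;
  }

  public static void main(String[] args)
  {
    // Example from the doc: initialPartitionCount = 5, N = 2 gives 2 instances.
    System.out.println(assign(5, 2)); // [[0], [1]]
    // Fewer instances than partitions: partitions are spread round-robin.
    System.out.println(assign(2, 5)); // [[0, 2, 4], [1, 3]]
  }
}
```

The first call reproduces the documented example (two instances, one partition each); the second shows the case K < N, where each instance reads several partitions.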