[2/2] apex-malhar git commit: APEXMALHAR-2242 Additions and updates to the documentation.
APEXMALHAR-2242 Additions and updates to the documentation.

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/352e2d92
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/352e2d92
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/352e2d92

Branch: refs/heads/master
Commit: 352e2d92cb551c0fa4489f729379c32ad47b08a2
Parents: bd502e7
Author: Thomas Weise
Authored: Wed Oct 12 20:56:39 2016 -0700
Committer: Thomas Weise
Committed: Thu Oct 13 10:24:45 2016 -0700

--
 docs/operators/kafkaInputOperator.md | 160 +++---
 1 file changed, 82 insertions(+), 78 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/352e2d92/docs/operators/kafkaInputOperator.md
--
diff --git a/docs/operators/kafkaInputOperator.md b/docs/operators/kafkaInputOperator.md
index cb29e5d..2886b4b 100644
--- a/docs/operators/kafkaInputOperator.md
+++ b/docs/operators/kafkaInputOperator.md
@@ -1,19 +1,30 @@
 KAFKA INPUT OPERATOR
 =
-### Introduction: About Kafka Input Operator
+### Introduction
-This is an input operator that consumes data from Kafka messaging system for further processing in Apex. Kafka Input Operator is an fault-tolerant and scalable Malhar Operator.
+[Apache Kafka](http://kafka.apache.org) is a pull-based, distributed publish-subscribe messaging system;
+its topics are partitioned and replicated across nodes.
-### Why is it needed ?
+The Kafka input operator consumes data from the partitions of a Kafka topic for processing in Apex.
+The operator can scale automatically with the Kafka partitioning for high throughput.
+It is fault-tolerant (consumer offset checkpointing) and guarantees idempotency to allow exactly-once results in the downstream pipeline.
-Kafka is a pull-based and distributed publish subscribe messaging system, topics are partitioned and replicated across
-nodes. Kafka input operator is needed when you want to read data from multiple
-partitions of a Kafka topic in parallel in an Apex application.
+For more information about the operator design, see this [presentation](http://www.slideshare.net/ApacheApex/apache-apex-kafka-input-operator),
+and for processing guarantees, see this [blog](https://www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/).
-### 0.8 Version of Kafka Input Operator
+There are two separate implementations of the input operator,
+one built against the Kafka 0.8 client and a newer version for the
+Kafka 0.9 consumer API that also works with MapR Streams.
+These reside in different packages and are described separately below.
-### AbstractKafkaInputOperator (Package: com.datatorrent.contrib.kafka)
+### Kafka Input Operator for Kafka 0.8.x
+
+Package: `com.datatorrent.contrib.kafka`
+
+Maven artifact: [malhar-contrib](https://mvnrepository.com/artifact/org.apache.apex/malhar-contrib)
+
+### AbstractKafkaInputOperator
 This is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn't have any ports.
@@ -77,8 +88,7 @@ Default Value = ONE_TO_ONE
 Abstract Methods
-void emitTuple(Message message): Abstract method that emits tuples
-extracted from Kafka message.
+`void emitTuple(Message message)`: Abstract method that emits tuples extracted from a Kafka message.
 ### KafkaConsumer
@@ -92,8 +102,8 @@ functionality of High Level Consumer API.
 ### Pre-requisites
-This operator referred the Kafka Consumer API of version
-0.8.1.1. So, this operator will work with any 0.8.x and 0.7.x version of Apache Kafka.
+This operator uses the Kafka 0.8.2.1 client consumer API
+and will work with 0.8.x and 0.7.x versions of the Kafka broker.
 Configuration Parameters
@@ -197,9 +207,9 @@ public interface OffsetManager
 ```
 Abstract Methods
-Map<KafkaPartition, Long> loadInitialOffsets(): Specifies the initial offset for consuming messages; called at the activation stage.
+`Map<KafkaPartition, Long> loadInitialOffsets()`: Specifies the initial offset for consuming messages; called at the activation stage.
-updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions): This
+`updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions)`: This
 method is called at every repartitionCheckInterval to update offsets.
 ### Partitioning
@@ -228,25 +238,20 @@ parameter repartitionInterval value to a negative value.
 ### AbstractSinglePortKafkaInputOperator
-This class extends AbstractKafkaInputOperator and having single output
-port, will emit the messages through this port.
+This class extends AbstractKafkaInputOperator to emit
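The `OffsetManager` contract in the hunk above (offsets loaded once at activation, then pushed back every repartitionCheckInterval) can be illustrated with a minimal Java sketch. It is not Malhar code: to keep it self-contained, `KafkaPartition` and `OffsetManager` are simplified local stand-ins that mirror only the two documented method signatures (the real `KafkaPartition` in `com.datatorrent.contrib.kafka` may carry more fields), and `InMemoryOffsetManager` is a hypothetical implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the Malhar KafkaPartition type: only topic and partition id are modeled.
final class KafkaPartition {
  final String topic;
  final int partitionId;

  KafkaPartition(String topic, int partitionId) {
    this.topic = topic;
    this.partitionId = partitionId;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof KafkaPartition)) {
      return false;
    }
    KafkaPartition other = (KafkaPartition) o;
    return partitionId == other.partitionId && topic.equals(other.topic);
  }

  @Override
  public int hashCode() {
    return Objects.hash(topic, partitionId);
  }
}

// Stand-in interface mirroring the two documented OffsetManager methods.
interface OffsetManager {
  Map<KafkaPartition, Long> loadInitialOffsets();

  void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions);
}

// Hypothetical implementation that keeps the latest offsets in memory only.
class InMemoryOffsetManager implements OffsetManager {
  private final Map<KafkaPartition, Long> offsets = new HashMap<>();

  InMemoryOffsetManager(Map<KafkaPartition, Long> startOffsets) {
    offsets.putAll(startOffsets);
  }

  @Override
  public Map<KafkaPartition, Long> loadInitialOffsets() {
    // Called at activation: tells the consumer where to start each partition.
    // A copy is returned so callers cannot modify the stored offsets.
    return new HashMap<>(offsets);
  }

  @Override
  public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions) {
    // Called periodically with the current offsets; newer values overwrite older ones.
    offsets.putAll(offsetsOfPartitions);
  }
}
```

Because the offsets live only in memory, this sketch would lose them when the application restarts; a durable store would be needed for anything beyond a demonstration.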
[1/2] apex-malhar git commit: APEXMALHAR-2242 Added user documentation for 0.9 version of Kafka Input Operator
Repository: apex-malhar
Updated Branches:
  refs/heads/master 07812a903 -> 352e2d92c

APEXMALHAR-2242 Added user documentation for 0.9 version of Kafka Input Operator

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/bd502e7b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/bd502e7b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/bd502e7b

Branch: refs/heads/master
Commit: bd502e7b9f1df2c105d7f9a8044de439463aa299
Parents: 07812a9
Author: chaitanya
Authored: Tue Sep 27 22:01:50 2016 +0530
Committer: Thomas Weise
Committed: Thu Oct 13 08:37:30 2016 -0700

--
 docs/operators/kafkaInputOperator.md | 146 +-
 1 file changed, 145 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bd502e7b/docs/operators/kafkaInputOperator.md
--
diff --git a/docs/operators/kafkaInputOperator.md b/docs/operators/kafkaInputOperator.md
index 1d2258e..cb29e5d 100644
--- a/docs/operators/kafkaInputOperator.md
+++ b/docs/operators/kafkaInputOperator.md
@@ -11,7 +11,9 @@ Kafka is a pull-based and distributed publish subscribe messaging system, topics
 nodes. Kafka input operator is needed when you want to read data from multiple
 partitions of a Kafka topic in parallel in an Apex application.
-### AbstractKafkaInputOperator
+### 0.8 Version of Kafka Input Operator
+
+### AbstractKafkaInputOperator (Package: com.datatorrent.contrib.kafka)
 This is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn't have any ports.
@@ -280,3 +282,145 @@ Below is the configuration for “test” Kafka topic name and localhost:2181
 ```
+
+
+### 0.9 Version of Kafka Input Operator
+
+### AbstractKafkaInputOperator (Package: org.apache.apex.malhar.kafka)
+
+This version uses the new 0.9 consumer API; its features are described here. The operator is fault-tolerant and scalable, and supports multiple clusters and multiple topics.
+
+ Pre-requisites
+
+This operator requires version 0.9.0 or later of the Kafka Consumer API.
+
+ Ports
+--
+
+This abstract class doesn't have any ports.
+
+ Configuration properties
+
+
+- ***clusters*** - String[]
+- Mandatory Parameter.
+- Specifies the Kafka clusters that you want to consume messages from. To configure multi-cluster support, specify the clusters separated by ";".
+
+- ***topics*** - String[]
+- Mandatory Parameter.
+- Specifies the Kafka topics that you want to consume messages from. For multi-topic support, specify the topics separated by ",".
+
+- ***strategy*** - PartitionStrategy
+- The operator supports two partitioning strategies, ONE_TO_ONE and ONE_TO_MANY.
+
+ONE_TO_ONE: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition, so the number of Kafka topic partitions equals the number of operator instances.
+ONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances, where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and the number of Kafka partitions (N) = 2, then the AppMaster creates 2 Kafka input operator instances.
+Default Value = PartitionStrategy.ONE_TO_ONE.
+
+- ***initialPartitionCount*** - Integer
+- When the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances.
+Default Value = 1.
+
+- ***repartitionInterval*** - Long
+- Interval specified in milliseconds. This value specifies the minimum time required between two repartition actions.
+Default Value = 30 Seconds.
+
+- ***repartitionCheckInterval*** - Long
+- Interval specified in milliseconds. This value specifies the minimum interval between two stat checks.
+Default Value = 5 Seconds.
+
+- ***maxTuplesPerWindow*** - Integer
+- Controls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1.
+Default value = MAX_VALUE
+
+- ***initialOffset*** - InitialOffset
+- Indicates the type of offset, i.e., one of “EARLIEST”, “LATEST”, “APPLICATION_OR_EARLIEST” or “APPLICATION_OR_LATEST”.
+LATEST => The operator consumes messages from the latest point of the Kafka queue.
+
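The ONE_TO_MANY rule and the delimited `clusters`/`topics` values described in this hunk can be modeled with a short, self-contained Java sketch. It is illustrative only, not Malhar's partitioner: `OneToManyAssignment`, `assign` and `split` are hypothetical names, partitions are plain integer ids, and the cluster addresses are made-up values. It reproduces the documented example (initialPartitionCount = 5 and N = 2 yield 2 instances) and shows the round-robin wrap-around when K is smaller than N.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Illustrative model of the documented ONE_TO_MANY rule; not Malhar's implementation.
final class OneToManyAssignment {

  private OneToManyAssignment() {
  }

  // Splits a delimited value such as "a;b" (clusters) or "t1,t2" (topics),
  // trimming whitespace and dropping empty entries.
  static List<String> split(String value, String delimiter) {
    List<String> parts = new ArrayList<>();
    for (String part : value.split(Pattern.quote(delimiter))) {
      String trimmed = part.trim();
      if (!trimmed.isEmpty()) {
        parts.add(trimmed);
      }
    }
    return parts;
  }

  // Returns, for each of the K operator instances, the Kafka partition ids it reads,
  // where K = min(initialPartitionCount, kafkaPartitions).
  static List<List<Integer>> assign(int initialPartitionCount, int kafkaPartitions) {
    List<List<Integer>> instances = new ArrayList<>();
    if (kafkaPartitions <= 0) {
      return instances; // nothing to consume
    }
    // Treat a non-positive count as 1 so that K is always at least one.
    int k = Math.min(Math.max(initialPartitionCount, 1), kafkaPartitions);
    for (int i = 0; i < k; i++) {
      instances.add(new ArrayList<>());
    }
    // Round-robin: partition p is read by instance p mod K, so the first K
    // partitions get one instance each and the rest wrap around.
    for (int p = 0; p < kafkaPartitions; p++) {
      instances.get(p % k).add(p);
    }
    return instances;
  }

  public static void main(String[] args) {
    System.out.println(assign(5, 2)); // [[0], [1]]
    System.out.println(assign(2, 5)); // [[0, 2, 4], [1, 3]]
    System.out.println(split("hostA:9092;hostB:9092", ";")); // [hostA:9092, hostB:9092]
    System.out.println(split("topic1,topic2", ",")); // [topic1, topic2]
  }
}
```

Mapping partition p to instance p mod K is one simple way to realize "round-robin"; the actual operator may order partitions differently, for example across topics or clusters.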