mschroederi commented on a change in pull request #113:
URL: https://github.com/apache/bahir-flink/pull/113#discussion_r596184664



##########
File path: flink-connector-pinot/README.md
##########
@@ -0,0 +1,126 @@
+# Flink Pinot Connector
+
+This connector provides a sink to [Apache Pinot](http://pinot.apache.org/)™.  
+To use this connector, add the following dependency to your project:
+
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>flink-connector-pinot_2.11</artifactId>
+      <version>1.1-SNAPSHOT</version>
+    </dependency>
+
+*Version Compatibility*: This module is compatible with Pinot 0.6.0.
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/linking.html).
+
+The sink class is called `PinotSink`.
+
+## Usage
+```java
+StreamExecutionEnvironment env = ...
+// Checkpointing needs to be enabled when executing in STREAMING mode
+env.enableCheckpointing(long interval);
+
+DataStream<PinotRow> dataStream = ...
+PinotSink pinotSink = new PinotSink.Builder<PinotRow>(String pinotControllerHost, String pinotControllerPort, String tableName)
+
+        // Serializes a PinotRow to JSON format
+        .withJsonSerializer(JsonSerializer<PinotRow> jsonSerializer)
+
+        // Extracts the timestamp from a PinotRow
+        .withEventTimeExtractor(EventTimeExtractor<PinotRow> eventTimeExtractor)
+
+        // Defines the segment name generation via the predefined SimpleSegmentNameGenerator
+        // Exemplary segment name: tableName_minTimestamp_maxTimestamp_segmentNamePostfix_0
+        .withSimpleSegmentNameGenerator(String tableName, String segmentNamePostfix)
+
+        // Use a custom segment name generator if the SimpleSegmentNameGenerator does not work for your use case
+        .withSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator)
+
+        // Use the local filesystem to share committables across subTasks
+        // CAUTION: Use only if all subTasks run on the same node with access to the local filesystem
+        .withLocalFileSystemAdapter()
+
+        // Use a custom filesystem adapter.
+        // CAUTION: Make sure all nodes your Flink app runs on can access the shared filesystem via the provided FileSystemAdapter
+        .withFileSystemAdapter(FileSystemAdapter fsAdapter)
+
+        // Defines the size of the Pinot segments
+        .withMaxRowsPerSegment(int maxRowsPerSegment)
+
+        // Prefix within the local filesystem's temp directory used for storing intermediate files
+        .withTempDirectoryPrefix(String tempDirPrefix)
+
+        // Number of threads used in the `PinotSinkGlobalCommitter` to commit a batch of segments
+        // Optional - Default is 4
+        .withNumCommitThreads(int numCommitThreads)
+
+        // Builds the PinotSink
+        .build();
+dataStream.addSink(pinotSink);
+```
+
+## Options
+| Option                 | Description |
+| ---------------------- | ----------- |
+| `pinotControllerHost`  | Host of the Pinot controller |
+| `pinotControllerPort`  | Port of the Pinot controller |
+| `tableName`            | Target Pinot table's name |
+| `maxRowsPerSegment`    | Maximum number of rows to be stored within a Pinot segment |
+| `tempDirPrefix`        | Prefix for the temp directories used to store intermediate files |
+| `jsonSerializer`       | Serializer used to convert elements to JSON |
+| `eventTimeExtractor`   | Defines the way event times are extracted from received objects |
+| `segmentNameGenerator` | Pinot segment name generator |
+| `fsAdapter`            | Filesystem adapter used to share files across nodes |
+| `numCommitThreads`     | Number of threads used in the `PinotSinkGlobalCommitter` for committing segments |
+
+## Architecture

Review comment:
       Totally makes sense 👍




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

