mschroederi commented on a change in pull request #113: URL: https://github.com/apache/bahir-flink/pull/113#discussion_r596184664
########## File path: flink-connector-pinot/README.md ##########
@@ -0,0 +1,126 @@
+# Flink Pinot Connector
+
+This connector provides a sink to [Apache Pinot](http://pinot.apache.org/)™.
+To use this connector, add the following dependency to your project:
+
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>flink-connector-pinot_2.11</artifactId>
+      <version>1.1-SNAPSHOT</version>
+    </dependency>
+
+*Version Compatibility*: This module is compatible with Pinot 0.6.0.
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/linking.html).
+
+The sink class is called `PinotSink`.
+
+## Usage
+```java
+StreamExecutionEnvironment env = ...
+// Checkpointing needs to be enabled when executing in STREAMING mode
+env.enableCheckpointing(long interval);
+
+DataStream<PinotRow> dataStream = ...
+PinotSink pinotSink = new PinotSink.Builder<PinotRow>(String pinotControllerHost, String pinotControllerPort, String tableName)
+
+    // Serializes a PinotRow to JSON format
+    .withJsonSerializer(JsonSerializer<PinotRow> jsonSerializer)
+
+    // Extracts the timestamp from a PinotRow
+    .withEventTimeExtractor(EventTimeExtractor<PinotRow> eventTimeExtractor)
+
+    // Defines segment name generation via the predefined SimpleSegmentNameGenerator
+    // Example segment name: tableName_minTimestamp_maxTimestamp_segmentNamePostfix_0
+    .withSimpleSegmentNameGenerator(String tableName, String segmentNamePostfix)
+
+    // Use a custom segment name generator if the SimpleSegmentNameGenerator does not work for your use case
+    .withSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator)
+
+    // Use the local filesystem to share committables across subTasks
+    // CAUTION: Use only if all subTasks run on the same node with access to the local filesystem
+    .withLocalFileSystemAdapter()
+
+    // Use a custom filesystem adapter.
+    // CAUTION: Make sure all nodes your Flink app runs on can access the shared filesystem via the provided FileSystemAdapter
+    .withFileSystemAdapter(FileSystemAdapter fsAdapter)
+
+    // Defines the maximum number of rows stored per Pinot segment
+    .withMaxRowsPerSegment(int maxRowsPerSegment)
+
+    // Prefix within the local filesystem's temp directory used for storing intermediate files
+    .withTempDirectoryPrefix(String tempDirPrefix)
+
+    // Number of threads used in the `PinotSinkGlobalCommitter` to commit a batch of segments
+    // Optional - default is 4
+    .withNumCommitThreads(int numCommitThreads)
+
+    // Builds the PinotSink
+    .build();
+dataStream.addSink(pinotSink);
+```
+
+## Options
+| Option                 | Description                                                                      |
+| ---------------------- | -------------------------------------------------------------------------------- |
+| `pinotControllerHost`  | Host of the Pinot controller                                                     |
+| `pinotControllerPort`  | Port of the Pinot controller                                                     |
+| `tableName`            | Name of the target Pinot table                                                   |
+| `maxRowsPerSegment`    | Maximum number of rows to be stored within a Pinot segment                       |
+| `tempDirPrefix`        | Prefix for the temporary directories used to store intermediate files            |
+| `jsonSerializer`       | Serializer used to convert elements to JSON                                      |
+| `eventTimeExtractor`   | Defines how event times are extracted from received objects                      |
+| `segmentNameGenerator` | Pinot segment name generator                                                     |
+| `fsAdapter`            | Filesystem adapter used to share files across nodes                              |
+| `numCommitThreads`     | Number of threads used in the `PinotSinkGlobalCommitter` for committing segments |
+
+## Architecture

Review comment:
   Totally makes sense 👍
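To make the documented segment-name format concrete, here is a tiny self-contained sketch that reproduces the naming scheme the README attributes to `SimpleSegmentNameGenerator` (`tableName_minTimestamp_maxTimestamp_segmentNamePostfix_0`). The helper below is hypothetical and only illustrates the format; it is not the connector's actual implementation, and the timestamps and postfix are made-up example values.

```java
// Hypothetical illustration of the segment name format described in the README;
// NOT the actual SimpleSegmentNameGenerator from flink-connector-pinot.
public class SegmentNameFormatDemo {

    // Joins the parts with underscores, matching the documented pattern:
    // tableName_minTimestamp_maxTimestamp_segmentNamePostfix_<sequenceId>
    static String segmentName(String tableName, long minTimestamp, long maxTimestamp,
                              String segmentNamePostfix, int sequenceId) {
        return String.join("_",
                tableName,
                Long.toString(minTimestamp),
                Long.toString(maxTimestamp),
                segmentNamePostfix,
                Integer.toString(sequenceId));
    }

    public static void main(String[] args) {
        // Example values only; real timestamps come from the EventTimeExtractor
        System.out.println(segmentName("myTable", 1609459200000L, 1609545600000L, "flink", 0));
    }
}
```

Running this prints `myTable_1609459200000_1609545600000_flink_0`, which matches the shape shown in the builder comment above.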
