fapaul commented on a change in pull request #113: URL: https://github.com/apache/bahir-flink/pull/113#discussion_r596903804
########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerHttpClient.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pinot; + +import org.apache.flink.annotation.Internal; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.*; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Helpers to interact with the Pinot controller via its public API. 
+ */ +@Internal +public class PinotControllerHttpClient implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(PinotControllerHttpClient.class); + protected final String controllerHostPort; + protected final CloseableHttpClient httpClient; + + /** + * @param controllerHost Pinot controller's host + * @param controllerPort Pinot controller's port + */ + public PinotControllerHttpClient(String controllerHost, String controllerPort) { + checkNotNull(controllerHost); + checkNotNull(controllerPort); + controllerHostPort = String.format("http://%s:%s", controllerHost, controllerPort); + httpClient = HttpClients.createDefault(); Review comment: `this.` ########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java ########## @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pinot; + +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable; +import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable; +import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter; +import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor; +import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer; +import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter; +import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator; +import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator; +import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkCommittableSerializer; +import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkGlobalCommittableSerializer; +import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter; +import org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment; +import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; +import org.apache.pinot.core.segment.name.SegmentNameGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.tools.admin.command.UploadSegmentCommand; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Apache Pinot sink that stores objects from upstream Flink tasks in an Apache Pinot table. 
The sink + * can be operated in {@code RuntimeExecutionMode.STREAMING} or {@code RuntimeExecutionMode.BATCH} + * mode. But ensure to enable checkpointing when using in streaming mode. + * + * <p>We advise you to use the provided {@link PinotSink.Builder} to build and configure the + * PinotSink. All the communication with the Pinot cluster's table is managed via the Pinot + * controller. Thus you need to provide its host and port as well as the target Pinot table. + * The {@link TableConfig} and {@link Schema} are automatically retrieved via the Pinot controller API + * and therefore do not need to be provided. + * + * <p>Whenever an element is received by the sink it gets stored in a {@link PinotWriterSegment}. A + * {@link PinotWriterSegment} represents exactly one segment that will be pushed to the Pinot + * cluster later on. Its size is determined by the customizable {@code maxRowsPerSegment} parameter. + * Please note that the maximum segment size that can be handled by this sink is limited by the + * lower bound of memory available at each subTask. + * Each subTask holds a list of {@link PinotWriterSegment}s of which at most one is active. An + * active {@link PinotWriterSegment} is capable of accepting at least one more element. If a + * {@link PinotWriterSegment} switches from active to inactive it flushes its + * {@code maxRowsPerSegment} elements to disk. The data file is stored in the local filesystem's + * temporary directory and contains serialized elements. We use the {@link JsonSerializer} to + * serialize elements to JSON. + * + * <p>On checkpointing all inactive {@link PinotWriterSegment} are transformed into committables. As + * the data files need to be shared across nodes, the sink requires access to a shared filesystem. We + * use the {@link FileSystemAdapter} for that purpose. A {@link FileSystemAdapter} is capable of + * copying a file from the local to the shared filesystem and vice-versa. 
+ * A {@link PinotSinkCommittable} contains a reference to a data file on the shared file system as + * well as the minimum and maximum timestamp contained in the data file. A timestamp - usually the + * event time - is extracted from each received element via {@link EventTimeExtractor}. The + * timestamps are later on required to follow the guideline for naming Pinot segments. + * + * <p>We use the {@link PinotSinkGlobalCommitter} to collect all created + * {@link PinotSinkCommittable}s, create segments from the referenced data files and finally push them + * to the Pinot table. Therefore, the minimum and maximum timestamp of all + * {@link PinotSinkCommittable} is determined. The segment names are then generated using the + * {@link PinotSinkSegmentNameGenerator} which gets the minimum and maximum timestamp as input. + * The segment generation starts with downloading the referenced data file from the shared file system + * using the provided {@link FileSystemAdapter}. Once this was completed, we use Pinot's + * {@link SegmentIndexCreationDriver} to generate the final segment. Each segment is thereby stored + * in a temporary directory on the local filesystem. Next, the segment is uploaded to the Pinot + * controller using Pinot's {@link UploadSegmentCommand}. + * + * <p>To ensure that possible failures are handled accordingly, each segment name is checked for + * existence within the Pinot cluster before uploading a segment. In case a segment name already + * exists, i.e. if the last commit failed partially with some segments having already been uploaded, the + * existing segment is deleted first. When the elements since the last checkpoint are replayed the + * minimum and maximum timestamp of all received elements will be the same. Thus the same set of + * segment names is generated and we can delete previous segments by checking for segment name + * presence. Note: The {@link PinotSinkSegmentNameGenerator} must be deterministic. 
We also provide + * a {@link SimpleSegmentNameGenerator} which is a simple but for most users suitable segment name + * generator. + * + * <p>Please note that we use the {@link GlobalCommitter} to ensure consistent segment naming. This + * comes with performance limitations as a {@link GlobalCommitter} always runs at a parallelism of 1 + * which results in a clear bottleneck at the {@link PinotSinkGlobalCommitter} that does all the + * computational intensive work (i.e. generating and uploading segments). In order to overcome this + * issue we introduce a custom multithreading approach within the {@link PinotSinkGlobalCommitter} + * to parallelize the segment creation and upload process. + * + * @param <IN> Type of incoming elements + */ +public class PinotSink<IN> implements Sink<IN, PinotSinkCommittable, Void, PinotSinkGlobalCommittable> { + + private final String pinotControllerHost; + private final String pinotControllerPort; + private final String tableName; + private final int maxRowsPerSegment; + private final String tempDirPrefix; + private final JsonSerializer<IN> jsonSerializer; + private final SegmentNameGenerator segmentNameGenerator; + private final FileSystemAdapter fsAdapter; + private final EventTimeExtractor<IN> eventTimeExtractor; + private final int numCommitThreads; + + /** + * @param pinotControllerHost Host of the Pinot controller + * @param pinotControllerPort Port of the Pinot controller + * @param tableName Target table's name + * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment + * @param tempDirPrefix Prefix for temp directories used + * @param jsonSerializer Serializer used to convert elements to JSON + * @param eventTimeExtractor Defines the way event times are extracted from received objects + * @param segmentNameGenerator Pinot segment name generator + * @param fsAdapter Filesystem adapter used to save files for sharing files across nodes + * @param numCommitThreads Number of threads used in the 
{@link PinotSinkGlobalCommitter} for committing segments + */ + public PinotSink(String pinotControllerHost, String pinotControllerPort, String tableName, + int maxRowsPerSegment, String tempDirPrefix, JsonSerializer<IN> jsonSerializer, + EventTimeExtractor<IN> eventTimeExtractor, + SegmentNameGenerator segmentNameGenerator, FileSystemAdapter fsAdapter, + int numCommitThreads) { + this.pinotControllerHost = checkNotNull(pinotControllerHost); + this.pinotControllerPort = checkNotNull(pinotControllerPort); + this.tableName = checkNotNull(tableName); + + checkArgument(maxRowsPerSegment > 0); + this.maxRowsPerSegment = maxRowsPerSegment; + this.tempDirPrefix = checkNotNull(tempDirPrefix); + this.jsonSerializer = checkNotNull(jsonSerializer); + this.eventTimeExtractor = checkNotNull(eventTimeExtractor); + this.segmentNameGenerator = checkNotNull(segmentNameGenerator); + this.fsAdapter = checkNotNull(fsAdapter); + checkArgument(numCommitThreads > 0); + this.numCommitThreads = numCommitThreads; + } + + /** + * Creates a Pinot sink writer. + * + * @param context InitContext + * @param states Empty list as the PinotSinkWriter does not accept states. + */ + @Override + public PinotSinkWriter<IN> createWriter(InitContext context, List<Void> states) { + return new PinotSinkWriter<>( + context.getSubtaskId(), maxRowsPerSegment, eventTimeExtractor, + jsonSerializer, fsAdapter + ); + } + + /** + * The PinotSink does not use a committer. Instead a global committer is used + * + * @return Empty Optional + */ + @Override + public Optional<Committer<PinotSinkCommittable>> createCommitter() { + return Optional.empty(); + } + + /** + * Creates the global committer. 
+ */ + @Override + public Optional<GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable>> createGlobalCommitter() throws IOException { + String timeColumnName = eventTimeExtractor.getTimeColumn(); + TimeUnit segmentTimeUnit = eventTimeExtractor.getSegmentTimeUnit(); + PinotSinkGlobalCommitter committer = new PinotSinkGlobalCommitter( + pinotControllerHost, pinotControllerPort, tableName, segmentNameGenerator, + tempDirPrefix, fsAdapter, timeColumnName, segmentTimeUnit, numCommitThreads + ); + return Optional.of(committer); + } + + /** + * Creates the committables' serializer. + */ + @Override + public Optional<SimpleVersionedSerializer<PinotSinkCommittable>> getCommittableSerializer() { + return Optional.of(new PinotSinkCommittableSerializer()); + } + + /** + * Creates the global committables' serializer. + */ + @Override + public Optional<SimpleVersionedSerializer<PinotSinkGlobalCommittable>> getGlobalCommittableSerializer() { + return Optional.of(new PinotSinkGlobalCommittableSerializer()); + } + + /** + * The PinotSink does not use writer states. + * + * @return Empty Optional + */ + @Override + public Optional<SimpleVersionedSerializer<Void>> getWriterStateSerializer() { + return Optional.empty(); + } + + /** + * Builder for configuring a {@link PinotSink}. This is the recommended public API. 
+ * + * @param <IN> Type of incoming elements + */ + public static class Builder<IN> { + String pinotControllerHost; + String pinotControllerPort; + String tableName; + int maxRowsPerSegment; + String tempDirPrefix = "flink-connector-pinot"; + JsonSerializer<IN> jsonSerializer; + EventTimeExtractor<IN> eventTimeExtractor; + SegmentNameGenerator segmentNameGenerator; + FileSystemAdapter fsAdapter; + int numCommitThreads = 4; Review comment: Nit: Use a constant variable `static final ...` for the magic number `4` and call it something like `DEFAULT_COMMIT_THREADS` ########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java ########## @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pinot.committer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.streaming.connectors.pinot.PinotControllerClient; +import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter; +import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemUtils; +import org.apache.pinot.common.segment.ReadMode; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; +import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.core.segment.name.SegmentNameGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.FileFormat; +import org.apache.pinot.tools.admin.command.UploadSegmentCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Global committer takes committables from {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}, + * generates segments and pushes them to the Pinot controller. + * Note: We use a custom multithreading approach to parallelize the segment creation and upload to + * overcome the performance limitations resulting from using a {@link GlobalCommitter} always + * running at a parallelism of 1. 
+ */ +@Internal +public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> { + + private static final Logger LOG = LoggerFactory.getLogger(PinotSinkGlobalCommitter.class); + + private final String pinotControllerHost; + private final String pinotControllerPort; + private final String tableName; + private final SegmentNameGenerator segmentNameGenerator; + private final FileSystemAdapter fsAdapter; + private final String timeColumnName; + private final TimeUnit segmentTimeUnit; + private final PinotControllerClient pinotControllerClient; + private final File tempDirectory; + private final Schema tableSchema; + private final TableConfig tableConfig; + private final ExecutorService pool; + + /** + * @param pinotControllerHost Host of the Pinot controller + * @param pinotControllerPort Port of the Pinot controller + * @param tableName Target table's name + * @param segmentNameGenerator Pinot segment name generator + * @param tempDirPrefix Prefix for directory to store temporary files in + * @param fsAdapter Adapter for interacting with the shared file system + * @param timeColumnName Name of the column containing the timestamp + * @param segmentTimeUnit Unit of the time column + * @param numCommitThreads Number of threads used to commit the committables + */ + public PinotSinkGlobalCommitter(String pinotControllerHost, String pinotControllerPort, + String tableName, SegmentNameGenerator segmentNameGenerator, + String tempDirPrefix, FileSystemAdapter fsAdapter, + String timeColumnName, TimeUnit segmentTimeUnit, + int numCommitThreads) throws IOException { + this.pinotControllerHost = checkNotNull(pinotControllerHost); + this.pinotControllerPort = checkNotNull(pinotControllerPort); + this.tableName = checkNotNull(tableName); + this.segmentNameGenerator = checkNotNull(segmentNameGenerator); + this.fsAdapter = checkNotNull(fsAdapter); + this.timeColumnName = checkNotNull(timeColumnName); + this.segmentTimeUnit = 
checkNotNull(segmentTimeUnit); + pinotControllerClient = new PinotControllerClient(pinotControllerHost, pinotControllerPort); + + // Create directory that temporary files will be stored in + tempDirectory = Files.createTempDirectory(tempDirPrefix).toFile(); + + // Retrieve the Pinot table schema and the Pinot table config from the Pinot controller + tableSchema = pinotControllerClient.getSchema(tableName); + tableConfig = pinotControllerClient.getTableConfig(tableName); + + // We use a thread pool in order to parallelize the segment creation and segment upload + checkArgument(numCommitThreads > 0); + pool = Executors.newFixedThreadPool(numCommitThreads); + } + + /** + * Identifies global committables that need to be re-committed from a list of recovered committables. + * + * @param globalCommittables List of global committables that are checked for required re-commit + * @return List of global committable that need to be re-committed + * @throws IOException + */ + @Override + public List<PinotSinkGlobalCommittable> filterRecoveredCommittables(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException { + // Holds identified global committables whose commit needs to be retried + List<PinotSinkGlobalCommittable> committablesToRetry = new ArrayList<>(); + + for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) { + CommitStatus commitStatus = getCommitStatus(globalCommittable); + + if (commitStatus.getMissingSegmentNames().isEmpty()) { + // All segments were already committed. Thus, we do not need to retry the commit. + continue; + } + + for (String existingSegment : commitStatus.getExistingSegmentNames()) { + // Some but not all segments were already committed. As we cannot assure the data + // files containing the same data as originally when recovering from failure, + // we delete the already committed segments in order to recommit them later on. 
+ pinotControllerClient.deleteSegment(tableName, existingSegment); + } + committablesToRetry.add(globalCommittable); + } + return committablesToRetry; + } + + /** + * Combines multiple {@link PinotSinkCommittable}s into one {@link PinotSinkGlobalCommittable} + * by finding the minimum and maximum timestamps from the provided {@link PinotSinkCommittable}s. + * + * @param committables Committables created by {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter} + * @return Global committer committable + */ + @Override + public PinotSinkGlobalCommittable combine(List<PinotSinkCommittable> committables) { + List<String> dataFilePaths = new ArrayList<>(); + long minTimestamp = Long.MAX_VALUE; + long maxTimestamp = Long.MIN_VALUE; + + // Extract all data file paths and the overall minimum and maximum timestamps + // from all committables + for (PinotSinkCommittable committable : committables) { + dataFilePaths.add(committable.getDataFilePath()); + minTimestamp = Long.min(minTimestamp, committable.getMinTimestamp()); + maxTimestamp = Long.max(maxTimestamp, committable.getMaxTimestamp()); + } + + LOG.debug("Combined {} committables into one global committable", committables.size()); + return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, maxTimestamp); + } + + /** + * Copies data files from shared filesystem to the local filesystem, generates segments with names + * according to the segment naming schema and finally pushes the segments to the Pinot cluster. + * Before pushing a segment it is checked whether there already exists a segment with that name + * in the Pinot cluster by calling the Pinot controller. In case there is one, it gets deleted. 
+ * + * @param globalCommittables List of global committables + * @return Global committables whose commit failed + * @throws IOException + */ + @Override + public List<PinotSinkGlobalCommittable> commit(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException { + // List of failed global committables that can be retried later on + List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>(); + + for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) { + Set<Future<Boolean>> resultFutures = new HashSet<>(); + // Commit all segments in globalCommittable + for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) { + String dataFilePath = globalCommittable.getDataFilePaths().get(sequenceId); + // Get segment names with increasing sequenceIds + String segmentName = getSegmentName(globalCommittable, sequenceId); + // Segment committer handling the whole commit process for a single segment + Callable<Boolean> segmentCommitter = new SegmentCommitter( + pinotControllerHost, pinotControllerPort, tempDirectory, fsAdapter, + dataFilePath, segmentName, tableSchema, tableConfig, timeColumnName, + segmentTimeUnit + ); + // Submits the segment committer to the thread pool + resultFutures.add(pool.submit(segmentCommitter)); + } + + boolean commitSucceeded = true; + try { + for (Future<Boolean> wasSuccessful : resultFutures) { + // In case any of the segment commits wasn't successful we mark the whole + // globalCommittable as failed + if (!wasSuccessful.get()) { + commitSucceeded = false; + failedCommits.add(globalCommittable); + // Once any of the commits failed, we do not need to check the remaining + // ones, as we try to commit the globalCommittable next time + break; + } + } + } catch (InterruptedException | ExecutionException e) { + // In case of an exception thrown while accessing commit status, mark the whole + // globalCommittable as failed + failedCommits.add(globalCommittable); + 
LOG.error("Accessing a SegmentCommitter thread errored with {}", e.getMessage(), e); + } + + if (commitSucceeded) { + // If commit succeeded, cleanup the data files stored on the shared file system. In + // case the commit of at least one of the segments failed, nothing will be cleaned + // up here to enable retrying failed commits (data files must therefore stay + // available on the shared filesystem). + for (String path : globalCommittable.getDataFilePaths()) { + fsAdapter.deleteFromSharedFileSystem(path); + } + } + } + + // Return failed commits so that they can be retried later on + return failedCommits; + } + + /** + * Empty method. + */ + @Override + public void endOfInput() { + } + + /** + * Closes the Pinot controller http client, clears the created temporary directory and + * shuts the thread pool down. + */ + @Override + public void close() throws IOException { + pinotControllerClient.close(); + tempDirectory.delete(); + pool.shutdown(); + } + + /** + * Helper method for generating segment names using the segment name generator. + * + * @param globalCommittable Global committable the segment name shall be generated from + * @param sequenceId Incrementing counter + * @return generated segment name + */ + private String getSegmentName(PinotSinkGlobalCommittable globalCommittable, int sequenceId) { + return segmentNameGenerator.generateSegmentName(sequenceId, + globalCommittable.getMinTimestamp(), globalCommittable.getMaxTimestamp()); + } + + /** + * Evaluates the status of already uploaded segments by requesting segment metadata from the + * Pinot controller. 
+ * + * @param globalCommittable Global committable whose commit status gets evaluated + * @return Commit status + * @throws IOException + */ + private CommitStatus getCommitStatus(PinotSinkGlobalCommittable globalCommittable) throws IOException { + List<String> existingSegmentNames = new ArrayList<>(); + List<String> missingSegmentNames = new ArrayList<>(); + + // For all segment names that will be used to submit new segments, check whether the segment + // name already exists for the target table + for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) { + String segmentName = getSegmentName(globalCommittable, sequenceId); + if (pinotControllerClient.tableHasSegment(tableName, segmentName)) { + // Segment name already exists + existingSegmentNames.add(segmentName); + } else { + // Segment name does not exist yet + missingSegmentNames.add(segmentName); + } + } + return new CommitStatus(existingSegmentNames, missingSegmentNames); + } + + /** + * Wrapper for existing and missing segments in the Pinot cluster. + */ + private static class CommitStatus { + private final List<String> existingSegmentNames; + private final List<String> missingSegmentNames; + + CommitStatus(List<String> existingSegmentNames, List<String> missingSegmentNames) { + this.existingSegmentNames = existingSegmentNames; + this.missingSegmentNames = missingSegmentNames; + } + + public List<String> getExistingSegmentNames() { + return existingSegmentNames; + } + + public List<String> getMissingSegmentNames() { + return missingSegmentNames; + } + } + + /** + * Helper class for committing a single segment. Downloads a data file from the shared filesystem, + * generates a segment from the data file and uploads segment to the Pinot controller. 
+ */ + private static class SegmentCommitter implements Callable<Boolean> { + + private static final Logger LOG = LoggerFactory.getLogger(SegmentCommitter.class); + + private final String pinotControllerHost; + private final String pinotControllerPort; + private final File tempDirectory; + private final FileSystemAdapter fsAdapter; + private final String dataFilePath; + private final String segmentName; + private final Schema tableSchema; + private final TableConfig tableConfig; + private final String timeColumnName; + private final TimeUnit segmentTimeUnit; + + /** + * @param pinotControllerHost Host of the Pinot controller + * @param pinotControllerPort Port of the Pinot controller + * @param tempDirectory Directory to store temporary files in + * @param fsAdapter Filesystem adapter used to load data files from the shared file system + * @param dataFilePath Data file to load from the shared file system + * @param segmentName Name of the segment to create and commit + * @param tableSchema Pinot table schema + * @param tableConfig Pinot table config + * @param timeColumnName Name of the column containing the timestamp + * @param segmentTimeUnit Unit of the time column + */ + SegmentCommitter(String pinotControllerHost, String pinotControllerPort, + File tempDirectory, FileSystemAdapter fsAdapter, + String dataFilePath, String segmentName, Schema tableSchema, + TableConfig tableConfig, String timeColumnName, + TimeUnit segmentTimeUnit) { + this.pinotControllerHost = pinotControllerHost; + this.pinotControllerPort = pinotControllerPort; + this.tempDirectory = tempDirectory; + this.fsAdapter = fsAdapter; + this.dataFilePath = dataFilePath; + this.segmentName = segmentName; + this.tableSchema = tableSchema; + this.tableConfig = tableConfig; + this.timeColumnName = timeColumnName; + this.segmentTimeUnit = segmentTimeUnit; + } + + /** + * Downloads a segment from the shared file system via {@code fsAdapter}, generates a segment + * and finally uploads the segment to the 
Pinot controller + * + * @return True if the commit succeeded + */ + @Override + public Boolean call() { + // Local copy of data file stored on the shared filesystem + File segmentData = null; + // File containing the final Pinot segment + File segmentFile = null; + try { + // Download data file from the shared filesystem + LOG.debug("Downloading data file {} from shared file system...", dataFilePath); + List<String> serializedElements = fsAdapter.readFromSharedFileSystem(dataFilePath); + segmentData = FileSystemUtils.writeToLocalFile(serializedElements, tempDirectory); + LOG.debug("Successfully downloaded data file {} from shared file system", dataFilePath); + + segmentFile = FileSystemUtils.createFileInDir(tempDirectory); + LOG.debug("Creating segment in " + segmentFile.getAbsolutePath()); + + // Creates a segment with name `segmentName` in `segmentFile` + generateSegment(segmentData, segmentFile, true); + + // Uploads the recently created segment to the Pinot controller + uploadSegment(segmentFile); + + // Commit successful + return true; + } catch (IOException e) { + LOG.error("Error while committing segment data stored on shared filesystem.", e); + + // Commit failed + return false; + } finally { + // Finally cleanup all files created on the local filesystem + if (segmentData != null) { + segmentData.delete(); + } + if (segmentFile != null) { + segmentFile.delete(); + } + } + } + + /** + * Creates a segment from the given parameters. + * This method was adapted from {@link org.apache.pinot.tools.admin.command.CreateSegmentCommand}. 
+ * + * @param dataFile File containing the JSON data + * @param outDir Segment target path + * @param _postCreationVerification Verify segment after generation + */ + private void generateSegment(File dataFile, File outDir, Boolean _postCreationVerification) { + SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, tableSchema); + segmentGeneratorConfig.setSegmentName(segmentName); + segmentGeneratorConfig.setSegmentTimeUnit(segmentTimeUnit); + segmentGeneratorConfig.setTimeColumnName(timeColumnName); + segmentGeneratorConfig.setInputFilePath(dataFile.getPath()); + segmentGeneratorConfig.setFormat(FileFormat.JSON); + segmentGeneratorConfig.setOutDir(outDir.getPath()); + segmentGeneratorConfig.setTableName(tableConfig.getTableName()); + + try { + SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl(); + driver.init(segmentGeneratorConfig); + driver.build(); + File indexDir = new File(outDir, segmentName); + LOG.debug("Successfully created segment: {} in directory: {}", segmentName, indexDir); + if (_postCreationVerification) { + LOG.debug("Verifying the segment by loading it"); + ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap); + LOG.debug("Successfully loaded segment: {} of size: {} bytes", segmentName, + segment.getSegmentSizeBytes()); + segment.destroy(); + } + } catch (Exception e) { + // SegmentIndexCreationDriverImpl throws generic Exceptions during init and build + // ImmutableSegmentLoader throws generic Exception during load Review comment: Nit: move this comment above the exception. ########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java ########## @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pinot.committer; + +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.streaming.connectors.pinot.PinotControllerApi; +import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter; +import org.apache.pinot.common.segment.ReadMode; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; +import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.core.segment.name.SegmentNameGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.FileFormat; +import org.apache.pinot.tools.admin.command.UploadSegmentCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Global committer takes committables from {@link 
org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}, + generates segments and pushes them to the Pinot controller. + * Note: We use a custom multithreading approach to parallelize the segment creation and upload to + * overcome the performance limitations resulting from using a {@link GlobalCommitter} always + * running at a parallelism of 1. + */ +public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> { + + private static final Logger LOG = LoggerFactory.getLogger(PinotSinkGlobalCommitter.class); + + private final String pinotControllerHost; + private final String pinotControllerPort; + private final String tableName; + private final SegmentNameGenerator segmentNameGenerator; + private final String tempDirPrefix; + private final FileSystemAdapter fsAdapter; + private final String timeColumnName; + private final TimeUnit segmentTimeUnit; + + /** + * @param pinotControllerHost Host of the Pinot controller + * @param pinotControllerPort Port of the Pinot controller + * @param tableName Target table's name + * @param segmentNameGenerator Pinot segment name generator + * @param fsAdapter Adapter for interacting with the shared file system + * @param timeColumnName Name of the column containing the timestamp + * @param segmentTimeUnit Unit of the time column + */ + public PinotSinkGlobalCommitter(String pinotControllerHost, String pinotControllerPort, String tableName, SegmentNameGenerator segmentNameGenerator, String tempDirPrefix, FileSystemAdapter fsAdapter, String timeColumnName, TimeUnit segmentTimeUnit) { + this.pinotControllerHost = checkNotNull(pinotControllerHost); + this.pinotControllerPort = checkNotNull(pinotControllerPort); + this.tableName = checkNotNull(tableName); + this.segmentNameGenerator = checkNotNull(segmentNameGenerator); + this.tempDirPrefix = checkNotNull(tempDirPrefix); + this.fsAdapter = checkNotNull(fsAdapter); + this.timeColumnName = checkNotNull(timeColumnName); + 
this.segmentTimeUnit = checkNotNull(segmentTimeUnit); + } + + /** + * Identifies global committables that need to be re-committed from a list of recovered committables. + * + * @param globalCommittables List of global committables that are checked for required re-commit + * @return List of global committable that need to be re-committed + * @throws IOException + */ + @Override + public List<PinotSinkGlobalCommittable> filterRecoveredCommittables(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException { + PinotControllerApi controllerApi = new PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort); + List<PinotSinkGlobalCommittable> committablesToRetry = new ArrayList<>(); + + for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) { + CommitStatus commitStatus = this.getCommitStatus(globalCommittable); + + if (commitStatus.getMissingSegmentNames().isEmpty()) { + // All segments were already committed. Thus, we do not need to retry the commit. + continue; + } + + for (String existingSegment : commitStatus.getExistingSegmentNames()) { + // Some but not all segments were already committed. As we cannot assure the data + // files containing the same data as originally when recovering from failure, + // we delete the already committed segments in order to recommit them later on. + controllerApi.deleteSegment(tableName, existingSegment); + } + committablesToRetry.add(globalCommittable); + } + + return committablesToRetry; + } + + /** + * Combines multiple {@link PinotSinkCommittable}s into one {@link PinotSinkGlobalCommittable} + * by finding the minimum and maximum timestamps from the provided {@link PinotSinkCommittable}s. 
+ * + * @param committables Committables created by {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter} + * @return Global committer committable + */ + @Override + public PinotSinkGlobalCommittable combine(List<PinotSinkCommittable> committables) { + List<String> dataFilePaths = new ArrayList<>(); + long minTimestamp = Long.MAX_VALUE; + long maxTimestamp = Long.MIN_VALUE; + + // Extract all data file paths and the overall minimum and maximum timestamps + // from all committables + for (PinotSinkCommittable committable : committables) { + dataFilePaths.add(committable.getDataFilePath()); + minTimestamp = Long.min(minTimestamp, committable.getMinTimestamp()); + maxTimestamp = Long.max(maxTimestamp, committable.getMaxTimestamp()); + } + + LOG.info("Combined {} committables into one global committable", committables.size()); + return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, maxTimestamp); + } + + /** + * Copies data files from shared filesystem to the local filesystem, generates segments with names + * according to the segment naming schema and finally pushes the segments to the Pinot cluster. + * Before pushing a segment it is checked whether there already exists a segment with that name + * in the Pinot cluster by calling the Pinot controller. In case there is one, it gets deleted. 
+ * + * @param globalCommittables List of global committables + * @return Global committables whose commit failed + * @throws IOException + */ + @Override + public List<PinotSinkGlobalCommittable> commit(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException { + // Retrieve the Pinot table schema and the Pinot table config from the Pinot controller + PinotControllerApi controllerApi = new PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort); + Schema tableSchema = controllerApi.getSchema(this.tableName); + TableConfig tableConfig = controllerApi.getTableConfig(this.tableName); + + // List of failed global committables that can be retried later on + List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>(); + + for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) { + // Make sure to remove all previously committed segments in globalCommittable + // when recovering from failure + CommitStatus commitStatus = this.getCommitStatus(globalCommittable); + for (String existingSegment : commitStatus.getExistingSegmentNames()) { + // Some but not all segments were already committed. As we cannot assure the data + // files containing the same data as originally when recovering from failure, + // we delete the already committed segments in order to recommit them later on. 
+ controllerApi.deleteSegment(tableName, existingSegment); + } + + // We use a thread pool in order to parallelize the segment creation and segment upload + ExecutorService pool = Executors.newCachedThreadPool(); + Set<Future<Boolean>> resultFutures = new HashSet<>(); + + // Commit all segments in globalCommittable + int sequenceId = 0; + for (String dataFilePath : globalCommittable.getDataFilePaths()) { + // Get segment names with increasing sequenceIds + String segmentName = this.getSegmentName(globalCommittable, sequenceId++); + // Segment committer handling the whole commit process for a single segment + Callable<Boolean> segmentCommitter = new SegmentCommitter( + this.pinotControllerHost, this.pinotControllerPort, this.tempDirPrefix, + this.fsAdapter, dataFilePath, segmentName, tableSchema, tableConfig, + this.timeColumnName, this.segmentTimeUnit + ); + // Submits the segment committer to the thread pool + resultFutures.add(pool.submit(segmentCommitter)); + } + + try { + for (Future<Boolean> wasSuccessful : resultFutures) { + // In case any of the segment commits wasn't successful we mark the whole + // globalCommittable as failed + if (!wasSuccessful.get()) { + failedCommits.add(globalCommittable); Review comment: Ah, I forgot about it... ########## File path: flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java ########## @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pinot; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor; +import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer; +import org.apache.flink.util.TestLogger; +import org.apache.pinot.spi.config.table.*; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** + * Base class for PinotSink e2e tests + */ +@Testcontainers +public class PinotTestBase extends TestLogger { + + protected static final Logger LOG = LoggerFactory.getLogger(PinotTestBase.class); + + private static final String DOCKER_IMAGE_NAME = "apachepinot/pinot:0.6.0"; + private static final Integer PINOT_INTERNAL_BROKER_PORT = 8000; + private static final Integer 
PINOT_INTERNAL_CONTROLLER_PORT = 9000; + + protected static final TableConfig TABLE_CONFIG = PinotTableConfig.getTableConfig(); + protected static final String TABLE_NAME = TABLE_CONFIG.getTableName(); + protected static final Schema TABLE_SCHEMA = PinotTableConfig.getTableSchema(); + protected static PinotTestHelper pinotHelper; + + /** + * Creates the Pinot testcontainer. We delay the start of tests until Pinot has started all + * internal components. This is identified through a log statement. + */ + @Container + public GenericContainer<?> pinot = new GenericContainer<>(DockerImageName.parse(DOCKER_IMAGE_NAME)) + .withCommand("QuickStart", "-type", "batch") + .withExposedPorts(PINOT_INTERNAL_BROKER_PORT, PINOT_INTERNAL_CONTROLLER_PORT) + .waitingFor( + // Wait for controller, server and broker instances to be available + new HttpWaitStrategy() + .forPort(PINOT_INTERNAL_CONTROLLER_PORT) + .forPath("/instances") + .forStatusCode(200) + .forResponsePredicate(res -> { + try { + JsonNode instances = JsonUtils.stringToJsonNode(res).get("instances"); + // Expect 3 instances to be up and running (controller, broker and server) + return instances.size() == 3; + } catch (IOException e) { + LOG.error("Error while reading json response in HttpWaitStrategy.", e); + } + return false; + }) + // Allow Pinot to take up to 180s for starting up + .withStartupTimeout(Duration.ofSeconds(180)) + ); + + /** + * Creates a new instance of the {@link PinotTestHelper} using the testcontainer port mappings + * and creates the test table. + * + * @throws IOException + */ + @BeforeEach + public void setUp() throws IOException { + pinotHelper = new PinotTestHelper(getPinotHost(), getPinotControllerPort(), getPinotBrokerPort()); + pinotHelper.createTable(TABLE_CONFIG, TABLE_SCHEMA); + } + + /** + * Delete the test table after each test. 
+ * + * @throws Exception + */ + @AfterEach + public void tearDown() throws Exception { + pinotHelper.deleteTable(TABLE_CONFIG, TABLE_SCHEMA); + } + + /** + * Returns the host the Pinot container is available at + * + * @return Pinot container host + */ + protected String getPinotHost() { + return this.pinot.getHost(); + } + + + /** + * Returns the Pinot controller port from the container ports. + * + * @return Pinot controller port + */ + protected String getPinotControllerPort() { + return this.pinot.getMappedPort(PINOT_INTERNAL_CONTROLLER_PORT).toString(); + } + + /** + * Returns the Pinot broker port from the container ports. + * + * @return Pinot broker port + */ + protected String getPinotBrokerPort() { Review comment: private? ########## File path: flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.streaming.connectors.pinot; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException; +import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor; +import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer; +import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter; +import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator; +import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator; +import org.apache.pinot.client.ResultSet; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opentest4j.AssertionFailedError; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * E2e tests for Pinot Sink using BATCH and STREAMING execution mode + */ +public class PinotSinkTest extends PinotTestBase { + + /** + * Tests the BATCH execution of the {@link PinotSink}. + * + * @throws Exception + */ + @Test + public void testBatchSink() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.setParallelism(2); + + List<SingleColumnTableRow> data = getTestData(12); + this.setupDataStream(env, data); + + // Run + env.execute(); + + checkForDataInPinotWithRetry(data, data.size(), 20); Review comment: Probably the same question again why not all records are present. 
########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java ########## @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pinot; + +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable; +import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable; +import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter; +import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor; +import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer; +import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter; +import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator; +import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator; +import 
org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkCommittableSerializer; +import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkGlobalCommittableSerializer; +import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter; +import org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment; +import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; +import org.apache.pinot.core.segment.name.SegmentNameGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.tools.admin.command.UploadSegmentCommand; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Apache Pinot sink that stores objects from upstream Flink tasks in an Apache Pinot table. The sink + * can be operated in {@code RuntimeExecutionMode.STREAMING} or {@code RuntimeExecutionMode.BATCH} + * mode. But ensure to enable checkpointing when using in streaming mode. + * + * <p>We advise you to use the provided {@link PinotSink.Builder} to build and configure the + * PinotSink. All the communication with the Pinot cluster's table is managed via the Pinot + * controller. Thus you need to provide its host and port as well as the target Pinot table. + * The {@link TableConfig} and {@link Schema} are automatically retrieved via the Pinot controller API + * and therefore do not need to be provided. + * + * <p>Whenever an element is received by the sink it gets stored in a {@link PinotWriterSegment}. A + * {@link PinotWriterSegment} represents exactly one segment that will be pushed to the Pinot + * cluster later on. Its size is determined by the customizable {@code maxRowsPerSegment} parameter. 
+ * Please note that the maximum segment size that can be handled by this sink is limited by the + * lower bound of memory available at each subTask. + * Each subTask holds a list of {@link PinotWriterSegment}s of which at most one is active. An + * active {@link PinotWriterSegment} is capable of accepting at least one more element. If a + * {@link PinotWriterSegment} switches from active to inactive it flushes its + * {@code maxRowsPerSegment} elements to disk. The data file is stored in the local filesystem's + * temporary directory and contains serialized elements. We use the {@link JsonSerializer} to + * serialize elements to JSON. + * + * <p>On checkpointing all inactive {@link PinotWriterSegment} are transformed into committables. As Review comment: You also have to deal with the active segments :) Once this is implemented update the doc string ########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java ########## @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pinot.committer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.streaming.connectors.pinot.PinotControllerClient; +import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter; +import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemUtils; +import org.apache.pinot.common.segment.ReadMode; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; +import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.core.segment.name.SegmentNameGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.FileFormat; +import org.apache.pinot.tools.admin.command.UploadSegmentCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Global committer takes committables from {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}, + * generates segments and pushes them to the Pinot controller. + * Note: We use a custom multithreading approach to parallelize the segment creation and upload to + * overcome the performance limitations resulting from using a {@link GlobalCommitter} always + * running at a parallelism of 1. 
+ */ +@Internal +public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> { + + private static final Logger LOG = LoggerFactory.getLogger(PinotSinkGlobalCommitter.class); + + private final String pinotControllerHost; + private final String pinotControllerPort; + private final String tableName; + private final SegmentNameGenerator segmentNameGenerator; + private final FileSystemAdapter fsAdapter; + private final String timeColumnName; + private final TimeUnit segmentTimeUnit; + private final PinotControllerClient pinotControllerClient; + private final File tempDirectory; + private final Schema tableSchema; + private final TableConfig tableConfig; + private final ExecutorService pool; + + /** + * @param pinotControllerHost Host of the Pinot controller + * @param pinotControllerPort Port of the Pinot controller + * @param tableName Target table's name + * @param segmentNameGenerator Pinot segment name generator + * @param tempDirPrefix Prefix for directory to store temporary files in + * @param fsAdapter Adapter for interacting with the shared file system + * @param timeColumnName Name of the column containing the timestamp + * @param segmentTimeUnit Unit of the time column + * @param numCommitThreads Number of threads used to commit the committables + */ + public PinotSinkGlobalCommitter(String pinotControllerHost, String pinotControllerPort, + String tableName, SegmentNameGenerator segmentNameGenerator, + String tempDirPrefix, FileSystemAdapter fsAdapter, + String timeColumnName, TimeUnit segmentTimeUnit, + int numCommitThreads) throws IOException { + this.pinotControllerHost = checkNotNull(pinotControllerHost); + this.pinotControllerPort = checkNotNull(pinotControllerPort); + this.tableName = checkNotNull(tableName); + this.segmentNameGenerator = checkNotNull(segmentNameGenerator); + this.fsAdapter = checkNotNull(fsAdapter); + this.timeColumnName = checkNotNull(timeColumnName); + this.segmentTimeUnit = 
checkNotNull(segmentTimeUnit); + pinotControllerClient = new PinotControllerClient(pinotControllerHost, pinotControllerPort); + + // Create directory that temporary files will be stored in + tempDirectory = Files.createTempDirectory(tempDirPrefix).toFile(); + + // Retrieve the Pinot table schema and the Pinot table config from the Pinot controller + tableSchema = pinotControllerClient.getSchema(tableName); + tableConfig = pinotControllerClient.getTableConfig(tableName); + + // We use a thread pool in order to parallelize the segment creation and segment upload + checkArgument(numCommitThreads > 0); + pool = Executors.newFixedThreadPool(numCommitThreads); Review comment: `this.` ########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/EventTimeExtractor.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pinot.external; + +import org.apache.flink.api.connector.sink.SinkWriter; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +/** + * Defines the interface for event time extractors + * + * @param <IN> Type of incoming elements + */ +public abstract class EventTimeExtractor<IN> implements Serializable { Review comment: interface? ########## File path: flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.streaming.connectors.pinot; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException; +import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor; +import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer; +import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter; +import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator; +import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator; +import org.apache.pinot.client.ResultSet; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opentest4j.AssertionFailedError; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * E2e tests for Pinot Sink using BATCH and STREAMING execution mode + */ +public class PinotSinkTest extends PinotTestBase { + + /** + * Tests the BATCH execution of the {@link PinotSink}. + * + * @throws Exception + */ + @Test + public void testBatchSink() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.setParallelism(2); + + List<SingleColumnTableRow> data = getTestData(12); + this.setupDataStream(env, data); + + // Run + env.execute(); + + checkForDataInPinotWithRetry(data, data.size(), 20); + } + + /** + * Tests the STREAMING execution of the {@link PinotSink}. 
+ * + * @throws Exception + */ + @Test + public void testStreamingSink() throws Exception { + final Configuration conf = new Configuration(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(conf); + env.setRuntimeMode(RuntimeExecutionMode.STREAMING); + env.setParallelism(2); + env.enableCheckpointing(50); + + List<SingleColumnTableRow> data = getTestData(1000); + this.setupDataStream(env, data); + + // Run + env.execute(); + + // We only expect the first 100 elements to be already committed to Pinot. + // The remaining would follow once we increase the input data size. + // The stream execution stops once the last input tuple has been sent to the sink. + checkForDataInPinotWithRetry(data, 100, 20); Review comment: Why do we not get all records? Since we use `env.execute()`, which blocks until the job finishes, I would expect all records in Pinot. ########## File path: flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java ########## @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pinot; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor; +import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer; +import org.apache.flink.util.TestLogger; +import org.apache.pinot.spi.config.table.*; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** + * Base class for PinotSink e2e tests + */ +@Testcontainers +public class PinotTestBase extends TestLogger { + + protected static final Logger LOG = LoggerFactory.getLogger(PinotTestBase.class); + + private static final String DOCKER_IMAGE_NAME = "apachepinot/pinot:0.6.0"; + private static final Integer PINOT_INTERNAL_BROKER_PORT = 8000; + private static final Integer PINOT_INTERNAL_CONTROLLER_PORT = 9000; + + protected static final TableConfig TABLE_CONFIG = PinotTableConfig.getTableConfig(); + protected static final String TABLE_NAME = TABLE_CONFIG.getTableName(); + protected static final Schema TABLE_SCHEMA = PinotTableConfig.getTableSchema(); + protected static PinotTestHelper pinotHelper; + + /** + * Creates the Pinot testcontainer. 
We delay the start of tests until Pinot has started all + * internal components. This is identified through a log statement. + */ + @Container + public GenericContainer<?> pinot = new GenericContainer<>(DockerImageName.parse(DOCKER_IMAGE_NAME)) + .withCommand("QuickStart", "-type", "batch") + .withExposedPorts(PINOT_INTERNAL_BROKER_PORT, PINOT_INTERNAL_CONTROLLER_PORT) + .waitingFor( + // Wait for controller, server and broker instances to be available + new HttpWaitStrategy() + .forPort(PINOT_INTERNAL_CONTROLLER_PORT) + .forPath("/instances") + .forStatusCode(200) + .forResponsePredicate(res -> { + try { + JsonNode instances = JsonUtils.stringToJsonNode(res).get("instances"); + // Expect 3 instances to be up and running (controller, broker and server) + return instances.size() == 3; + } catch (IOException e) { + LOG.error("Error while reading json response in HttpWaitStrategy.", e); + } + return false; + }) + // Allow Pinot to take up to 180s for starting up + .withStartupTimeout(Duration.ofSeconds(180)) + ); + + /** + * Creates a new instance of the {@link PinotTestHelper} using the testcontainer port mappings + * and creates the test table. + * + * @throws IOException + */ + @BeforeEach + public void setUp() throws IOException { + pinotHelper = new PinotTestHelper(getPinotHost(), getPinotControllerPort(), getPinotBrokerPort()); + pinotHelper.createTable(TABLE_CONFIG, TABLE_SCHEMA); + } + + /** + * Delete the test table after each test. + * + * @throws Exception + */ + @AfterEach + public void tearDown() throws Exception { + pinotHelper.deleteTable(TABLE_CONFIG, TABLE_SCHEMA); + } + + /** + * Returns the host the Pinot container is available at + * + * @return Pinot container host + */ + protected String getPinotHost() { + return this.pinot.getHost(); + } + + + /** + * Returns the Pinot controller port from the container ports. 
+ * + * @return Pinot controller port + */ + protected String getPinotControllerPort() { + return this.pinot.getMappedPort(PINOT_INTERNAL_CONTROLLER_PORT).toString(); + } + + /** + * Returns the Pinot broker port from the container ports. + * + * @return Pinot broker port + */ + protected String getPinotBrokerPort() { + return this.pinot.getMappedPort(PINOT_INTERNAL_BROKER_PORT).toString(); + } + + /** + * Class defining the elements passed to the {@link PinotSink} during the tests. + */ + protected static class SingleColumnTableRow { + + private String _col1; + private Long _timestamp; + + SingleColumnTableRow(@JsonProperty(value = "col1", required = true) String col1, + @JsonProperty(value = "timestamp", required = true) Long timestamp) { + this._col1 = col1; + this._timestamp = timestamp; + } + + @JsonProperty("col1") + public String getCol1() { + return this._col1; + } + + public void setCol1(String _col1) { + this._col1 = _col1; + } + + @JsonProperty("timestamp") + public Long getTimestamp() { + return this._timestamp; + } + + public void setTimestamp(Long timestamp) { + this._timestamp = timestamp; + } + } + + + /** + * EventTimeExtractor for {@link SingleColumnTableRow} used in e2e tests. + * Extracts the timestamp column from {@link SingleColumnTableRow}. + */ + protected static class SingleColumnTableRowEventTimeExtractor extends EventTimeExtractor<SingleColumnTableRow> { Review comment: Move to `PinotSinkTest` because it is only used there? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
