fapaul commented on a change in pull request #113:
URL: https://github.com/apache/bahir-flink/pull/113#discussion_r596903804



##########
File path: 
flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerHttpClient.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.*;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helpers to interact with the Pinot controller via its public API.
+ */
+@Internal
+public class PinotControllerHttpClient implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PinotControllerHttpClient.class);
+    protected final String controllerHostPort;
+    protected final CloseableHttpClient httpClient;
+
+    /**
+     * @param controllerHost Pinot controller's host
+     * @param controllerPort Pinot controller's port
+     */
+    public PinotControllerHttpClient(String controllerHost, String 
controllerPort) {
+        checkNotNull(controllerHost);
+        checkNotNull(controllerPort);
+        controllerHostPort = String.format("http://%s:%s", controllerHost, 
controllerPort);
+        httpClient = HttpClients.createDefault();

Review comment:
       `this.`

##########
File path: 
flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import 
org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
+import 
org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import 
org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import 
org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import 
org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import 
org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkCommittableSerializer;
+import 
org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkGlobalCommittableSerializer;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Apache Pinot sink that stores objects from upstream Flink tasks in an Apache 
Pinot table. The sink
+ * can be operated in {@code RuntimeExecutionMode.STREAMING} or {@code 
RuntimeExecutionMode.BATCH}
+ * mode. But ensure to enable checkpointing when using in streaming mode.
+ *
+ * <p>We advise you to use the provided {@link PinotSink.Builder} to build and 
configure the
+ * PinotSink. All the communication with the Pinot cluster's table is managed 
via the Pinot
+ * controller. Thus you need to provide its host and port as well as the 
target Pinot table.
+ * The {@link TableConfig} and {@link Schema} is automatically retrieved via 
the Pinot controller API
+ * and therefore does not need to be provided.
+ *
+ * <p>Whenever an element is received by the sink it gets stored in a {@link 
PinotWriterSegment}. A
+ * {@link PinotWriterSegment} represents exactly one segment that will be 
pushed to the Pinot
+ * cluster later on. Its size is determined by the customizable {@code 
maxRowsPerSegment} parameter.
+ * Please note that the maximum segment size that can be handled by this sink 
is limited by the
+ * lower bound of memory available at each subTask.
+ * Each subTask holds a list of {@link PinotWriterSegment}s of which at most 
one is active. An
+ * active {@link PinotWriterSegment} is capable of accepting at least one more 
element. If a
+ * {@link PinotWriterSegment} switches from active to inactive it flushes its
+ * {@code maxRowsPerSegment} elements to disk. The data file is stored in the 
local filesystem's
+ * temporary directory and contains serialized elements. We use the {@link 
JsonSerializer} to
+ * serialize elements to JSON.
+ *
+ * <p>On checkpointing all inactive {@link PinotWriterSegment} are transformed 
into committables. As
+ * the data files need to be shared across nodes, the sink requires access to 
a shared filesystem. We
+ * use the {@link FileSystemAdapter} for that purpose. A {@link 
FileSystemAdapter} is capable of
+ * copying a file from the local to the shared filesystem and vice-versa.
+ * A {@link PinotSinkCommittable} contains a reference to a data file on the 
shared file system as
+ * well as the minimum and maximum timestamp contained in the data file. A 
timestamp - usually the
+ * event time - is extracted from each received element via {@link 
EventTimeExtractor}. The
+ * timestamps are later on required to follow the guideline for naming Pinot 
segments.
+ *
+ * <p>We use the {@link PinotSinkGlobalCommitter} to collect all created
+ * {@link PinotSinkCommittable}s, create segments from the referenced data 
files and finally push them
+ * to the Pinot table. Therefore, the minimum and maximum timestamp of all
+ * {@link PinotSinkCommittable} is determined. The segment names are then 
generated using the
+ * {@link PinotSinkSegmentNameGenerator} which gets the minimum and maximum 
timestamp as input.
+ * The segment generation starts with downloading the referenced data file 
from the shared file system
+ * using the provided {@link FileSystemAdapter}. Once this is completed, 
we use Pinot's
+ * {@link SegmentIndexCreationDriver} to generate the final segment. Each 
segment is thereby stored
+ * in a temporary directory on the local filesystem. Next, the segment is 
uploaded to the Pinot
+ * controller using Pinot's {@link UploadSegmentCommand}.
+ *
+ * <p>To ensure that possible failures are handled accordingly each segment 
name is checked for
+ * existence within the Pinot cluster before uploading a segment. In case a 
segment name already
+ * exists, i.e. if the last commit failed partially with some segments already 
been uploaded, the
+ * existing segment is deleted first. When the elements since the last 
checkpoint are replayed the
+ * minimum and maximum timestamp of all received elements will be the same. 
Thus the same set of
+ * segment names is generated and we can delete previous segments by checking 
for segment name
+ * presence. Note: The {@link PinotSinkSegmentNameGenerator} must be 
deterministic. We also provide
+ * a {@link SimpleSegmentNameGenerator} which is a simple but for most users 
suitable segment name
+ * generator.
+ *
+ * <p>Please note that we use the {@link GlobalCommitter} to ensure consistent 
segment naming. This
+ * comes with performance limitations as a {@link GlobalCommitter} always runs 
at a parallelism of 1
+ * which results in a clear bottleneck at the {@link PinotSinkGlobalCommitter} 
that does all the
+ * computational intensive work (i.e. generating and uploading segments). In 
order to overcome this
+ * issue we introduce a custom multithreading approach within the {@link 
PinotSinkGlobalCommitter}
+ * to parallelize the segment creation and upload process.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public class PinotSink<IN> implements Sink<IN, PinotSinkCommittable, Void, 
PinotSinkGlobalCommittable> {
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final int maxRowsPerSegment;
+    private final String tempDirPrefix;
+    private final JsonSerializer<IN> jsonSerializer;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final FileSystemAdapter fsAdapter;
+    private final EventTimeExtractor<IN> eventTimeExtractor;
+    private final int numCommitThreads;
+
+    /**
+     * @param pinotControllerHost  Host of the Pinot controller
+     * @param pinotControllerPort  Port of the Pinot controller
+     * @param tableName            Target table's name
+     * @param maxRowsPerSegment    Maximum number of rows to be stored within 
a Pinot segment
+     * @param tempDirPrefix        Prefix for temp directories used
+     * @param jsonSerializer       Serializer used to convert elements to JSON
+     * @param eventTimeExtractor   Defines the way event times are extracted 
from received objects
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param fsAdapter            Filesystem adapter used to save files for 
sharing files across nodes
+     * @param numCommitThreads     Number of threads used in the {@link 
PinotSinkGlobalCommitter} for committing segments
+     */
+    public PinotSink(String pinotControllerHost, String pinotControllerPort, 
String tableName,
+                     int maxRowsPerSegment, String tempDirPrefix, 
JsonSerializer<IN> jsonSerializer,
+                     EventTimeExtractor<IN> eventTimeExtractor,
+                     SegmentNameGenerator segmentNameGenerator, 
FileSystemAdapter fsAdapter,
+                     int numCommitThreads) {
+        this.pinotControllerHost = checkNotNull(pinotControllerHost);
+        this.pinotControllerPort = checkNotNull(pinotControllerPort);
+        this.tableName = checkNotNull(tableName);
+
+        checkArgument(maxRowsPerSegment > 0);
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        this.tempDirPrefix = checkNotNull(tempDirPrefix);
+        this.jsonSerializer = checkNotNull(jsonSerializer);
+        this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
+        this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        checkArgument(numCommitThreads > 0);
+        this.numCommitThreads = numCommitThreads;
+    }
+
+    /**
+     * Creates a Pinot sink writer.
+     *
+     * @param context InitContext
+     * @param states  Empty list as the PinotSinkWriter does not accept states.
+     */
+    @Override
+    public PinotSinkWriter<IN> createWriter(InitContext context, List<Void> 
states) {
+        return new PinotSinkWriter<>(
+                context.getSubtaskId(), maxRowsPerSegment, eventTimeExtractor,
+                jsonSerializer, fsAdapter
+        );
+    }
+
+    /**
+     * The PinotSink does not use a committer. Instead a global committer is 
used
+     *
+     * @return Empty Optional
+     */
+    @Override
+    public Optional<Committer<PinotSinkCommittable>> createCommitter() {
+        return Optional.empty();
+    }
+
+    /**
+     * Creates the global committer.
+     */
+    @Override
+    public Optional<GlobalCommitter<PinotSinkCommittable, 
PinotSinkGlobalCommittable>> createGlobalCommitter() throws IOException {
+        String timeColumnName = eventTimeExtractor.getTimeColumn();
+        TimeUnit segmentTimeUnit = eventTimeExtractor.getSegmentTimeUnit();
+        PinotSinkGlobalCommitter committer = new PinotSinkGlobalCommitter(
+                pinotControllerHost, pinotControllerPort, tableName, 
segmentNameGenerator,
+                tempDirPrefix, fsAdapter, timeColumnName, segmentTimeUnit, 
numCommitThreads
+        );
+        return Optional.of(committer);
+    }
+
+    /**
+     * Creates the committables' serializer.
+     */
+    @Override
+    public Optional<SimpleVersionedSerializer<PinotSinkCommittable>> 
getCommittableSerializer() {
+        return Optional.of(new PinotSinkCommittableSerializer());
+    }
+
+    /**
+     * Creates the global committables' serializer.
+     */
+    @Override
+    public Optional<SimpleVersionedSerializer<PinotSinkGlobalCommittable>> 
getGlobalCommittableSerializer() {
+        return Optional.of(new PinotSinkGlobalCommittableSerializer());
+    }
+
+    /**
+     * The PinotSink does not use writer states.
+     *
+     * @return Empty Optional
+     */
+    @Override
+    public Optional<SimpleVersionedSerializer<Void>> 
getWriterStateSerializer() {
+        return Optional.empty();
+    }
+
+    /**
+     * Builder for configuring a {@link PinotSink}. This is the recommended 
public API.
+     *
+     * @param <IN> Type of incoming elements
+     */
+    public static class Builder<IN> {
+        String pinotControllerHost;
+        String pinotControllerPort;
+        String tableName;
+        int maxRowsPerSegment;
+        String tempDirPrefix = "flink-connector-pinot";
+        JsonSerializer<IN> jsonSerializer;
+        EventTimeExtractor<IN> eventTimeExtractor;
+        SegmentNameGenerator segmentNameGenerator;
+        FileSystemAdapter fsAdapter;
+        int numCommitThreads = 4;

Review comment:
       Nit: Use a constant variable `static final ...` for the magic number `4` 
and call it something like `DEFAULT_COMMIT_THREADS`

##########
File path: 
flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.PinotControllerClient;
+import 
org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemUtils;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Global committer takes committables from {@link 
org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter},
+ * generates segments and pushes them to the Pinot controller.
+ * Note: We use a custom multithreading approach to parallelize the segment 
creation and upload to
+ * overcome the performance limitations resulting from using a {@link 
GlobalCommitter} always
+ * running at a parallelism of 1.
+ */
+@Internal
+public class PinotSinkGlobalCommitter implements 
GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PinotSinkGlobalCommitter.class);
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final FileSystemAdapter fsAdapter;
+    private final String timeColumnName;
+    private final TimeUnit segmentTimeUnit;
+    private final PinotControllerClient pinotControllerClient;
+    private final File tempDirectory;
+    private final Schema tableSchema;
+    private final TableConfig tableConfig;
+    private final ExecutorService pool;
+
+    /**
+     * @param pinotControllerHost  Host of the Pinot controller
+     * @param pinotControllerPort  Port of the Pinot controller
+     * @param tableName            Target table's name
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param tempDirPrefix        Prefix for directory to store temporary 
files in
+     * @param fsAdapter            Adapter for interacting with the shared 
file system
+     * @param timeColumnName       Name of the column containing the timestamp
+     * @param segmentTimeUnit      Unit of the time column
+     * @param numCommitThreads     Number of threads used to commit the 
committables
+     */
+    public PinotSinkGlobalCommitter(String pinotControllerHost, String 
pinotControllerPort,
+                                    String tableName, SegmentNameGenerator 
segmentNameGenerator,
+                                    String tempDirPrefix, FileSystemAdapter 
fsAdapter,
+                                    String timeColumnName, TimeUnit 
segmentTimeUnit,
+                                    int numCommitThreads) throws IOException {
+        this.pinotControllerHost = checkNotNull(pinotControllerHost);
+        this.pinotControllerPort = checkNotNull(pinotControllerPort);
+        this.tableName = checkNotNull(tableName);
+        this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.timeColumnName = checkNotNull(timeColumnName);
+        this.segmentTimeUnit = checkNotNull(segmentTimeUnit);
+        pinotControllerClient = new PinotControllerClient(pinotControllerHost, 
pinotControllerPort);
+
+        // Create directory that temporary files will be stored in
+        tempDirectory = Files.createTempDirectory(tempDirPrefix).toFile();
+
+        // Retrieve the Pinot table schema and the Pinot table config from the 
Pinot controller
+        tableSchema = pinotControllerClient.getSchema(tableName);
+        tableConfig = pinotControllerClient.getTableConfig(tableName);
+
+        // We use a thread pool in order to parallelize the segment creation 
and segment upload
+        checkArgument(numCommitThreads > 0);
+        pool = Executors.newFixedThreadPool(numCommitThreads);
+    }
+
+    /**
+     * Identifies global committables that need to be re-committed from a list 
of recovered committables.
+     *
+     * @param globalCommittables List of global committables that are checked 
for required re-commit
+     * @return List of global committable that need to be re-committed
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkGlobalCommittable> 
filterRecoveredCommittables(List<PinotSinkGlobalCommittable> 
globalCommittables) throws IOException {
+        // Holds identified global committables whose commit needs to be 
retried
+        List<PinotSinkGlobalCommittable> committablesToRetry = new 
ArrayList<>();
+
+        for (PinotSinkGlobalCommittable globalCommittable : 
globalCommittables) {
+            CommitStatus commitStatus = getCommitStatus(globalCommittable);
+
+            if (commitStatus.getMissingSegmentNames().isEmpty()) {
+                // All segments were already committed. Thus, we do not need 
to retry the commit.
+                continue;
+            }
+
+            for (String existingSegment : 
commitStatus.getExistingSegmentNames()) {
+                // Some but not all segments were already committed. As we 
cannot assure the data
+                // files containing the same data as originally when 
recovering from failure,
+                // we delete the already committed segments in order to 
recommit them later on.
+                pinotControllerClient.deleteSegment(tableName, 
existingSegment);
+            }
+            committablesToRetry.add(globalCommittable);
+        }
+        return committablesToRetry;
+    }
+
+    /**
+     * Combines multiple {@link PinotSinkCommittable}s into one {@link 
PinotSinkGlobalCommittable}
+     * by finding the minimum and maximum timestamps from the provided {@link 
PinotSinkCommittable}s.
+     *
+     * @param committables Committables created by {@link 
org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}
+     * @return Global committer committable
+     */
+    @Override
+    public PinotSinkGlobalCommittable combine(List<PinotSinkCommittable> 
committables) {
+        List<String> dataFilePaths = new ArrayList<>();
+        long minTimestamp = Long.MAX_VALUE;
+        long maxTimestamp = Long.MIN_VALUE;
+
+        // Extract all data file paths and the overall minimum and maximum 
timestamps
+        // from all committables
+        for (PinotSinkCommittable committable : committables) {
+            dataFilePaths.add(committable.getDataFilePath());
+            minTimestamp = Long.min(minTimestamp, 
committable.getMinTimestamp());
+            maxTimestamp = Long.max(maxTimestamp, 
committable.getMaxTimestamp());
+        }
+
+        LOG.debug("Combined {} committables into one global committable", 
committables.size());
+        return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, 
maxTimestamp);
+    }
+
+    /**
+     * Copies data files from shared filesystem to the local filesystem, 
generates segments with names
+     * according to the segment naming schema and finally pushes the segments 
to the Pinot cluster.
+     * Before pushing a segment it is checked whether there already exists a 
segment with that name
+     * in the Pinot cluster by calling the Pinot controller. In case there is 
one, it gets deleted.
+     *
+     * @param globalCommittables List of global committables
+     * @return Global committables whose commit failed
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkGlobalCommittable> 
commit(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
+        // List of failed global committables that can be retried later on
+        List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();
+
+        for (PinotSinkGlobalCommittable globalCommittable : 
globalCommittables) {
+            Set<Future<Boolean>> resultFutures = new HashSet<>();
+            // Commit all segments in globalCommittable
+            for (int sequenceId = 0; sequenceId < 
globalCommittable.getDataFilePaths().size(); sequenceId++) {
+                String dataFilePath = 
globalCommittable.getDataFilePaths().get(sequenceId);
+                // Get segment names with increasing sequenceIds
+                String segmentName = getSegmentName(globalCommittable, 
sequenceId);
+                // Segment committer handling the whole commit process for a 
single segment
+                Callable<Boolean> segmentCommitter = new SegmentCommitter(
+                        pinotControllerHost, pinotControllerPort, 
tempDirectory, fsAdapter,
+                        dataFilePath, segmentName, tableSchema, tableConfig, 
timeColumnName,
+                        segmentTimeUnit
+                );
+                // Submits the segment committer to the thread pool
+                resultFutures.add(pool.submit(segmentCommitter));
+            }
+
+            boolean commitSucceeded = true;
+            try {
+                for (Future<Boolean> wasSuccessful : resultFutures) {
+                    // In case any of the segment commits wasn't successful we 
mark the whole
+                    // globalCommittable as failed
+                    if (!wasSuccessful.get()) {
+                        commitSucceeded = false;
+                        failedCommits.add(globalCommittable);
+                        // Once any of the commits failed, we do not need to 
check the remaining
+                        // ones, as we try to commit the globalCommittable 
next time
+                        break;
+                    }
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                // In case of an exception thrown while accessing commit 
status, mark the whole
+                // globalCommittable as failed
+                failedCommits.add(globalCommittable);
+                LOG.error("Accessing a SegmentCommitter thread errored with 
{}", e.getMessage(), e);
+            }
+
+            if (commitSucceeded) {
+                // If commit succeeded, cleanup the data files stored on the 
shared file system. In
+                // case the commit of at least one of the segments failed, 
nothing will be cleaned
+                // up here to enable retrying failed commits (data files must 
therefore stay
+                // available on the shared filesystem).
+                for (String path : globalCommittable.getDataFilePaths()) {
+                    fsAdapter.deleteFromSharedFileSystem(path);
+                }
+            }
+        }
+
+        // Return failed commits so that they can be retried later on
+        return failedCommits;
+    }
+
+    /**
+     * Empty method.
+     */
+    @Override
+    public void endOfInput() {
+    }
+
+    /**
+     * Closes the Pinot controller http client, clears the created temporary 
directory and
+     * shuts the thread pool down.
+     */
+    @Override
+    public void close() throws IOException {
+        pinotControllerClient.close();
+        tempDirectory.delete();
+        pool.shutdown();
+    }
+
+    /**
+     * Helper method for generating segment names using the segment name 
generator.
+     *
+     * @param globalCommittable Global committable the segment name shall be 
generated from
+     * @param sequenceId        Incrementing counter
+     * @return generated segment name
+     */
+    private String getSegmentName(PinotSinkGlobalCommittable 
globalCommittable, int sequenceId) {
+        return segmentNameGenerator.generateSegmentName(sequenceId,
+                globalCommittable.getMinTimestamp(), 
globalCommittable.getMaxTimestamp());
+    }
+
+    /**
+     * Evaluates the status of already uploaded segments by requesting segment 
metadata from the
+     * Pinot controller.
+     *
+     * @param globalCommittable Global committable whose commit status gets 
evaluated
+     * @return Commit status
+     * @throws IOException
+     */
+    private CommitStatus getCommitStatus(PinotSinkGlobalCommittable 
globalCommittable) throws IOException {
+        List<String> existingSegmentNames = new ArrayList<>();
+        List<String> missingSegmentNames = new ArrayList<>();
+
+        // For all segment names that will be used to submit new segments, 
check whether the segment
+        // name already exists for the target table
+        for (int sequenceId = 0; sequenceId < 
globalCommittable.getDataFilePaths().size(); sequenceId++) {
+            String segmentName = getSegmentName(globalCommittable, sequenceId);
+            if (pinotControllerClient.tableHasSegment(tableName, segmentName)) 
{
+                // Segment name already exists
+                existingSegmentNames.add(segmentName);
+            } else {
+                // Segment name does not exist yet
+                missingSegmentNames.add(segmentName);
+            }
+        }
+        return new CommitStatus(existingSegmentNames, missingSegmentNames);
+    }
+
+    /**
+     * Wrapper for existing and missing segments in the Pinot cluster.
+     */
+    private static class CommitStatus {
+        private final List<String> existingSegmentNames;
+        private final List<String> missingSegmentNames;
+
+        CommitStatus(List<String> existingSegmentNames, List<String> 
missingSegmentNames) {
+            this.existingSegmentNames = existingSegmentNames;
+            this.missingSegmentNames = missingSegmentNames;
+        }
+
+        public List<String> getExistingSegmentNames() {
+            return existingSegmentNames;
+        }
+
+        public List<String> getMissingSegmentNames() {
+            return missingSegmentNames;
+        }
+    }
+
+    /**
+     * Helper class for committing a single segment. Downloads a data file 
from the shared filesystem,
+     * generates a segment from the data file and uploads segment to the Pinot 
controller.
+     */
+    private static class SegmentCommitter implements Callable<Boolean> {
+
+        private static final Logger LOG = 
LoggerFactory.getLogger(SegmentCommitter.class);
+
+        private final String pinotControllerHost;
+        private final String pinotControllerPort;
+        private final File tempDirectory;
+        private final FileSystemAdapter fsAdapter;
+        private final String dataFilePath;
+        private final String segmentName;
+        private final Schema tableSchema;
+        private final TableConfig tableConfig;
+        private final String timeColumnName;
+        private final TimeUnit segmentTimeUnit;
+
+        /**
+         * @param pinotControllerHost Host of the Pinot controller
+         * @param pinotControllerPort Port of the Pinot controller
+         * @param tempDirectory       Directory to store temporary files in
+         * @param fsAdapter           Filesystem adapter used to load data 
files from the shared file system
+         * @param dataFilePath        Data file to load from the shared file 
system
+         * @param segmentName         Name of the segment to create and commit
+         * @param tableSchema         Pinot table schema
+         * @param tableConfig         Pinot table config
+         * @param timeColumnName      Name of the column containing the 
timestamp
+         * @param segmentTimeUnit     Unit of the time column
+         */
+        SegmentCommitter(String pinotControllerHost, String 
pinotControllerPort,
+                         File tempDirectory, FileSystemAdapter fsAdapter,
+                         String dataFilePath, String segmentName, Schema 
tableSchema,
+                         TableConfig tableConfig, String timeColumnName,
+                         TimeUnit segmentTimeUnit) {
+            this.pinotControllerHost = pinotControllerHost;
+            this.pinotControllerPort = pinotControllerPort;
+            this.tempDirectory = tempDirectory;
+            this.fsAdapter = fsAdapter;
+            this.dataFilePath = dataFilePath;
+            this.segmentName = segmentName;
+            this.tableSchema = tableSchema;
+            this.tableConfig = tableConfig;
+            this.timeColumnName = timeColumnName;
+            this.segmentTimeUnit = segmentTimeUnit;
+        }
+
+        /**
+         * Downloads a segment from the shared file system via {@code 
fsAdapter}, generates a segment
+         * and finally uploads the segment to the Pinot controller
+         *
+         * @return True if the commit succeeded
+         */
+        @Override
+        public Boolean call() {
+            // Local copy of data file stored on the shared filesystem
+            File segmentData = null;
+            // File containing the final Pinot segment
+            File segmentFile = null;
+            try {
+                // Download data file from the shared filesystem
+                LOG.debug("Downloading data file {} from shared file 
system...", dataFilePath);
+                List<String> serializedElements = 
fsAdapter.readFromSharedFileSystem(dataFilePath);
+                segmentData = 
FileSystemUtils.writeToLocalFile(serializedElements, tempDirectory);
+                LOG.debug("Successfully downloaded data file {} from shared 
file system", dataFilePath);
+
+                segmentFile = FileSystemUtils.createFileInDir(tempDirectory);
+                LOG.debug("Creating segment in " + 
segmentFile.getAbsolutePath());
+
+                // Creates a segment with name `segmentName` in `segmentFile`
+                generateSegment(segmentData, segmentFile, true);
+
+                // Uploads the recently created segment to the Pinot controller
+                uploadSegment(segmentFile);
+
+                // Commit successful
+                return true;
+            } catch (IOException e) {
+                LOG.error("Error while committing segment data stored on 
shared filesystem.", e);
+
+                // Commit failed
+                return false;
+            } finally {
+                // Finally cleanup all files created on the local filesystem
+                if (segmentData != null) {
+                    segmentData.delete();
+                }
+                if (segmentFile != null) {
+                    segmentFile.delete();
+                }
+            }
+        }
+
+        /**
+         * Creates a segment from the given parameters.
+         * This method was adapted from {@link 
org.apache.pinot.tools.admin.command.CreateSegmentCommand}.
+         *
+         * @param dataFile                  File containing the JSON data
+         * @param outDir                    Segment target path
+         * @param _postCreationVerification Verify segment after generation
+         */
+        private void generateSegment(File dataFile, File outDir, Boolean 
_postCreationVerification) {
+            SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(tableConfig, tableSchema);
+            segmentGeneratorConfig.setSegmentName(segmentName);
+            segmentGeneratorConfig.setSegmentTimeUnit(segmentTimeUnit);
+            segmentGeneratorConfig.setTimeColumnName(timeColumnName);
+            segmentGeneratorConfig.setInputFilePath(dataFile.getPath());
+            segmentGeneratorConfig.setFormat(FileFormat.JSON);
+            segmentGeneratorConfig.setOutDir(outDir.getPath());
+            segmentGeneratorConfig.setTableName(tableConfig.getTableName());
+
+            try {
+                SegmentIndexCreationDriver driver = new 
SegmentIndexCreationDriverImpl();
+                driver.init(segmentGeneratorConfig);
+                driver.build();
+                File indexDir = new File(outDir, segmentName);
+                LOG.debug("Successfully created segment: {} in directory: {}", 
segmentName, indexDir);
+                if (_postCreationVerification) {
+                    LOG.debug("Verifying the segment by loading it");
+                    ImmutableSegment segment = 
ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
+                    LOG.debug("Successfully loaded segment: {} of size: {} 
bytes", segmentName,
+                            segment.getSegmentSizeBytes());
+                    segment.destroy();
+                }
+            } catch (Exception e) {
+                // SegmentIndexCreationDriverImpl throws generic Exceptions 
during init and build
+                // ImmutableSegmentLoader throws generic Exception during load

Review comment:
       Nit: move this comment above the exception.

##########
File path: 
flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.committer;
+
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.PinotControllerApi;
+import 
org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Global committer takes committables from {@link 
org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter},
+ * generates segments and pushes them to the Pinot controller.
+ * Note: We use a custom multithreading approach to parallelize the segment 
creation and upload to
+ * overcome the performance limitations resulting from using a {@link 
GlobalCommitter} always
+ * running at a parallelism of 1.
+ */
+public class PinotSinkGlobalCommitter implements 
GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PinotSinkGlobalCommitter.class);
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final String tempDirPrefix;
+    private final FileSystemAdapter fsAdapter;
+    private final String timeColumnName;
+    private final TimeUnit segmentTimeUnit;
+
+    /**
+     * @param pinotControllerHost  Host of the Pinot controller
+     * @param pinotControllerPort  Port of the Pinot controller
+     * @param tableName            Target table's name
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param fsAdapter            Adapter for interacting with the shared 
file system
+     * @param timeColumnName       Name of the column containing the timestamp
+     * @param segmentTimeUnit      Unit of the time column
+     */
+    public PinotSinkGlobalCommitter(String pinotControllerHost, String 
pinotControllerPort, String tableName, SegmentNameGenerator 
segmentNameGenerator, String tempDirPrefix, FileSystemAdapter fsAdapter, String 
timeColumnName, TimeUnit segmentTimeUnit) {
+        this.pinotControllerHost = checkNotNull(pinotControllerHost);
+        this.pinotControllerPort = checkNotNull(pinotControllerPort);
+        this.tableName = checkNotNull(tableName);
+        this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
+        this.tempDirPrefix = checkNotNull(tempDirPrefix);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.timeColumnName = checkNotNull(timeColumnName);
+        this.segmentTimeUnit = checkNotNull(segmentTimeUnit);
+    }
+
+    /**
+     * Identifies global committables that need to be re-committed from a list 
of recovered committables.
+     *
+     * @param globalCommittables List of global committables that are checked 
for required re-commit
+     * @return List of global committables that need to be re-committed
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkGlobalCommittable> 
filterRecoveredCommittables(List<PinotSinkGlobalCommittable> 
globalCommittables) throws IOException {
+        PinotControllerApi controllerApi = new 
PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort);
+        List<PinotSinkGlobalCommittable> committablesToRetry = new 
ArrayList<>();
+
+        for (PinotSinkGlobalCommittable globalCommittable : 
globalCommittables) {
+            CommitStatus commitStatus = 
this.getCommitStatus(globalCommittable);
+
+            if (commitStatus.getMissingSegmentNames().isEmpty()) {
+                // All segments were already committed. Thus, we do not need 
to retry the commit.
+                continue;
+            }
+
+            for (String existingSegment : 
commitStatus.getExistingSegmentNames()) {
+                // Some but not all segments were already committed. As we 
cannot assure the data
+                // files containing the same data as originally when 
recovering from failure,
+                // we delete the already committed segments in order to 
recommit them later on.
+                controllerApi.deleteSegment(tableName, existingSegment);
+            }
+            committablesToRetry.add(globalCommittable);
+        }
+
+        return committablesToRetry;
+    }
+
+    /**
+     * Combines multiple {@link PinotSinkCommittable}s into one {@link 
PinotSinkGlobalCommittable}
+     * by finding the minimum and maximum timestamps from the provided {@link 
PinotSinkCommittable}s.
+     *
+     * @param committables Committables created by {@link 
org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}
+     * @return Global committer committable
+     */
+    @Override
+    public PinotSinkGlobalCommittable combine(List<PinotSinkCommittable> 
committables) {
+        List<String> dataFilePaths = new ArrayList<>();
+        long minTimestamp = Long.MAX_VALUE;
+        long maxTimestamp = Long.MIN_VALUE;
+
+        // Extract all data file paths and the overall minimum and maximum 
timestamps
+        // from all committables
+        for (PinotSinkCommittable committable : committables) {
+            dataFilePaths.add(committable.getDataFilePath());
+            minTimestamp = Long.min(minTimestamp, 
committable.getMinTimestamp());
+            maxTimestamp = Long.max(maxTimestamp, 
committable.getMaxTimestamp());
+        }
+
+        LOG.info("Combined {} committables into one global committable", 
committables.size());
+        return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, 
maxTimestamp);
+    }
+
+    /**
+     * Copies data files from shared filesystem to the local filesystem, 
generates segments with names
+     * according to the segment naming schema and finally pushes the segments 
to the Pinot cluster.
+     * Before pushing a segment it is checked whether there already exists a 
segment with that name
+     * in the Pinot cluster by calling the Pinot controller. In case there is 
one, it gets deleted.
+     *
+     * @param globalCommittables List of global committables
+     * @return Global committables whose commit failed
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkGlobalCommittable> 
commit(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
+        // Retrieve the Pinot table schema and the Pinot table config from the 
Pinot controller
+        PinotControllerApi controllerApi = new 
PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort);
+        Schema tableSchema = controllerApi.getSchema(this.tableName);
+        TableConfig tableConfig = controllerApi.getTableConfig(this.tableName);
+
+        // List of failed global committables that can be retried later on
+        List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();
+
+        for (PinotSinkGlobalCommittable globalCommittable : 
globalCommittables) {
+            // Make sure to remove all previously committed segments in 
globalCommittable
+            // when recovering from failure
+            CommitStatus commitStatus = 
this.getCommitStatus(globalCommittable);
+            for (String existingSegment : 
commitStatus.getExistingSegmentNames()) {
+                // Some but not all segments were already committed. As we 
cannot assure the data
+                // files containing the same data as originally when 
recovering from failure,
+                // we delete the already committed segments in order to 
recommit them later on.
+                controllerApi.deleteSegment(tableName, existingSegment);
+            }
+
+            // We use a thread pool in order to parallelize the segment 
creation and segment upload
+            ExecutorService pool = Executors.newCachedThreadPool();
+            Set<Future<Boolean>> resultFutures = new HashSet<>();
+
+            // Commit all segments in globalCommittable
+            int sequenceId = 0;
+            for (String dataFilePath : globalCommittable.getDataFilePaths()) {
+                // Get segment names with increasing sequenceIds
+                String segmentName = this.getSegmentName(globalCommittable, 
sequenceId++);
+                // Segment committer handling the whole commit process for a 
single segment
+                Callable<Boolean> segmentCommitter = new SegmentCommitter(
+                        this.pinotControllerHost, this.pinotControllerPort, 
this.tempDirPrefix,
+                        this.fsAdapter, dataFilePath, segmentName, 
tableSchema, tableConfig,
+                        this.timeColumnName, this.segmentTimeUnit
+                );
+                // Submits the segment committer to the thread pool
+                resultFutures.add(pool.submit(segmentCommitter));
+            }
+
+            try {
+                for (Future<Boolean> wasSuccessful : resultFutures) {
+                    // In case any of the segment commits wasn't successful we 
mark the whole
+                    // globalCommittable as failed
+                    if (!wasSuccessful.get()) {
+                        failedCommits.add(globalCommittable);

Review comment:
       Ah, I forgot about it...

##########
File path: 
flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.util.TestLogger;
+import org.apache.pinot.spi.config.table.*;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for PinotSink e2e tests
+ */
+@Testcontainers
+public class PinotTestBase extends TestLogger {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(PinotTestBase.class);
+
+    private static final String DOCKER_IMAGE_NAME = "apachepinot/pinot:0.6.0";
+    private static final Integer PINOT_INTERNAL_BROKER_PORT = 8000;
+    private static final Integer PINOT_INTERNAL_CONTROLLER_PORT = 9000;
+
+    protected static final TableConfig TABLE_CONFIG = 
PinotTableConfig.getTableConfig();
+    protected static final String TABLE_NAME = TABLE_CONFIG.getTableName();
+    protected static final Schema TABLE_SCHEMA = 
PinotTableConfig.getTableSchema();
+    protected static PinotTestHelper pinotHelper;
+
+    /**
+     * Creates the Pinot testcontainer. We delay the start of tests until 
Pinot has started all
+     * internal components. This is identified through a log statement.
+     */
+    @Container
+    public GenericContainer<?> pinot = new 
GenericContainer<>(DockerImageName.parse(DOCKER_IMAGE_NAME))
+            .withCommand("QuickStart", "-type", "batch")
+            .withExposedPorts(PINOT_INTERNAL_BROKER_PORT, 
PINOT_INTERNAL_CONTROLLER_PORT)
+            .waitingFor(
+                    // Wait for controller, server and broker instances to be 
available
+                    new HttpWaitStrategy()
+                            .forPort(PINOT_INTERNAL_CONTROLLER_PORT)
+                            .forPath("/instances")
+                            .forStatusCode(200)
+                            .forResponsePredicate(res -> {
+                                try {
+                                    JsonNode instances = 
JsonUtils.stringToJsonNode(res).get("instances");
+                                    // Expect 3 instances to be up and running 
(controller, broker and server)
+                                    return instances.size() == 3;
+                                } catch (IOException e) {
+                                    LOG.error("Error while reading json 
response in HttpWaitStrategy.", e);
+                                }
+                                return false;
+                            })
+                            // Allow Pinot to take up to 180s for starting up
+                            .withStartupTimeout(Duration.ofSeconds(180))
+            );
+
+    /**
+     * Creates a new instance of the {@link PinotTestHelper} using the 
testcontainer port mappings
+     * and creates the test table.
+     *
+     * @throws IOException
+     */
+    @BeforeEach
+    public void setUp() throws IOException {
+        pinotHelper = new PinotTestHelper(getPinotHost(), 
getPinotControllerPort(), getPinotBrokerPort());
+        pinotHelper.createTable(TABLE_CONFIG, TABLE_SCHEMA);
+    }
+
+    /**
+     * Delete the test table after each test.
+     *
+     * @throws Exception
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        pinotHelper.deleteTable(TABLE_CONFIG, TABLE_SCHEMA);
+    }
+
+    /**
+     * Returns the host the Pinot container is available at
+     *
+     * @return Pinot container host
+     */
+    protected String getPinotHost() {
+        return this.pinot.getHost();
+    }
+
+
+    /**
+     * Returns the Pinot controller port from the container ports.
+     *
+     * @return Pinot controller port
+     */
+    protected String getPinotControllerPort() {
+        return 
this.pinot.getMappedPort(PINOT_INTERNAL_CONTROLLER_PORT).toString();
+    }
+
+    /**
+     * Returns the Pinot broker port from the container ports.
+     *
+     * @return Pinot broker port
+     */
+    protected String getPinotBrokerPort() {

Review comment:
       private?

##########
File path: 
flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import 
org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import 
org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import 
org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.client.ResultSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.AssertionFailedError;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * E2e tests for Pinot Sink using BATCH and STREAMING execution mode
+ */
+public class PinotSinkTest extends PinotTestBase {
+
+    /**
+     * Tests the BATCH execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBatchSink() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(2);
+
+        List<SingleColumnTableRow> data = getTestData(12);
+        this.setupDataStream(env, data);
+
+        // Run
+        env.execute();
+
+        checkForDataInPinotWithRetry(data, data.size(), 20);

Review comment:
       Probably the same question as before: why are not all records present?

##########
File path: 
flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import 
org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
+import 
org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import 
org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import 
org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import 
org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import 
org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkCommittableSerializer;
+import 
org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkGlobalCommittableSerializer;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Apache Pinot sink that stores objects from upstream Flink tasks in an Apache 
Pinot table. The sink
+ * can be operated in {@code RuntimeExecutionMode.STREAMING} or {@code 
RuntimeExecutionMode.BATCH}
+ * mode. But ensure to enable checkpointing when using in streaming mode.
+ *
+ * <p>We advise you to use the provided {@link PinotSink.Builder} to build and 
configure the
+ * PinotSink. All the communication with the Pinot cluster's table is managed 
via the Pinot
+ * controller. Thus you need to provide its host and port as well as the 
target Pinot table.
+ * The {@link TableConfig} and {@link Schema} is automatically retrieved via 
the Pinot controller API
+ * and therefore does not need to be provided.
+ *
+ * <p>Whenever an element is received by the sink it gets stored in a {@link 
PinotWriterSegment}. A
+ * {@link PinotWriterSegment} represents exactly one segment that will be 
pushed to the Pinot
+ * cluster later on. Its size is determined by the customizable {@code 
maxRowsPerSegment} parameter.
+ * Please note that the maximum segment size that can be handled by this sink 
is limited by the
+ * lower bound of memory available at each subTask.
+ * Each subTask holds a list of {@link PinotWriterSegment}s of which at most 
one is active. An
+ * active {@link PinotWriterSegment} is capable of accepting at least one more 
element. If a
+ * {@link PinotWriterSegment} switches from active to inactive it flushes its
+ * {@code maxRowsPerSegment} elements to disk. The data file is stored in the 
local filesystem's
+ * temporary directory and contains serialized elements. We use the {@link 
JsonSerializer} to
+ * serialize elements to JSON.
+ *
+ * <p>On checkpointing all inactive {@link PinotWriterSegment} are transformed 
into committables. As

Review comment:
       You also have to deal with the active segments :) Once this is 
implemented, update the docstring.

##########
File path: 
flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.PinotControllerClient;
+import 
org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemUtils;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Global committer takes committables from {@link 
org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter},
+ * generates segments and pushes them to the Pinot controller.
+ * Note: We use a custom multithreading approach to parallelize the segment 
creation and upload to
+ * overcome the performance limitations resulting from using a {@link 
GlobalCommitter} always
+ * running at a parallelism of 1.
+ */
+@Internal
+public class PinotSinkGlobalCommitter implements 
GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PinotSinkGlobalCommitter.class);
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final FileSystemAdapter fsAdapter;
+    private final String timeColumnName;
+    private final TimeUnit segmentTimeUnit;
+    private final PinotControllerClient pinotControllerClient;
+    private final File tempDirectory;
+    private final Schema tableSchema;
+    private final TableConfig tableConfig;
+    private final ExecutorService pool;
+
+    /**
+     * @param pinotControllerHost  Host of the Pinot controller
+     * @param pinotControllerPort  Port of the Pinot controller
+     * @param tableName            Target table's name
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param tempDirPrefix        Prefix for directory to store temporary 
files in
+     * @param fsAdapter            Adapter for interacting with the shared 
file system
+     * @param timeColumnName       Name of the column containing the timestamp
+     * @param segmentTimeUnit      Unit of the time column
+     * @param numCommitThreads     Number of threads used to commit the 
committables
+     */
+    public PinotSinkGlobalCommitter(String pinotControllerHost, String 
pinotControllerPort,
+                                    String tableName, SegmentNameGenerator 
segmentNameGenerator,
+                                    String tempDirPrefix, FileSystemAdapter 
fsAdapter,
+                                    String timeColumnName, TimeUnit 
segmentTimeUnit,
+                                    int numCommitThreads) throws IOException {
+        this.pinotControllerHost = checkNotNull(pinotControllerHost);
+        this.pinotControllerPort = checkNotNull(pinotControllerPort);
+        this.tableName = checkNotNull(tableName);
+        this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.timeColumnName = checkNotNull(timeColumnName);
+        this.segmentTimeUnit = checkNotNull(segmentTimeUnit);
+        pinotControllerClient = new PinotControllerClient(pinotControllerHost, 
pinotControllerPort);
+
+        // Create directory that temporary files will be stored in
+        tempDirectory = Files.createTempDirectory(tempDirPrefix).toFile();
+
+        // Retrieve the Pinot table schema and the Pinot table config from the 
Pinot controller
+        tableSchema = pinotControllerClient.getSchema(tableName);
+        tableConfig = pinotControllerClient.getTableConfig(tableName);
+
+        // We use a thread pool in order to parallelize the segment creation 
and segment upload
+        checkArgument(numCommitThreads > 0);
+        pool = Executors.newFixedThreadPool(numCommitThreads);

Review comment:
       `this.`

##########
File path: 
flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/EventTimeExtractor.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.external;
+
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Defines the interface for event time extractors
+ *
+ * @param <IN> Type of incoming elements
+ */
+public abstract class EventTimeExtractor<IN> implements Serializable {

Review comment:
       interface?

##########
File path: 
flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import 
org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import 
org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import 
org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.client.ResultSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.AssertionFailedError;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * E2e tests for Pinot Sink using BATCH and STREAMING execution mode
+ */
+public class PinotSinkTest extends PinotTestBase {
+
+    /**
+     * Tests the BATCH execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBatchSink() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(2);
+
+        List<SingleColumnTableRow> data = getTestData(12);
+        this.setupDataStream(env, data);
+
+        // Run
+        env.execute();
+
+        checkForDataInPinotWithRetry(data, data.size(), 20);
+    }
+
+    /**
+     * Tests the STREAMING execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testStreamingSink() throws Exception {
+        final Configuration conf = new Configuration();
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(conf);
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setParallelism(2);
+        env.enableCheckpointing(50);
+
+        List<SingleColumnTableRow> data = getTestData(1000);
+        this.setupDataStream(env, data);
+
+        // Run
+        env.execute();
+
+        // We only expect the first 100 elements to be already committed to 
Pinot.
+        // The remaining would follow once we increase the input data size.
+        // The stream executions stops once the last input tuple was sent to 
the sink.
+        checkForDataInPinotWithRetry(data, 100, 20);

Review comment:
       Why do we not get all records? Since we use `env.execute()`, which blocks 
until the job finishes, I would expect all records in Pinot.

##########
File path: 
flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.util.TestLogger;
+import org.apache.pinot.spi.config.table.*;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for PinotSink e2e tests
+ */
+@Testcontainers
+public class PinotTestBase extends TestLogger {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(PinotTestBase.class);
+
+    private static final String DOCKER_IMAGE_NAME = "apachepinot/pinot:0.6.0";
+    private static final Integer PINOT_INTERNAL_BROKER_PORT = 8000;
+    private static final Integer PINOT_INTERNAL_CONTROLLER_PORT = 9000;
+
+    protected static final TableConfig TABLE_CONFIG = 
PinotTableConfig.getTableConfig();
+    protected static final String TABLE_NAME = TABLE_CONFIG.getTableName();
+    protected static final Schema TABLE_SCHEMA = 
PinotTableConfig.getTableSchema();
+    protected static PinotTestHelper pinotHelper;
+
+    /**
+     * Creates the Pinot testcontainer. We delay the start of tests until 
Pinot has started all
+     * internal components. This is identified through a log statement.
+     */
+    @Container
+    public GenericContainer<?> pinot = new 
GenericContainer<>(DockerImageName.parse(DOCKER_IMAGE_NAME))
+            .withCommand("QuickStart", "-type", "batch")
+            .withExposedPorts(PINOT_INTERNAL_BROKER_PORT, 
PINOT_INTERNAL_CONTROLLER_PORT)
+            .waitingFor(
+                    // Wait for controller, server and broker instances to be 
available
+                    new HttpWaitStrategy()
+                            .forPort(PINOT_INTERNAL_CONTROLLER_PORT)
+                            .forPath("/instances")
+                            .forStatusCode(200)
+                            .forResponsePredicate(res -> {
+                                try {
+                                    JsonNode instances = 
JsonUtils.stringToJsonNode(res).get("instances");
+                                    // Expect 3 instances to be up and running 
(controller, broker and server)
+                                    return instances.size() == 3;
+                                } catch (IOException e) {
+                                    LOG.error("Error while reading json 
response in HttpWaitStrategy.", e);
+                                }
+                                return false;
+                            })
+                            // Allow Pinot to take up to 180s for starting up
+                            .withStartupTimeout(Duration.ofSeconds(180))
+            );
+
+    /**
+     * Creates a new instance of the {@link PinotTestHelper} using the 
testcontainer port mappings
+     * and creates the test table.
+     *
+     * @throws IOException
+     */
+    @BeforeEach
+    public void setUp() throws IOException {
+        pinotHelper = new PinotTestHelper(getPinotHost(), 
getPinotControllerPort(), getPinotBrokerPort());
+        pinotHelper.createTable(TABLE_CONFIG, TABLE_SCHEMA);
+    }
+
+    /**
+     * Delete the test table after each test.
+     *
+     * @throws Exception
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        pinotHelper.deleteTable(TABLE_CONFIG, TABLE_SCHEMA);
+    }
+
+    /**
+     * Returns the host the Pinot container is available at
+     *
+     * @return Pinot container host
+     */
+    protected String getPinotHost() {
+        return this.pinot.getHost();
+    }
+
+
+    /**
+     * Returns the Pinot controller port from the container ports.
+     *
+     * @return Pinot controller port
+     */
+    protected String getPinotControllerPort() {
+        return 
this.pinot.getMappedPort(PINOT_INTERNAL_CONTROLLER_PORT).toString();
+    }
+
+    /**
+     * Returns the Pinot broker port from the container ports.
+     *
+     * @return Pinot broker port
+     */
+    protected String getPinotBrokerPort() {
+        return this.pinot.getMappedPort(PINOT_INTERNAL_BROKER_PORT).toString();
+    }
+
+    /**
+     * Class defining the elements passed to the {@link PinotSink} during the 
tests.
+     */
+    protected static class SingleColumnTableRow {
+
+        private String _col1;
+        private Long _timestamp;
+
+        SingleColumnTableRow(@JsonProperty(value = "col1", required = true) 
String col1,
+                             @JsonProperty(value = "timestamp", required = 
true) Long timestamp) {
+            this._col1 = col1;
+            this._timestamp = timestamp;
+        }
+
+        @JsonProperty("col1")
+        public String getCol1() {
+            return this._col1;
+        }
+
+        public void setCol1(String _col1) {
+            this._col1 = _col1;
+        }
+
+        @JsonProperty("timestamp")
+        public Long getTimestamp() {
+            return this._timestamp;
+        }
+
+        public void setTimestamp(Long timestamp) {
+            this._timestamp = timestamp;
+        }
+    }
+
+
+    /**
+     * EventTimeExtractor for {@link SingleColumnTableRow} used in e2e tests.
+     * Extracts the timestamp column from {@link SingleColumnTableRow}.
+     */
+    protected static class SingleColumnTableRowEventTimeExtractor extends 
EventTimeExtractor<SingleColumnTableRow> {

Review comment:
       Move to `PinotSinkTest` because it is only used there?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to