mschroederi commented on a change in pull request #113:
URL: https://github.com/apache/bahir-flink/pull/113#discussion_r598360934



##########
File path: 
flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import 
org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import 
org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import 
org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.flink.util.Preconditions;
+import org.apache.pinot.client.PinotClientException;
+import org.apache.pinot.client.ResultSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.AssertionFailedError;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * E2e tests for Pinot Sink using BATCH and STREAMING execution mode
+ */
+public class PinotSinkTest extends PinotTestBase {
+
+    private static final int MAX_ROWS_PER_SEGMENT = 5;
+    private static final long STREAMING_CHECKPOINTING_INTERVAL = 50;
+    private static final int DATA_CHECKING_TIMEOUT_SECONDS = 60;
+    private static final AtomicBoolean hasFailedOnce = new 
AtomicBoolean(false);
+    private static CountDownLatch latch;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        super.setUp();
+        // Reset hasFailedOnce flag used during failure recovery testing 
before each test.
+        hasFailedOnce.set(false);
+        // Reset latch used to keep the generator streaming source up until 
the test is completed.
+        latch = new CountDownLatch(1);
+    }
+
+    /**
+     * Tests the BATCH execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBatchSink() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(2);
+
+        List<String> rawData = getRawTestData(12);
+        DataStream<SingleColumnTableRow> dataStream = 
setupBatchDataSource(env, rawData);
+        setupSink(dataStream);
+
+        // Run
+        env.execute();
+
+        // Check for data in Pinot
+        checkForDataInPinotWithRetry(rawData);
+    }
+
+    /**
+     * Tests failure recovery of the {@link PinotSink} using BATCH execution 
mode.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testFailureRecoveryInBatchingSink() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10));
+        env.setParallelism(2);
+
+        List<String> rawData = getRawTestData(12);
+        DataStream<SingleColumnTableRow> dataStream = 
setupBatchDataSource(env, rawData);
+        dataStream = setupFailingMapper(dataStream, 8);
+        setupSink(dataStream);
+
+        // Run
+        env.execute();
+
+        // Check for data in Pinot
+        checkForDataInPinotWithRetry(rawData);
+    }
+
+    /**
+     * Tests the STREAMING execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testStreamingSink() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(2);
+        env.enableCheckpointing(STREAMING_CHECKPOINTING_INTERVAL);
+
+        List<String> rawData = getRawTestData(20);
+        DataStream<SingleColumnTableRow> dataStream = 
setupStreamingDataSource(env, rawData);
+        setupSink(dataStream);
+
+        // Start execution of job
+        env.executeAsync();
+
+        // Check for data in Pinot
+        checkForDataInPinotWithRetry(rawData);
+
+        // Generator source can now shut down
+        latch.countDown();
+    }
+
+    /**
+     * Tests failure recovery of the {@link PinotSink} using STREAMING 
execution mode.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testFailureRecoveryInStreamingSink() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setParallelism(1);
+        env.enableCheckpointing(STREAMING_CHECKPOINTING_INTERVAL);
+
+        List<String> rawData = getRawTestData(20);
+        DataStream<SingleColumnTableRow> dataStream = 
setupStreamingDataSource(env, rawData);
+        // With a segment size of MAX_ROWS_PER_SEGMENT = 5 elements and a 
parallelism of 2,
+        // the failure will be raised once the first 2 segments were committed 
to Pinot.
+        dataStream = setupFailingMapper(dataStream, 12);
+        setupSink(dataStream);
+
+        // Start execution of job
+        env.executeAsync();
+
+        // Check for data in Pinot
+        checkForDataInPinotWithRetry(rawData);
+
+        // Generator source can now shut down
+        latch.countDown();
+    }
+
+    /**
+     * Generates a small test dataset consisting of {@link 
SingleColumnTableRow}s.
+     *
+     * @return List of SingleColumnTableRow
+     */
+    private List<String> getRawTestData(int numItems) {
+        return IntStream.range(1, numItems + 1)
+                .mapToObj(num -> "ColValue" + num)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Setup the data source for STREAMING tests.
+     *
+     * @param env           Stream execution environment
+     * @param rawDataValues Data values to send
+     * @return resulting data stream
+     */
+    private DataStream<SingleColumnTableRow> 
setupStreamingDataSource(StreamExecutionEnvironment env, List<String> 
rawDataValues) {
+        SimpleStreamingSource source = new 
SimpleStreamingSource(rawDataValues, 10);
+        return env.addSource(source)
+                .name("Test input");
+    }
+
+    /**
+     * Setup the data source for BATCH tests.
+     *
+     * @param env           Stream execution environment
+     * @param rawDataValues Data values to send
+     * @return resulting data stream
+     */
+    private DataStream<SingleColumnTableRow> 
setupBatchDataSource(StreamExecutionEnvironment env, List<String> 
rawDataValues) {
+        return env.fromCollection(rawDataValues)
+                .map(value -> new SingleColumnTableRow(value, 
System.currentTimeMillis()))
+                .name("Test input");
+    }
+
+    /**
+     * Setup a mapper that fails when processing the nth element with n = 
failOnceAtNthElement.
+     *
+     * @param dataStream           Input data stream
+     * @param failOnceAtNthElement Number of elements to process before 
raising the exception
+     * @return resulting data stream
+     */
+    private DataStream<SingleColumnTableRow> 
setupFailingMapper(DataStream<SingleColumnTableRow> dataStream, int 
failOnceAtNthElement) {
+        AtomicInteger messageCounter = new AtomicInteger(0);
+
+        return dataStream.map(element -> {
+            if (!hasFailedOnce.get() && messageCounter.incrementAndGet() == 
failOnceAtNthElement) {
+                hasFailedOnce.set(true);
+                // Wait more than STREAMING_CHECKPOINTING_INTERVAL to ensure
+                // that at least one checkpoint was created before raising the 
exception.
+                Thread.sleep(4 * STREAMING_CHECKPOINTING_INTERVAL);

Review comment:
       I've removed the timeout by extending the source to support raising 
exceptions, and added the following comment to explain why we wait for a 
snapshot to be created. I hope this clears things up.
   > This allows us to check whether the snapshot creation and failure recovery in
   > {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter} work properly,
   > respecting the already committed elements and those that are stored in an active
   > {@link org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment}. Committed
   > elements must not be saved to the snapshot, while those in an active segment must be saved
   > to the snapshot in order to enable later recovery.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to