fapaul commented on a change in pull request #113: URL: https://github.com/apache/bahir-flink/pull/113#discussion_r595812015
########## File path: flink-connector-pinot/README.md ##########
@@ -0,0 +1,126 @@
+# Flink Pinot Connector
+
+This connector provides a sink to [Apache Pinot](http://pinot.apache.org/)™.
+To use this connector, add the following dependency to your project:
+
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>flink-connector-pinot_2.11</artifactId>
+      <version>1.1-SNAPSHOT</version>
+    </dependency>
+
+*Version Compatibility*: This module is compatible with Pinot 0.6.0.
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/linking.html).
+
+The sink class is called `PinotSink`.
+
+## Usage
+```java
+StreamExecutionEnvironment env = ...
+// Checkpointing needs to be enabled when executing in STREAMING mode
+env.enableCheckpointing(long interval);
+
+DataStream<PinotRow> dataStream = ...
+PinotSink pinotSink = new PinotSink.Builder<PinotRow>(String pinotControllerHost, String pinotControllerPort, String tableName)
+
+    // Serializes a PinotRow to JSON format
+    .withJsonSerializer(JsonSerializer<PinotRow> jsonSerializer)
+
+    // Extracts the timestamp from a PinotRow
+    .withEventTimeExtractor(EventTimeExtractor<IN> eventTimeExtractor)
+
+    // Defines the segment name generation via the predefined SimpleSegmentNameGenerator
+    // Exemplary segment name: tableName_minTimestamp_maxTimestamp_segmentNamePostfix_0
+    .withSimpleSegmentNameGenerator(String tableName, String segmentNamePostfix)
+
+    // Use a custom segment name generator if the SimpleSegmentNameGenerator does not work for your use case
+    .withSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator)
+
+    // Use the local filesystem to share committables across subTasks
+    // CAUTION: Use only if all subTasks run on the same node with access to the local filesystem
+    .withLocalFileSystemAdapter()
+
+    // Use a custom filesystem adapter.
+    // CAUTION: Make sure all nodes your Flink app runs on can access the shared filesystem via the provided FileSystemAdapter
+    .withFileSystemAdapter(FileSystemAdapter fsAdapter)
+
+    // Defines the size of the Pinot segments
+    .withMaxRowsPerSegment(int maxRowsPerSegment)
+
+    // Prefix within the local filesystem's temp directory used for storing intermediate files
+    .withTempDirectoryPrefix(String tempDirPrefix)
+
+    // Number of threads used in the `PinotSinkGlobalCommitter` to commit a batch of segments
+    // Optional - Default is 4
+    .withNumCommitThreads(int numCommitThreads)
+
+    // Builds the PinotSink
+    .build();
+
+dataStream.addSink(pinotSink);
+```
+
+## Options
+| Option                 | Description                                                                      |
+| ---------------------- | -------------------------------------------------------------------------------- |
+| `pinotControllerHost`  | Host of the Pinot controller                                                     |
+| `pinotControllerPort`  | Port of the Pinot controller                                                     |
+| `tableName`            | Target Pinot table's name                                                        |
+| `maxRowsPerSegment`    | Maximum number of rows to be stored within a Pinot segment                       |
+| `tempDirPrefix`        | Prefix for temp directories used                                                 |
+| `jsonSerializer`       | Serializer used to convert elements to JSON                                      |
+| `eventTimeExtractor`   | Defines the way event times are extracted from received objects                  |
+| `segmentNameGenerator` | Pinot segment name generator                                                     |
+| `fsAdapter`            | Filesystem adapter used to save files for sharing files across nodes             |
+| `numCommitThreads`     | Number of threads used in the `PinotSinkGlobalCommitter` for committing segments |
+
+## Architecture

Review comment:
I would reorder this section a bit:
1. Architecture
2. PinotSinkWriter
3. PinotGlobalCommitter
4. DeliveryGuarantees
5. Options
6. Usage

########## File path: flink-connector-pinot/pom.xml ##########
@@ -0,0 +1,225 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.
See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-pinot_2.11</artifactId>
+    <name>flink-connector-pinot</name>
+
+    <packaging>jar</packaging>
+
+    <!-- Allow users to pass custom connector versions -->
+    <properties>
+        <pinot.version>0.6.0</pinot.version>
+
+        <testcontainers.version>1.15.2</testcontainers.version>

Review comment:
I do not know how other Bahir projects handle this, but is it important that users can override the Testcontainers version? I would consider the tests internal.

########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerClient.java ##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Helpers to interact with the Pinot controller via its public API.
+ */
+public class PinotControllerClient implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotControllerClient.class);
+    private final PinotControllerHttpClient httpClient;
+
+    /**
+     * @param controllerHost Pinot controller's host
+     * @param controllerPort Pinot controller's port
+     */
+    public PinotControllerClient(String controllerHost, String controllerPort) {
+        httpClient = new PinotControllerHttpClient(controllerHost, controllerPort);

Review comment:
In the constructor you are supposed to use `this.` :)

########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerHttpClient.java ##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.*;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helpers to interact with the Pinot controller via its public API.
+ */
+public class PinotControllerHttpClient implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotControllerHttpClient.class);
+    protected final String controllerHostPort;
+    protected final CloseableHttpClient httpClient;
+
+    /**
+     * @param controllerHost Pinot controller's host
+     * @param controllerPort Pinot controller's port
+     */
+    public PinotControllerHttpClient(String controllerHost, String controllerPort) {
+        checkNotNull(controllerHost);
+        checkNotNull(controllerPort);
+        controllerHostPort = String.format("http://%s:%s", controllerHost, controllerPort);
+        httpClient = HttpClients.createDefault();
+    }
+
+    /**
+     * Issues a request to the Pinot controller API.
+     *
+     * @param request Request to issue
+     * @return Api response
+     * @throws IOException
+     */
+    private ApiResponse execute(HttpRequestBase request) throws IOException {
+        ApiResponse result;
+
+        try (CloseableHttpResponse response = httpClient.execute(request)) {
+            String body = EntityUtils.toString(response.getEntity());
+            result = new ApiResponse(response.getStatusLine(), body);
+        }
+
+        return result;
+    }
+
+    /**
+     * Issues a POST request to the Pinot controller API.
+     *
+     * @param path Path to POST to
+     * @param body Request's body
+     * @return API response
+     * @throws IOException
+     */
+    protected ApiResponse post(String path, String body) throws IOException {
+        HttpPost httppost = new HttpPost(this.controllerHostPort + path);
+        httppost.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
+        LOG.debug("Posting string entity {} to {}", body, path);
+        return this.execute(httppost);
+    }
+
+    /**
+     * Issues a GET request to the Pinot controller API.
+     *
+     * @param path Path to GET from
+     * @return API response
+     * @throws IOException
+     */
+    protected ApiResponse get(String path) throws IOException {

Review comment:
private

########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/LocalFileSystemAdapter.java ##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.filesystem;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The LocalFileSystemAdapter is used when sharing files via the local filesystem.
+ * Keep in mind that using this FileSystemAdapter requires running the Flink app on a single node.
+ */
+public class LocalFileSystemAdapter extends FileSystemAdapter {

Review comment:
I would move this class to the test package and also remove this option from the sink builder because it is a highly unlikely setting.
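To make the adapter discussion above concrete: the role of a filesystem adapter in this sink is simply to persist serialized rows where another process can read them back. A minimal plain-JDK sketch of that local-filesystem behavior follows; the class and method names here are illustrative, not the connector's actual API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Illustrative sketch (hypothetical names): a writer subtask persists
// serialized rows to a temp file; the global committer later reads them back.
// This only works across subtasks when they share the same filesystem,
// which is why the reviewer calls the local adapter a test-only setting.
public class LocalFsSketch {

    public static Path writeToSharedFileSystem(List<String> serializedRows, String prefix) throws IOException {
        Path tempFile = Files.createTempFile(prefix, ".json");
        Files.write(tempFile, serializedRows, StandardCharsets.UTF_8);
        return tempFile;
    }

    public static List<String> readFromSharedFileSystem(Path path) throws IOException {
        return Files.readAllLines(path, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Path p = writeToSharedFileSystem(List.of("{\"a\":1}", "{\"a\":2}"), "pinot-sink-");
        System.out.println(readFromSharedFileSystem(p));
        Files.deleteIfExists(p);
    }
}
```

A production adapter would implement the same two operations against a distributed store (e.g. HDFS or S3) so that all nodes can see the intermediate files.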
########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemAdapter.java ##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.filesystem;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Defines the interaction with a shared filesystem. The shared filesystem must be accessible from all
+ * nodes within the cluster that run a partition of the {@link org.apache.flink.streaming.connectors.pinot.PinotSink}.
+ */
+public abstract class FileSystemAdapter implements Serializable {

Review comment:
Can this be an interface?

########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java ##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.serializer;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+
+import java.io.*;
+
+/**
+ * Serializer for {@link PinotSinkCommittable}
+ */
+public class PinotSinkCommittableSerializer implements SimpleVersionedSerializer<PinotSinkCommittable> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PinotSinkCommittable pinotSinkCommittable) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeLong(pinotSinkCommittable.getMinTimestamp());
+            out.writeLong(pinotSinkCommittable.getMaxTimestamp());
+            out.writeUTF(pinotSinkCommittable.getDataFilePath());
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public PinotSinkCommittable deserialize(int version, byte[] serialized) throws IOException {
+        switch (version) {
+            case 1:
+                return deserializeV1(serialized);
+            default:
+                throw new IOException("Unrecognized version or corrupt state: " + version);

Review comment:
```suggestion
                throw new IllegalStateException("Unrecognized version or corrupt state: " + version);
```

########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java ##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.serializer;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializer for {@link PinotSinkGlobalCommittable}
+ */
+public class PinotSinkGlobalCommittableSerializer implements SimpleVersionedSerializer<PinotSinkGlobalCommittable> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PinotSinkGlobalCommittable pinotSinkGlobalCommittable) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeLong(pinotSinkGlobalCommittable.getMinTimestamp());
+            out.writeLong(pinotSinkGlobalCommittable.getMaxTimestamp());
+
+            int size = pinotSinkGlobalCommittable.getDataFilePaths().size();
+            out.writeInt(size);
+            for (String dataFilePath : pinotSinkGlobalCommittable.getDataFilePaths()) {
+                out.writeUTF(dataFilePath);
+            }
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public PinotSinkGlobalCommittable deserialize(int version, byte[] serialized) throws IOException {
+        switch (version) {
+            case 1:
+                return deserializeV1(serialized);
+            default:
+                throw new IOException("Unrecognized version or corrupt state: " + version);

Review comment:
```suggestion
                throw new IllegalStateException("Unrecognized version or corrupt state: " + version);
```

########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java ##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import com.google.common.collect.Iterables;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Accepts incoming elements and creates {@link PinotSinkCommittable}s out of them on request.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable, Void> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotSinkWriter.class);
+
+    private final int maxRowsPerSegment;
+    private final EventTimeExtractor<IN> eventTimeExtractor;
+    private final JsonSerializer<IN> jsonSerializer;
+
+    private final List<PinotWriterSegment<IN>> activeSegments;
+    private final FileSystemAdapter fsAdapter;
+
+    private final int subtaskId;
+
+    /**
+     * @param subtaskId          Subtask id provided by Flink
+     * @param maxRowsPerSegment  Maximum number of rows to be stored within a Pinot segment
+     * @param eventTimeExtractor Defines the way event times are extracted from received objects
+     * @param jsonSerializer     Serializer used to convert elements to JSON
+     * @param fsAdapter          Filesystem adapter used to save files for sharing files across nodes
+     */
+    public PinotSinkWriter(int subtaskId, int maxRowsPerSegment,
+                           EventTimeExtractor<IN> eventTimeExtractor,
+                           JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
+        this.subtaskId = subtaskId;
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
+        this.jsonSerializer = checkNotNull(jsonSerializer);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.activeSegments = new ArrayList<>();
+    }
+
+    /**
+     * Takes elements from an upstream task and writes them into {@link PinotWriterSegment}
+     *
+     * @param element Object from upstream task
+     * @param context SinkWriter context
+     * @throws IOException
+     */
+    @Override
+    public void write(IN element, Context context) throws IOException {
+        final PinotWriterSegment<IN> inProgressSegment = getOrCreateInProgressSegment();
+        inProgressSegment.write(element, eventTimeExtractor.getEventTime(element, context));
+    }
+
+    /**
+     * Creates {@link PinotSinkCommittable}s from elements previously received via {@link #write}.
+     * If flush is set, all {@link PinotWriterSegment}s are transformed into
+     * {@link PinotSinkCommittable}s. If flush is not set, only currently non-active
+     * {@link PinotWriterSegment}s are transformed into {@link PinotSinkCommittable}s.
+     * To convert a {@link PinotWriterSegment} into a {@link PinotSinkCommittable} the data gets
+     * written to the shared filesystem. Moreover, minimum and maximum timestamps are identified.
+     * Finally, all {@link PinotWriterSegment}s transformed into {@link PinotSinkCommittable}s are
+     * removed from {@link #activeSegments}.
+     *
+     * @param flush Flush all currently known elements into the {@link PinotSinkCommittable}s
+     * @return List of {@link PinotSinkCommittable} to process in {@link org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter}
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkCommittable> prepareCommit(boolean flush) throws IOException {
+        // Identify segments to commit. If the flush argument is set all segments shall be committed.
+        // Otherwise, take only those PinotWriterSegments that do not accept any more elements.
+        List<PinotWriterSegment<IN>> segmentsToCommit = activeSegments.stream()
+                .filter(s -> flush || !s.acceptsElements())
+                .collect(Collectors.toList());
+        LOG.debug("Identified {} segments to commit [subtaskId={}]", segmentsToCommit.size(), subtaskId);
+
+        LOG.debug("Creating committables... [subtaskId={}]", subtaskId);
+        List<PinotSinkCommittable> committables = new ArrayList<>();
+        for (final PinotWriterSegment<IN> segment : segmentsToCommit) {
+            committables.add(segment.prepareCommit());
+        }
+        LOG.debug("Created {} committables [subtaskId={}]", committables.size(), subtaskId);
+
+        // Remove all PinotWriterSegments that will be emitted within the committables.
+        activeSegments.removeAll(segmentsToCommit);
+        return committables;
+    }
+
+    /**
+     * Gets the {@link PinotWriterSegment} still accepting elements or creates a new one.
+     *
+     * @return {@link PinotWriterSegment} accepting at least one more element
+     */
+    private PinotWriterSegment<IN> getOrCreateInProgressSegment() {
+        final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
+        if (latestSegment == null || !latestSegment.acceptsElements()) {
+            final PinotWriterSegment<IN> inProgressSegment = new PinotWriterSegment<>(maxRowsPerSegment, jsonSerializer, fsAdapter);
+            activeSegments.add(inProgressSegment);
+            return inProgressSegment;
+        }
+        return latestSegment;
+    }
+
+    /**
+     * As we do not need to save any information in snapshots,
+     * this method always returns an empty ArrayList.
+     *
+     * @return always an empty ArrayList
+     */
+    @Override
+    public List<Void> snapshotState() {

Review comment:
I would expect that we have to save the path to the active files on every checkpoint.

########## File path: flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java ##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.pinot.client.*;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Helper class to interact with the Pinot controller and broker in the e2e tests
+ */
+public class PinotTestHelper implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotTestHelper.class);
+    private final String host;
+    private final String brokerPort;
+    private final PinotControllerHttpClient httpClient;
+
+    /**
+     * @param host           Host the Pinot controller and broker are accessible at
+     * @param controllerPort The Pinot controller's external port at {@code host}
+     * @param brokerPort     A Pinot broker's external port at {@code host}
+     */
+    public PinotTestHelper(String host, String controllerPort, String brokerPort) {
+        this.host = host;
+        this.brokerPort = brokerPort;
+        httpClient = new PinotControllerHttpClient(host, controllerPort);

Review comment:
```suggestion
        this.httpClient = new PinotControllerHttpClient(host, controllerPort);
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
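For readers following the serializer comments in this review: the versioned serialize/deserialize pattern used by both committable serializers, including the suggested `IllegalStateException` for unknown versions, reduces to a short self-contained sketch. The class name and simplified signatures below are hypothetical; the real classes operate on `PinotSinkCommittable` objects rather than raw fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative round-trip of the versioned committable layout from the diff:
// minTimestamp (long), maxTimestamp (long), dataFilePath (UTF string).
public class CommittableSerdeSketch {

    static final int CURRENT_VERSION = 1;

    public static byte[] serialize(long minTimestamp, long maxTimestamp, String dataFilePath) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeLong(minTimestamp);
            out.writeLong(maxTimestamp);
            out.writeUTF(dataFilePath);
        }
        return baos.toByteArray();
    }

    public static String deserializePath(int version, byte[] serialized) throws IOException {
        switch (version) {
            case 1:
                DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
                in.readLong(); // skip minTimestamp
                in.readLong(); // skip maxTimestamp
                return in.readUTF();
            default:
                // Per the review suggestion: an unknown version is a programming
                // error rather than an I/O failure, hence IllegalStateException.
                throw new IllegalStateException("Unrecognized version or corrupt state: " + version);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize(1L, 2L, "/tmp/segment-0");
        System.out.println(deserializePath(CURRENT_VERSION, bytes));
    }
}
```

Keeping the version byte-dispatch in `deserialize` is what lets a later `CURRENT_VERSION = 2` still restore state written by version 1.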
