mschroederi commented on a change in pull request #113: URL: https://github.com/apache/bahir-flink/pull/113#discussion_r594695051
########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pinot.writer; + +import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable; +import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer; +import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link PinotWriterSegment} represents exactly one segment that can be found in the Pinot + * cluster once the commit has been completed. 
+ * + * @param <IN> Type of incoming elements + */ +public class PinotWriterSegment<IN> implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger("PinotWriterSegment"); + + private final int maxRowsPerSegment; + private final String tempDirPrefix; + private final JsonSerializer<IN> jsonSerializer; + private final FileSystemAdapter fsAdapter; + + private boolean acceptsElements = true; + + private final List<IN> elements; + private File dataFile; + private long minTimestamp = Long.MAX_VALUE; + private long maxTimestamp = Long.MIN_VALUE; + + /** + * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment + * @param tempDirPrefix Prefix for temp directories used + * @param jsonSerializer Serializer used to convert elements to JSON + * @param fsAdapter Filesystem adapter used to save files for sharing files across nodes + */ + protected PinotWriterSegment(int maxRowsPerSegment, String tempDirPrefix, JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) { + checkArgument(maxRowsPerSegment > 0L); + this.maxRowsPerSegment = maxRowsPerSegment; + this.tempDirPrefix = checkNotNull(tempDirPrefix); + this.jsonSerializer = checkNotNull(jsonSerializer); + this.fsAdapter = checkNotNull(fsAdapter); + this.elements = new ArrayList<>(); + } + + /** + * Takes elements and stores them in memory until either {@link #maxRowsPerSegment} is reached + * or {@link #prepareCommit} is called. 
+ * + * @param element Object from upstream task + * @param timestamp Timestamp assigned to element + * @throws IOException + */ + public void write(IN element, long timestamp) throws IOException { + if (!this.acceptsElements()) { + throw new IllegalStateException("This PinotSegmentWriter does not accept any elements anymore."); + } + this.elements.add(element); + this.minTimestamp = Long.min(this.minTimestamp, timestamp); + this.maxTimestamp = Long.max(this.maxTimestamp, timestamp); + + // Writes elements to local filesystem once the maximum number of items is reached + if (this.elements.size() == this.maxRowsPerSegment) { Review comment: This is now implemented in the current version of this PR. The `FileSystemAdapter` is now capable of writing and reading directly from the shared filesystem. Moreover, it supports deleting files once the committables have been successfully committed. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
