[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


JingsongLi commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134777880


##
flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java:
##
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreWriter;
+import org.apache.flink.table.store.utils.MurmurHashUtils;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/** Internal write implementation for hash kv store. */
+public class HashLookupStoreWriter implements LookupStoreWriter {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HashLookupStoreWriter.class.getName());
+
+    // load factor of hash map, default 0.75
+    private final double loadFactor;
+    // Output
+    private final File tempFolder;
+    private final OutputStream outputStream;
+    // Index stream
+    private File[] indexFiles;
+    private DataOutputStream[] indexStreams;
+    // Data stream
+    private File[] dataFiles;
+    private DataOutputStream[] dataStreams;
+    // Cache last value
+    private byte[][] lastValues;
+    private int[] lastValuesLength;
+    // Data length
+    private long[] dataLengths;
+    // Index length
+    private long indexesLength;
+    // Max offset length
+    private int[] maxOffsetLengths;
+    // Number of keys
+    private int keyCount;
+    private int[] keyCounts;
+    // Number of values
+    private int valueCount;
+    // Number of collisions
+    private int collisions;
+
+    HashLookupStoreWriter(double loadFactor, File file) throws IOException {
+        this.loadFactor = loadFactor;
+        if (loadFactor <= 0.0 || loadFactor >= 1.0) {
+            throw new IllegalArgumentException(
+                    "Illegal load factor = " + loadFactor + ", should be between 0.0 and 1.0.");
+        }
+
+        tempFolder = new File(file.getParentFile(), UUID.randomUUID().toString());
+        if (!tempFolder.mkdir()) {
+            throw new IOException("Can not create temp folder: " + tempFolder);
+        }
+        outputStream = new BufferedOutputStream(new FileOutputStream(file));
+        indexStreams = new DataOutputStream[0];
+        dataStreams = new DataOutputStream[0];
+        indexFiles = new File[0];
+        dataFiles = new File[0];
+        lastValues = new byte[0][];
+        lastValuesLength = new int[0];
+        dataLengths = new long[0];
+        maxOffsetLengths = new int[0];
+        keyCounts = new int[0];
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) throws IOException {
+        int keyLength = key.length;
+
+        // Get the Output stream for that keyLength, each key length has its own file
+        DataOutputStream indexStream = getIndexStream(keyLength);
+
+        // Write key
+        indexStream.write(key);
+
+        // Check if the value is identical to the last inserted
+        byte[] lastValue = lastValues[keyLength];
+        boolean sameValue = lastValue != null && Arrays.equals(value, lastValue);
+
+        // Get data stream and length
+        long dataLength = dataLengths[keyLength];
+        if (sameValue) {
+            dataLength -=
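The hunk is cut off at this point in the archived message, but the visible part of put() already shows the two central design choices: each key length gets its own index and data streams, and when a value is identical to the previously inserted value for that key length, the value is not written to the data section again; the key's index entry simply reuses the previous offset. Below is a minimal, self-contained sketch of that duplicate-value optimization. It is not the PR's code: the class name DedupDataWriter and the length-prefixed record layout are invented for illustration, and only the data-section side is modeled (in HashLookupStoreWriter the reused offset would be recorded in the per-key-length index, presumably via the imported VarLengthIntUtils).

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

/** Toy illustration of "reuse the previous offset when consecutive values are identical". */
public class DedupDataWriter {

    private final ByteArrayOutputStream data = new ByteArrayOutputStream();
    private final DataOutputStream dataStream = new DataOutputStream(data);

    private byte[] lastValue;     // last value actually written to the data section
    private long lastValueOffset; // offset at which that value starts

    /** Appends a value and returns the offset an index entry should point at. */
    public long append(byte[] value) throws IOException {
        if (lastValue != null && Arrays.equals(value, lastValue)) {
            // Identical to the previous insert: skip the write and reuse the old offset.
            return lastValueOffset;
        }
        long offset = dataStream.size();
        dataStream.writeInt(value.length); // length prefix followed by the payload
        dataStream.write(value);
        lastValue = value;
        lastValueOffset = offset;
        return offset;
    }

    public static void main(String[] args) throws IOException {
        DedupDataWriter writer = new DedupDataWriter();
        byte[] v = "same".getBytes();
        System.out.println(writer.append(v));                  // 0 -> value written once
        System.out.println(writer.append(v.clone()));          // 0 -> duplicate reuses offset
        System.out.println(writer.append("other".getBytes())); // 8 -> new value appended
    }
}

With a scheme like this, a long run of keys mapping to the same value costs one stored copy of the value plus one small offset per key, which is why put() compares against lastValues[keyLength] before touching the data stream.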

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


JingsongLi commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134777430


##
flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java:
##

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


JingsongLi commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134775901


##
flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java:
##