[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store
JingsongLi commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134777880

## flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java ##

@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreWriter;
+import org.apache.flink.table.store.utils.MurmurHashUtils;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/** Internal write implementation for hash kv store. */
+public class HashLookupStoreWriter implements LookupStoreWriter {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HashLookupStoreWriter.class.getName());
+
+    // load factor of hash map, default 0.75
+    private final double loadFactor;
+    // Output
+    private final File tempFolder;
+    private final OutputStream outputStream;
+    // Index stream
+    private File[] indexFiles;
+    private DataOutputStream[] indexStreams;
+    // Data stream
+    private File[] dataFiles;
+    private DataOutputStream[] dataStreams;
+    // Cache last value
+    private byte[][] lastValues;
+    private int[] lastValuesLength;
+    // Data length
+    private long[] dataLengths;
+    // Index length
+    private long indexesLength;
+    // Max offset length
+    private int[] maxOffsetLengths;
+    // Number of keys
+    private int keyCount;
+    private int[] keyCounts;
+    // Number of values
+    private int valueCount;
+    // Number of collisions
+    private int collisions;
+
+    HashLookupStoreWriter(double loadFactor, File file) throws IOException {
+        this.loadFactor = loadFactor;
+        if (loadFactor <= 0.0 || loadFactor >= 1.0) {
+            throw new IllegalArgumentException(
+                    "Illegal load factor = " + loadFactor + ", should be between 0.0 and 1.0.");
+        }
+
+        tempFolder = new File(file.getParentFile(), UUID.randomUUID().toString());
+        if (!tempFolder.mkdir()) {
+            throw new IOException("Can not create temp folder: " + tempFolder);
+        }
+        outputStream = new BufferedOutputStream(new FileOutputStream(file));
+        indexStreams = new DataOutputStream[0];
+        dataStreams = new DataOutputStream[0];
+        indexFiles = new File[0];
+        dataFiles = new File[0];
+        lastValues = new byte[0][];
+        lastValuesLength = new int[0];
+        dataLengths = new long[0];
+        maxOffsetLengths = new int[0];
+        keyCounts = new int[0];
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) throws IOException {
+        int keyLength = key.length;
+
+        // Get the Output stream for that keyLength, each key length has its own file
+        DataOutputStream indexStream = getIndexStream(keyLength);
+
+        // Write key
+        indexStream.write(key);
+
+        // Check if the value is identical to the last inserted
+        byte[] lastValue = lastValues[keyLength];
+        boolean sameValue = lastValue != null && Arrays.equals(value, lastValue);
+
+        // Get data stream and length
+        long dataLength = dataLengths[keyLength];
+        if (sameValue) {
+            dataLength -=
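The hunk above partitions work by key length: put() fetches a DataOutputStream from getIndexStream(keyLength), and each distinct key length gets its own index file, tracked in parallel arrays (indexFiles, indexStreams) that start empty and are indexed directly by the length. The real getIndexStream is outside this hunk, so the following is only a minimal standalone sketch of that pattern; the class name, temp-folder path, and file naming are illustrative, not the PR's actual code.

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    public class PerKeyLengthStreams {

        // Illustrative location; the PR uses a UUID-named folder next to the output file.
        private final File tempFolder = new File(System.getProperty("java.io.tmpdir"));
        private File[] indexFiles = new File[0];
        private DataOutputStream[] indexStreams = new DataOutputStream[0];

        DataOutputStream getIndexStream(int keyLength) throws IOException {
            // Grow the parallel arrays so that indexing by key length stays in bounds.
            if (indexStreams.length <= keyLength) {
                indexFiles = Arrays.copyOf(indexFiles, keyLength + 1);
                indexStreams = Arrays.copyOf(indexStreams, keyLength + 1);
            }
            DataOutputStream stream = indexStreams[keyLength];
            if (stream == null) {
                // First key of this length: give it a dedicated temp file.
                File file = new File(tempFolder, "index" + keyLength + ".dat");
                stream = new DataOutputStream(
                        new BufferedOutputStream(new FileOutputStream(file)));
                indexFiles[keyLength] = file;
                indexStreams[keyLength] = stream;
            }
            return stream;
        }
    }

Keeping one fixed-width index file per key length means every index entry in a file has the same size, which is what later allows slot positions to be computed by hashing rather than searched.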
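The lastValues / sameValue bookkeeping is a small write-side optimization: when consecutive entries of the same key length carry an identical value, the value bytes are not appended to the data file again; the index entry instead points back at the previous copy by rewinding the logical data offset (the truncated "dataLength -=" line). A hedged sketch of that idea, with illustrative names; the real writer also writes a var-length size prefix (via VarLengthIntUtils) before the value bytes, which this sketch folds into lastValueLength.

    import java.util.Arrays;

    public class LastValueDedup {

        private byte[] lastValue;    // previous value written for this key length
        private int lastValueLength; // bytes that value occupies in the data file
        private long dataLength;     // logical end of the data file

        /** Returns the offset the index entry should record for {@code value}. */
        long offsetFor(byte[] value) {
            boolean sameValue = lastValue != null && Arrays.equals(value, lastValue);
            if (sameValue) {
                // Reuse the previous copy: point the index at its start
                // instead of appending the same bytes again.
                return dataLength - lastValueLength;
            }
            long offset = dataLength;
            // In the real writer a size prefix plus the value bytes are
            // appended here; this sketch counts only the raw value bytes.
            lastValueLength = value.length;
            dataLength += lastValueLength;
            lastValue = value;
            return offset;
        }
    }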
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store
JingsongLi commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134777430
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store
JingsongLi commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134775901