From Ali Alsuliman <[email protected]>: Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19293 )
Change subject: [ASTERIXDB-3547][EXT]: Use IDataPartitioningProvider for Location Constraints Configuration in Delta ...................................................................... [ASTERIXDB-3547][EXT]: Use IDataPartitioningProvider for Location Constraints Configuration in Delta - user model changes: no - storage format changes: no - interface changes: no Ext-ref: MB-64800 Change-Id: I9ba2ce7afc54ddd08f5e522627c56e44f51cdbbc Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19293 Integration-Tests: Jenkins <[email protected]> Tested-by: Ali Alsuliman <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- A asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm A asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp 11 files changed, 642 insertions(+), 22 deletions(-) Approvals: Ali Alsuliman: Looks good to me, approved; Verified Anon. E. Moose #1000171: Jenkins: Verified diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java index c50b391..7e7b6f5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java @@ -440,6 +440,11 @@ loadDeltaDirectory(generatedDataBasePath, "/multiple_file_delta_table/_delta_log", JSON_FILTER, "delta-data/"); loadDeltaDirectory(generatedDataBasePath, "/delta_all_type/_delta_log", JSON_FILTER, "delta-data/"); loadDeltaDirectory(generatedDataBasePath, "/delta_all_type", PARQUET_FILTER, "delta-data/"); + loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_nine/_delta_log", JSON_FILTER, "delta-data/"); + loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_nine", PARQUET_FILTER, "delta-data/"); + loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one/_delta_log", JSON_FILTER, "delta-data/"); + loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one", PARQUET_FILTER, "delta-data/"); + } private static void loadDeltaDirectory(String dataBasePath, String rootPath, FilenameFilter filter, diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java index 1236636..67d460c 100644 --- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java @@ -55,6 +55,10 @@ "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "modified_delta_table"; public static final String DELTA_MULTI_FILE_TABLE = "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "multiple_file_delta_table"; + public static final String DELTA_FILE_SIZE_ONE = + "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_one"; + public static final String DELTA_FILE_SIZE_NINE = + "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_nine"; public static void prepareDeltaTableContainer(Configuration conf) { File basePath = new File("."); @@ -62,6 +66,8 @@ prepareMultipleFilesTable(conf); prepareModifiedTable(conf); prepareEmptyTable(conf); + prepareFileSizeOne(conf); + prepareFileSizeNine(conf); } public static void cleanBinaryDirectory(File localDataRoot, String binaryFilesPath) { @@ -221,4 +227,129 @@ throw new RuntimeException(e); } } + + public static void prepareFileSizeOne(Configuration conf) { + Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name").endRecord(); + try { + Path path = new Path(DELTA_FILE_SIZE_ONE, "firstFile.parquet"); + ParquetWriter<GenericData.Record> writer = + AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build(); + + List<GenericData.Record> fileFirstSnapshotRecords = List.of(new GenericData.Record(schema)); + + fileFirstSnapshotRecords.get(0).put("id", 0); + fileFirstSnapshotRecords.get(0).put("name", "Cooper"); + + for (GenericData.Record record : fileFirstSnapshotRecords) { + writer.write(record); + } + + long size = writer.getDataSize(); + writer.close(); + + List<Action> 
actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size, + System.currentTimeMillis(), true, null, null)); + DeltaLog log = DeltaLog.forTable(conf, DELTA_FILE_SIZE_ONE); + OptimisticTransaction txn = log.startTransaction(); + Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>()) + .schema(new StructType().add(new StructField("id", new IntegerType(), true)) + .add(new StructField("name", new StringType(), true))) + .build(); + txn.updateMetadata(metaData); + txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create"); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static void prepareFileSizeNine(Configuration conf) { + Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name").endRecord(); + try { + Path path = new Path(DELTA_FILE_SIZE_NINE, "firstFile.parquet"); + ParquetWriter<GenericData.Record> writer = + AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build(); + + List<GenericData.Record> fileFirstSnapshotRecords = List.of(new GenericData.Record(schema)); + List<GenericData.Record> fileSecondSnapshotRecords = List.of(new GenericData.Record(schema)); + List<GenericData.Record> fileThirdSnapshotRecords = List.of(new GenericData.Record(schema)); + List<GenericData.Record> fileFourthSnapshotRecords = List.of(new GenericData.Record(schema)); + List<GenericData.Record> fileFifthSnapshotRecords = List.of(new GenericData.Record(schema)); + List<GenericData.Record> fileSixthSnapshotRecords = List.of(new GenericData.Record(schema)); + List<GenericData.Record> fileSeventhSnapshotRecords = List.of(new GenericData.Record(schema)); + List<GenericData.Record> fileEightSnapshotRecords = List.of(new GenericData.Record(schema)); + List<GenericData.Record> fileNineSnapshotRecords = List.of(new GenericData.Record(schema)); + + List<List<GenericData.Record>> allSnapshotRecords = + 
List.of(fileFirstSnapshotRecords, fileSecondSnapshotRecords, fileThirdSnapshotRecords, + fileFourthSnapshotRecords, fileFifthSnapshotRecords, fileSixthSnapshotRecords, + fileSeventhSnapshotRecords, fileEightSnapshotRecords, fileNineSnapshotRecords); + + fileFirstSnapshotRecords.get(0).put("id", 0); + fileFirstSnapshotRecords.get(0).put("name", "Cooper"); + + fileSecondSnapshotRecords.get(0).put("id", 1); + fileSecondSnapshotRecords.get(0).put("name", "Adam"); + + fileThirdSnapshotRecords.get(0).put("id", 2); + fileThirdSnapshotRecords.get(0).put("name", "Third"); + + fileFourthSnapshotRecords.get(0).put("id", 3); + fileFourthSnapshotRecords.get(0).put("name", "Fourth"); + + fileFifthSnapshotRecords.get(0).put("id", 4); + fileFifthSnapshotRecords.get(0).put("name", "Five"); + + fileSixthSnapshotRecords.get(0).put("id", 5); + fileSixthSnapshotRecords.get(0).put("name", "Six"); + + fileSeventhSnapshotRecords.get(0).put("id", 6); + fileSeventhSnapshotRecords.get(0).put("name", "Seven"); + + fileEightSnapshotRecords.get(0).put("id", 7); + fileEightSnapshotRecords.get(0).put("name", "Eight"); + + fileNineSnapshotRecords.get(0).put("id", 8); + fileNineSnapshotRecords.get(0).put("name", "Nine"); + + for (GenericData.Record record : fileFirstSnapshotRecords) { + writer.write(record); + } + + long size = writer.getDataSize(); + writer.close(); + + List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size, + System.currentTimeMillis(), true, null, null)); + DeltaLog log = DeltaLog.forTable(conf, DELTA_FILE_SIZE_NINE); + OptimisticTransaction txn = log.startTransaction(); + Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>()) + .schema(new StructType().add(new StructField("id", new IntegerType(), true)) + .add(new StructField("name", new StringType(), true))) + .build(); + txn.updateMetadata(metaData); + txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create"); + + for (int i = 2; i <= 
9; i++) { + Path path2 = new Path(DELTA_FILE_SIZE_NINE, "File" + i + ".parquet"); + ParquetWriter<GenericData.Record> writer2 = + AvroParquetWriter.<GenericData.Record> builder(path2).withConf(conf).withSchema(schema).build(); + + for (GenericData.Record record : allSnapshotRecords.get(i - 1)) { + writer2.write(record); + } + + long size2 = writer2.getDataSize(); + writer2.close(); + + List<Action> actions2 = List.of(new AddFile("File" + i + ".parquet", new HashMap<>(), size2, + System.currentTimeMillis(), true, null, null)); + + OptimisticTransaction txn2 = log.startTransaction(); + txn2.commit(actions2, new Operation(Operation.Name.WRITE), "deltalake-table-create"); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp new file mode 100644 index 0000000..c1a74c5 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + DROP DATAVERSE test IF EXISTS; + CREATE DATAVERSE test; + + USE test; + + + CREATE TYPE DeltalakeTableType as { + }; + + CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter% + ( + %template%, + ("container"="playground"), + ("definition"="delta-data/delta_file_size_nine"), + ("table-format" = "delta") + ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp new file mode 100644 index 0000000..db2abf5 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds order by id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp new file mode 100644 index 0000000..1284e93 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + DROP DATAVERSE test IF EXISTS; + CREATE DATAVERSE test; + + USE test; + + + CREATE TYPE DeltalakeTableType as { + }; + + CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter% + ( + %template%, + ("container"="playground"), + ("definition"="delta-data/delta_file_size_one"), + ("table-format" = "delta") + ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp new file mode 100644 index 0000000..84e7914 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm new file mode 100644 index 0000000..500f6a9 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm @@ -0,0 +1,9 @@ +{ "id": 0, "name": "Cooper" } +{ "id": 1, "name": "Adam" } +{ "id": 2, "name": "Third" } +{ "id": 3, "name": "Fourth" } +{ "id": 4, "name": "Five" } +{ "id": 5, "name": "Six" } +{ "id": 6, "name": "Seven" } +{ "id": 7, "name": "Eight" } +{ "id": 8, "name": "Nine" } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm new file mode 100644 index 0000000..006681c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm @@ -0,0 +1 @@ +{ "id": 0, "name": "Cooper" } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index ff1b325..c31cb5e 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@ -556,6 +556,18 @@ </compilation-unit> </test-case> <test-case FilePath="external-dataset"> + <compilation-unit name="common/deltalake-file-one"> + <placeholder name="adapter" value="S3" /> + <output-dir 
compare="Text">common/deltalake-file-one</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/deltalake-file-nine"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/deltalake-file-nine</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> <compilation-unit name="common/avro/avro-types/avro-map"> <placeholder name="adapter" value="S3" /> <placeholder name="path_prefix" value="" /> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java index dc4c310..790db8c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java @@ -30,7 +30,6 @@ import java.util.PriorityQueue; import java.util.Set; -import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; @@ -44,7 +43,6 @@ import org.apache.asterix.external.util.HDFSUtils; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.runtime.projection.FunctionCallInformation; -import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -77,6 +75,10 @@ protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>(); protected ConfFactory 
confFactory; + public List<PartitionWorkLoadBasedOnSize> getPartitionWorkLoadsBasedOnSize() { + return partitionWorkLoadsBasedOnSize; + } + @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { return locationConstraints; @@ -134,9 +136,9 @@ scanFiles.add(row); } } - locationConstraints = configureLocationConstraints(appCtx, scanFiles); + locationConstraints = getPartitions(appCtx); configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA); - distributeFiles(scanFiles); + distributeFiles(scanFiles, getPartitionConstraint().getLocations().length); issueWarnings(warnings, warningCollector); } @@ -151,26 +153,11 @@ warnings.clear(); } - private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx, - List<Row> scanFiles) { - IClusterStateManager csm = appCtx.getClusterStateManager(); - - String[] locations = csm.getClusterLocations().getLocations(); - if (scanFiles.size() == 0) { - return AlgebricksAbsolutePartitionConstraint.randomLocation(locations); - } else if (locations.length > scanFiles.size()) { - LOGGER.debug( - "configured partitions ({}) exceeds total partition count ({}); limiting configured partitions to total partition count", - locations.length, scanFiles.size()); - final String[] locationCopy = locations.clone(); - ArrayUtils.shuffle(locationCopy); - locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size()); - } - return new AlgebricksAbsolutePartitionConstraint(locations); + public AlgebricksAbsolutePartitionConstraint getPartitions(ICcApplicationContext appCtx) { + return appCtx.getDataPartitioningProvider().getClusterLocations(); } - private void distributeFiles(List<Row> scanFiles) { - final int partitionsCount = getPartitionConstraint().getLocations().length; + public void distributeFiles(List<Row> scanFiles, int partitionsCount) { PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount, 
Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize)); diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java new file mode 100644 index 0000000..1bc8eb8 --- /dev/null +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.input.record.reader.awss3; + +import static io.delta.kernel.internal.InternalScanFileUtils.ADD_FILE_ORDINAL; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.junit.Assert; +import org.junit.Test; + +import io.delta.kernel.data.ArrayValue; +import io.delta.kernel.data.MapValue; +import io.delta.kernel.data.Row; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; + +public class DeltaTopicPartitionDistributionTest { + + @Test + public void distributeFilesMoreFilesThanPartitions() { + int rowCount = 25; + int numberOfPartition = 13; + List<Row> scanFiles = createMockRows(rowCount); + DeltaReaderFactory d = new DeltaReaderFactory() { + @Override + protected void configureJobConf(JobConf conf, Map<String, String> configuration) + throws AlgebricksException { + + } + + @Override + protected String getTablePath(Map<String, String> configuration) throws AlgebricksException { + return null; + } + + @Override + public List<String> getRecordReaderNames() { + return null; + } + }; + d.distributeFiles(scanFiles, numberOfPartition); + Assert.assertEquals(numberOfPartition, d.getPartitionWorkLoadsBasedOnSize().size()); + verifyFileDistribution(scanFiles.size(), d.getPartitionWorkLoadsBasedOnSize()); + } + + @Test + public void distributeFilesLessFilesThanPartitions() { + int rowCount = 15; + int numberOfPartition = 23; + List<Row> scanFiles = createMockRows(rowCount); + DeltaReaderFactory d = new DeltaReaderFactory() { + @Override + protected void configureJobConf(JobConf conf, Map<String, String> configuration) + throws AlgebricksException { + + } + 
+ @Override + protected String getTablePath(Map<String, String> configuration) throws AlgebricksException { + return null; + } + + @Override + public List<String> getRecordReaderNames() { + return null; + } + }; + d.distributeFiles(scanFiles, numberOfPartition); + Assert.assertEquals(numberOfPartition, d.getPartitionWorkLoadsBasedOnSize().size()); + verifyFileDistribution(scanFiles.size(), d.getPartitionWorkLoadsBasedOnSize()); + } + + @Test + public void distributeFilesEqualFilesAndPartitions() { + int rowCount = 9; + int numberOfPartition = 9; + List<Row> scanFiles = createMockRows(rowCount); + DeltaReaderFactory d = new DeltaReaderFactory() { + @Override + protected void configureJobConf(JobConf conf, Map<String, String> configuration) + throws AlgebricksException { + + } + + @Override + protected String getTablePath(Map<String, String> configuration) throws AlgebricksException { + return null; + } + + @Override + public List<String> getRecordReaderNames() { + return null; + } + }; + d.distributeFiles(scanFiles, numberOfPartition); + Assert.assertEquals(numberOfPartition, d.getPartitionWorkLoadsBasedOnSize().size()); + verifyFileDistribution(scanFiles.size(), d.getPartitionWorkLoadsBasedOnSize()); + } + + private void verifyFileDistribution(int numberOfFiles, + List<DeltaReaderFactory.PartitionWorkLoadBasedOnSize> workloads) { + int totalDistributedFiles = 0; + + for (DeltaReaderFactory.PartitionWorkLoadBasedOnSize workload : workloads) { + totalDistributedFiles += workload.getScanFiles().size(); + Assert.assertTrue(workload.getTotalSize() >= 0); + } + Assert.assertEquals(numberOfFiles, totalDistributedFiles); + } + + private List<Row> createMockRows(int count) { + List<Row> rows = new ArrayList<>(); + StructType sch = createMockSchema(); + + for (int i = 1; i <= count; i++) { + int finalI = i; + Row row = new Row() { + + @Override + public StructType getSchema() { + return sch; + } + + @Override + public boolean isNullAt(int i) { + return false; + } + + 
@Override + public boolean getBoolean(int i) { + return false; + } + + @Override + public byte getByte(int i) { + return 0; + } + + @Override + public short getShort(int i) { + return 0; + } + + @Override + public int getInt(int i) { + if (i == 1) { + return finalI; + } else if (i == 2) { + return finalI * 10; + } + return 0; + } + + @Override + public long getLong(int i) { + return 0; + } + + @Override + public float getFloat(int i) { + return 0; + } + + @Override + public double getDouble(int i) { + return 0; + } + + @Override + public String getString(int i) { + if (i == 0) { + return "tableRoot_" + finalI; + } else if (i == 1) { + return "addFilePath_" + finalI; + } + return null; + } + + @Override + public BigDecimal getDecimal(int i) { + return null; + } + + @Override + public byte[] getBinary(int i) { + return new byte[0]; + } + + @Override + public ArrayValue getArray(int i) { + return null; + } + + @Override + public MapValue getMap(int i) { + return null; + } + + @Override + public Row getStruct(int index) { + if (index == ADD_FILE_ORDINAL) { + return createAddFileEntry(finalI); + } + return null; + } + }; + + rows.add(row); + } + + return rows; + } + + private StructType createMockSchema() { + List<StructField> fields = new ArrayList<>(); + + fields.add(new StructField("field1", StringType.STRING, true)); + fields.add(new StructField("field2", IntegerType.INTEGER, true)); + fields.add(new StructField("field3", IntegerType.INTEGER, true)); + + return new StructType(fields); + } + + private Row createAddFileEntry(int i) { + List<StructField> addFileFields = new ArrayList<>(); + + addFileFields.add(new StructField("addFilePath", StringType.STRING, true)); + addFileFields.add(new StructField("size", IntegerType.INTEGER, true)); + + StructType addFileSchema = new StructType(addFileFields); + + Row addFileRow = new Row() { + @Override + public StructType getSchema() { + return addFileSchema; + } + + @Override + public boolean isNullAt(int index) { + return 
false; + } + + @Override + public boolean getBoolean(int i) { + return false; + } + + @Override + public byte getByte(int i) { + return 0; + } + + @Override + public short getShort(int i) { + return 0; + } + + @Override + public int getInt(int index) { + if (index == 1) { + return i * 100; + } + return 0; + } + + @Override + public long getLong(int i) { + return 0; + } + + @Override + public float getFloat(int i) { + return 0; + } + + @Override + public double getDouble(int i) { + return 0; + } + + @Override + public BigDecimal getDecimal(int i) { + return null; + } + + @Override + public byte[] getBinary(int i) { + return new byte[0]; + } + + @Override + public Row getStruct(int index) { + return null; + } + + @Override + public ArrayValue getArray(int index) { + return null; + } + + @Override + public MapValue getMap(int index) { + return null; + } + + @Override + public String getString(int index) { + if (index == 0) { + return "addFilePath_" + i; + } + return null; + } + }; + + return addFileRow; + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19293 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I9ba2ce7afc54ddd08f5e522627c56e44f51cdbbc Gerrit-Change-Number: 19293 Gerrit-PatchSet: 9 Gerrit-Owner: Ayush Tripathi <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Ayush Tripathi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Peeyush Gupta <[email protected]> Gerrit-Reviewer: Ritik Raj <[email protected]> Gerrit-MessageType: merged
