Hussain Towaileb has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3437
Change subject: [ASTERIXDB-2593][FUN][WIP] TPC-DS support "all" to generate all tables ...................................................................... [ASTERIXDB-2593][FUN][WIP] TPC-DS support "all" to generate all tables - user model changes: yes - passing a value of "all" (case-insensitive) as the first argument generates data for all TPC-DS tables - storage format changes: no - interface changes: no Details: - Add support for an "all" argument that generates data for all TPC-DS tables. - Generic behavior: generate a single table or all tables by passing either a specific table name or "all" as the first argument value. Change-Id: Iff199b0c533d22bcae1caf5057788b257ba4e486 --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java 6 files changed, 69 insertions(+), 31 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/37/3437/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java index cb7e357..75f4159 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.app.function; +import 
org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.metadata.api.IDatasourceFunction; import org.apache.asterix.metadata.declared.DataSourceId; import org.apache.asterix.metadata.declared.FunctionDataSource; @@ -34,7 +35,7 @@ private final String tableName; private final double scalingFactor; - public TPCDSDataGeneratorDatasource(INodeDomain domain, String tableName, double scalingFactor) + TPCDSDataGeneratorDatasource(INodeDomain domain, String tableName, double scalingFactor) throws AlgebricksException { super(createDataSourceId(tableName, scalingFactor), domain); this.tableName = tableName; @@ -62,4 +63,9 @@ AlgebricksAbsolutePartitionConstraint locations) { return new TPCDSDataGeneratorFunction(locations, tableName, scalingFactor); } + + @Override + protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) { + return new AlgebricksAbsolutePartitionConstraint(csm.getClusterLocations().getLocations()); + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java index 4c52bca..c328a9b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java @@ -35,7 +35,7 @@ private final double scalingFactor; private final int parallelism; - public TPCDSDataGeneratorFunction(AlgebricksAbsolutePartitionConstraint locations, String tableName, + TPCDSDataGeneratorFunction(AlgebricksAbsolutePartitionConstraint locations, String tableName, double scalingFactor) { super(locations); this.tableName = tableName; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java index 626dee3..b1f5d4d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java @@ -19,6 +19,7 @@ package org.apache.asterix.app.function; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -50,11 +51,15 @@ private final int chunkNumber; private final String tableName; private final double scalingFactor; - private Table selectedTable; + private List<Table> selectedTables; private final StringBuilder builder = new StringBuilder(); - private final Iterator<List<List<String>>> dataGeneratorIterator; + private final List<Iterator<List<List<String>>>> tableIterators; - public TPCDSDataGeneratorReader(String tableName, double scalingFactor, int parallelism, int partitionNumber) + private Table currentTable; + private int tableCount = 0; + private int currentTableIndex = 0; + + TPCDSDataGeneratorReader(String tableName, double scalingFactor, int parallelism, int partitionNumber) throws HyracksDataException { this.tableName = tableName; this.scalingFactor = scalingFactor; @@ -70,18 +75,33 @@ */ chunkNumber = partitionNumber + 1; - dataGeneratorIterator = getDataGeneratorIterator(); + tableIterators = getTableGeneratorIterators(); } @Override public boolean hasNext() { - return dataGeneratorIterator.hasNext(); + + // Current table still has more + if (tableIterators.get(currentTableIndex).hasNext()) { + return true; + } + + // We went over all the tables + if (currentTableIndex == tableCount - 1) { + return false; + } + + // Go to the next table + currentTableIndex++; + currentTable = selectedTables.get(currentTableIndex); + + return tableIterators.get(currentTableIndex).hasNext(); } @Override public IRawRecord<char[]> next() 
throws IOException, InterruptedException { CharArrayRecord record = new CharArrayRecord(); - record.append((formatRecord(dataGeneratorIterator.next())).toCharArray()); + record.append((formatRecord(tableIterators.get(currentTableIndex).next())).toCharArray()); record.endRecord(); return record; } @@ -91,16 +111,25 @@ * * @return A lazy iterator to generate the data based on the specified properties. */ - private Iterator<List<List<String>>> getDataGeneratorIterator() throws HyracksDataException { - selectedTable = getTableFromStringTableName(tableName); + private List<Iterator<List<List<String>>>> getTableGeneratorIterators() throws HyracksDataException { + selectedTables = getTableFromStringTableName(tableName); + + currentTableIndex = 0; + tableCount = selectedTables.size(); + currentTable = selectedTables.get(currentTableIndex); // Create the session with the specified properties, the sessions also specifies the chunk to be generated - Session session = Session.getDefaultSession().withTable(selectedTable).withScale(scalingFactor) - .withParallelism(parallelism).withChunkNumber(chunkNumber); + Session session = Session.getDefaultSession().withScale(scalingFactor).withParallelism(parallelism) + .withChunkNumber(chunkNumber); - // Construct the Results and Results iterator - Results results = Results.constructResults(selectedTable, session); - return results.iterator(); + // Create an iterator for each passed table + List<Iterator<List<List<String>>>> tableIterators = new ArrayList<>(); + for (Table table : selectedTables) { + Results result = Results.constructResults(table, session); + tableIterators.add(result.iterator()); + } + + return tableIterators; } /** @@ -109,8 +138,14 @@ * @param tableName String table name to search for. * @return Table if found, throws an exception otherwise. 
*/ - private Table getTableFromStringTableName(String tableName) throws HyracksDataException { + private List<Table> getTableFromStringTableName(String tableName) throws HyracksDataException { + // Use all the tables + if (tableName.equalsIgnoreCase(TPCDSDataGeneratorRewriter.TPCDS_ALL_TABLES)) { + return Table.getBaseTables(); + } + + // Search for the table List<Table> matchedTables = Table.getBaseTables().stream() .filter(table -> tableName.equalsIgnoreCase(table.getName())).collect(Collectors.toList()); @@ -119,7 +154,7 @@ throw new RuntimeDataException(ErrorCode.TPCDS_INVALID_TABLE_NAME, getIdentifier().getName(), tableName); } - return matchedTables.get(0); + return matchedTables; } /** @@ -134,25 +169,22 @@ // Clear the builder (This is faster than re-creating the builder each iteration) builder.setLength(0); - int counter; - builder.append("{"); + builder.append("{\"partition\":\""); + builder.append(chunkNumber - 1); + builder.append("\""); // We loop only to the item before the last, then add the last item manually to avoid appending the "," // at the end, this way we avoid constantly checking if we're at the last item or substring the whole record - for (counter = 0; counter < values.get(0).size() - 1; counter++) { - builder.append("\""); - builder.append(selectedTable.getColumns()[counter].getName()); + for (int counter = 0; counter < values.get(0).size(); counter++) { + builder.append(",\""); + builder.append(currentTable.getColumns()[counter].getName()); builder.append("\":\""); builder.append(values.get(0).get(counter)); - builder.append("\","); + builder.append("\""); } // This is the last item to be appended, don't append the "," after appending the field - builder.append("\""); - builder.append(selectedTable.getColumns()[counter].getName()); - builder.append("\":\""); - builder.append(values.get(0).get(counter)); - builder.append("\"}"); + builder.append("}"); return builder.toString(); } diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java index fb871b9..7bcc6c2 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java @@ -49,6 +49,7 @@ */ public class TPCDSDataGeneratorRewriter extends FunctionRewriter { + public static final String TPCDS_ALL_TABLES = "ALL"; public static final FunctionIdentifier TPCDS_DATA_GENERATOR = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "tpcds-datagen", 2); public static final TPCDSDataGeneratorRewriter INSTANCE = new TPCDSDataGeneratorRewriter(TPCDS_DATA_GENERATOR); @@ -61,7 +62,6 @@ protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression function) throws AlgebricksException { - // TODO(Hussain) check if it is safe to assume that we're receiving constant expressions only as arguments. 
UnnestingFunctionCallExpression functionCall = (UnnestingFunctionCallExpression) function; ConstantExpression tableNameArgument = (ConstantExpression) functionCall.getArguments().get(0).getValue(); ConstantExpression scalingFactorArgument = (ConstantExpression) functionCall.getArguments().get(1).getValue(); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index 6eac23b..4f5adec 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -75,7 +75,7 @@ public static final String DEFAULT_CONF_FILE = joinPath(RESOURCES_PATH, "cc.conf"); private static final String DEFAULT_STORAGE_PATH = joinPath("target", "io", "dir"); private static String storagePath = DEFAULT_STORAGE_PATH; - private static final long RESULT_TTL = TimeUnit.MINUTES.toMillis(5); + private static final long RESULT_TTL = TimeUnit.MINUTES.toMillis(30); // TODO(Hussain): Revert back to 5 mins static { System.setProperty("java.util.logging.manager", org.apache.logging.log4j.jul.LogManager.class.getName()); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java index 3b5cf2e..1635507 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java @@ -91,7 +91,7 @@ protected abstract IDatasourceFunction createFunction(MetadataProvider metadataProvider, AlgebricksAbsolutePartitionConstraint locations); - protected static AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) { + protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) { String[] allPartitions = csm.getClusterLocations().getLocations(); Set<String> ncs = new HashSet<>(Arrays.asList(allPartitions)); return new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new String[ncs.size()])); -- To view, visit https://asterix-gerrit.ics.uci.edu/3437 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: newchange Gerrit-Change-Id: Iff199b0c533d22bcae1caf5057788b257ba4e486 Gerrit-Change-Number: 3437 Gerrit-PatchSet: 1 Gerrit-Owner: Hussain Towaileb <hussai...@gmail.com>