Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/3437 )
Change subject: [ASTERIXDB-2593][FUN][WIP] TPC-DS support "all" to generate all
tables
......................................................................
[ASTERIXDB-2593][FUN][WIP] TPC-DS support "all" to generate all tables
- user model changes: yes
- passing the value "all" (case-insensitive) as the first argument
generates the data for all TPC-DS tables
- storage format changes: no
- interface changes: no
Details:
- Add support for the "all" argument to generate the data of all
TPC-DS tables.
- Provide generic behavior that generates a single table or all tables
when a specific table name or "all" is passed as the first argument.
Change-Id: Iff199b0c533d22bcae1caf5057788b257ba4e486
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
6 files changed, 69 insertions(+), 31 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/37/3437/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java
index cb7e357..75f4159 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.app.function;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.metadata.api.IDatasourceFunction;
import org.apache.asterix.metadata.declared.DataSourceId;
import org.apache.asterix.metadata.declared.FunctionDataSource;
@@ -34,7 +35,7 @@
private final String tableName;
private final double scalingFactor;
- public TPCDSDataGeneratorDatasource(INodeDomain domain, String tableName,
double scalingFactor)
+ TPCDSDataGeneratorDatasource(INodeDomain domain, String tableName, double
scalingFactor)
throws AlgebricksException {
super(createDataSourceId(tableName, scalingFactor), domain);
this.tableName = tableName;
@@ -62,4 +63,9 @@
AlgebricksAbsolutePartitionConstraint locations) {
return new TPCDSDataGeneratorFunction(locations, tableName,
scalingFactor);
}
+
+ @Override
+ protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm) {
+ return new
AlgebricksAbsolutePartitionConstraint(csm.getClusterLocations().getLocations());
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java
index 4c52bca..c328a9b 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java
@@ -35,7 +35,7 @@
private final double scalingFactor;
private final int parallelism;
- public TPCDSDataGeneratorFunction(AlgebricksAbsolutePartitionConstraint
locations, String tableName,
+ TPCDSDataGeneratorFunction(AlgebricksAbsolutePartitionConstraint
locations, String tableName,
double scalingFactor) {
super(locations);
this.tableName = tableName;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
index 626dee3..b1f5d4d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
@@ -19,6 +19,7 @@
package org.apache.asterix.app.function;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
@@ -50,11 +51,15 @@
private final int chunkNumber;
private final String tableName;
private final double scalingFactor;
- private Table selectedTable;
+ private List<Table> selectedTables;
private final StringBuilder builder = new StringBuilder();
- private final Iterator<List<List<String>>> dataGeneratorIterator;
+ private final List<Iterator<List<List<String>>>> tableIterators;
- public TPCDSDataGeneratorReader(String tableName, double scalingFactor,
int parallelism, int partitionNumber)
+ private Table currentTable;
+ private int tableCount = 0;
+ private int currentTableIndex = 0;
+
+ TPCDSDataGeneratorReader(String tableName, double scalingFactor, int
parallelism, int partitionNumber)
throws HyracksDataException {
this.tableName = tableName;
this.scalingFactor = scalingFactor;
@@ -70,18 +75,33 @@
*/
chunkNumber = partitionNumber + 1;
- dataGeneratorIterator = getDataGeneratorIterator();
+ tableIterators = getTableGeneratorIterators();
}
@Override
public boolean hasNext() {
- return dataGeneratorIterator.hasNext();
+
+ // Current table still has more
+ if (tableIterators.get(currentTableIndex).hasNext()) {
+ return true;
+ }
+
+ // We went over all the tables
+ if (currentTableIndex == tableCount - 1) {
+ return false;
+ }
+
+ // Go to the next table
+ currentTableIndex++;
+ currentTable = selectedTables.get(currentTableIndex);
+
+ return tableIterators.get(currentTableIndex).hasNext();
}
@Override
public IRawRecord<char[]> next() throws IOException, InterruptedException {
CharArrayRecord record = new CharArrayRecord();
-
record.append((formatRecord(dataGeneratorIterator.next())).toCharArray());
+
record.append((formatRecord(tableIterators.get(currentTableIndex).next())).toCharArray());
record.endRecord();
return record;
}
@@ -91,16 +111,25 @@
*
* @return A lazy iterator to generate the data based on the specified
properties.
*/
- private Iterator<List<List<String>>> getDataGeneratorIterator() throws
HyracksDataException {
- selectedTable = getTableFromStringTableName(tableName);
+ private List<Iterator<List<List<String>>>> getTableGeneratorIterators()
throws HyracksDataException {
+ selectedTables = getTableFromStringTableName(tableName);
+
+ currentTableIndex = 0;
+ tableCount = selectedTables.size();
+ currentTable = selectedTables.get(currentTableIndex);
// Create the session with the specified properties, the sessions also
specifies the chunk to be generated
- Session session =
Session.getDefaultSession().withTable(selectedTable).withScale(scalingFactor)
- .withParallelism(parallelism).withChunkNumber(chunkNumber);
+ Session session =
Session.getDefaultSession().withScale(scalingFactor).withParallelism(parallelism)
+ .withChunkNumber(chunkNumber);
- // Construct the Results and Results iterator
- Results results = Results.constructResults(selectedTable, session);
- return results.iterator();
+ // Create an iterator for each passed table
+ List<Iterator<List<List<String>>>> tableIterators = new ArrayList<>();
+ for (Table table : selectedTables) {
+ Results result = Results.constructResults(table, session);
+ tableIterators.add(result.iterator());
+ }
+
+ return tableIterators;
}
/**
@@ -109,8 +138,14 @@
* @param tableName String table name to search for.
* @return Table if found, throws an exception otherwise.
*/
- private Table getTableFromStringTableName(String tableName) throws
HyracksDataException {
+ private List<Table> getTableFromStringTableName(String tableName) throws
HyracksDataException {
+ // Use all the tables
+ if
(tableName.equalsIgnoreCase(TPCDSDataGeneratorRewriter.TPCDS_ALL_TABLES)) {
+ return Table.getBaseTables();
+ }
+
+ // Search for the table
List<Table> matchedTables = Table.getBaseTables().stream()
.filter(table ->
tableName.equalsIgnoreCase(table.getName())).collect(Collectors.toList());
@@ -119,7 +154,7 @@
throw new RuntimeDataException(ErrorCode.TPCDS_INVALID_TABLE_NAME,
getIdentifier().getName(), tableName);
}
- return matchedTables.get(0);
+ return matchedTables;
}
/**
@@ -134,25 +169,22 @@
// Clear the builder (This is faster than re-creating the builder each
iteration)
builder.setLength(0);
- int counter;
- builder.append("{");
+ builder.append("{\"partition\":\"");
+ builder.append(chunkNumber - 1);
+ builder.append("\"");
// We loop only to the item before the last, then add the last item
manually to avoid appending the ","
// at the end, this way we avoid constantly checking if we're at the
last item or substring the whole record
- for (counter = 0; counter < values.get(0).size() - 1; counter++) {
- builder.append("\"");
- builder.append(selectedTable.getColumns()[counter].getName());
+ for (int counter = 0; counter < values.get(0).size(); counter++) {
+ builder.append(",\"");
+ builder.append(currentTable.getColumns()[counter].getName());
builder.append("\":\"");
builder.append(values.get(0).get(counter));
- builder.append("\",");
+ builder.append("\"");
}
// This is the last item to be appended, don't append the "," after
appending the field
- builder.append("\"");
- builder.append(selectedTable.getColumns()[counter].getName());
- builder.append("\":\"");
- builder.append(values.get(0).get(counter));
- builder.append("\"}");
+ builder.append("}");
return builder.toString();
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java
index fb871b9..7bcc6c2 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java
@@ -49,6 +49,7 @@
*/
public class TPCDSDataGeneratorRewriter extends FunctionRewriter {
+ public static final String TPCDS_ALL_TABLES = "ALL";
public static final FunctionIdentifier TPCDS_DATA_GENERATOR =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"tpcds-datagen", 2);
public static final TPCDSDataGeneratorRewriter INSTANCE = new
TPCDSDataGeneratorRewriter(TPCDS_DATA_GENERATOR);
@@ -61,7 +62,6 @@
protected FunctionDataSource toDatasource(IOptimizationContext context,
AbstractFunctionCallExpression function)
throws AlgebricksException {
- // TODO(Hussain) check if it is safe to assume that we're receiving
constant expressions only as arguments.
UnnestingFunctionCallExpression functionCall =
(UnnestingFunctionCallExpression) function;
ConstantExpression tableNameArgument = (ConstantExpression)
functionCall.getArguments().get(0).getValue();
ConstantExpression scalingFactorArgument = (ConstantExpression)
functionCall.getArguments().get(1).getValue();
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 6eac23b..4f5adec 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -75,7 +75,7 @@
public static final String DEFAULT_CONF_FILE = joinPath(RESOURCES_PATH,
"cc.conf");
private static final String DEFAULT_STORAGE_PATH = joinPath("target",
"io", "dir");
private static String storagePath = DEFAULT_STORAGE_PATH;
- private static final long RESULT_TTL = TimeUnit.MINUTES.toMillis(5);
+ private static final long RESULT_TTL = TimeUnit.MINUTES.toMillis(30); //
TODO(Hussain): Revert back to 5 mins
static {
System.setProperty("java.util.logging.manager",
org.apache.logging.log4j.jul.LogManager.class.getName());
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index 3b5cf2e..1635507 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -91,7 +91,7 @@
protected abstract IDatasourceFunction createFunction(MetadataProvider
metadataProvider,
AlgebricksAbsolutePartitionConstraint locations);
- protected static AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm) {
+ protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm) {
String[] allPartitions = csm.getClusterLocations().getLocations();
Set<String> ncs = new HashSet<>(Arrays.asList(allPartitions));
return new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new
String[ncs.size()]));
--
To view, visit https://asterix-gerrit.ics.uci.edu/3437
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iff199b0c533d22bcae1caf5057788b257ba4e486
Gerrit-Change-Number: 3437
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>