Hussain Towaileb has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/3437


Change subject: [ASTERIXDB-2593][FUN][WIP] TPC-DS support "all" to generate all 
tables
......................................................................

[ASTERIXDB-2593][FUN][WIP] TPC-DS support "all" to generate all tables

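- user model changes: yes
  - passing the value "all" (case-insensitive) as the first argument
    generates data for all TPC-DS tables
  - every generated record now includes a "partition" field holding
    the number of the partition that generated it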
- storage format changes: no
- interface changes: no

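Details:
- Add support for the "all" argument, which generates data for all
  TPC-DS tables.
- Generalize the generator so that it produces either a single table
  or all tables, depending on whether the first argument is a specific
  table name or "all".
- Add a "partition" field to each generated record.
- Make FunctionDataSource.getLocations() non-static so data sources can
  override it. TPCDSDataGeneratorDatasource overrides it to use every
  cluster partition instead of the de-duplicated set of nodes.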

Change-Id: Iff199b0c533d22bcae1caf5057788b257ba4e486
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
6 files changed, 69 insertions(+), 31 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/37/3437/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java
index cb7e357..75f4159 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.app.function;

+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.metadata.api.IDatasourceFunction;
 import org.apache.asterix.metadata.declared.DataSourceId;
 import org.apache.asterix.metadata.declared.FunctionDataSource;
@@ -34,7 +35,7 @@
     private final String tableName;
     private final double scalingFactor;

-    public TPCDSDataGeneratorDatasource(INodeDomain domain, String tableName, 
double scalingFactor)
+    TPCDSDataGeneratorDatasource(INodeDomain domain, String tableName, double 
scalingFactor)
             throws AlgebricksException {
         super(createDataSourceId(tableName, scalingFactor), domain);
         this.tableName = tableName;
@@ -62,4 +63,9 @@
             AlgebricksAbsolutePartitionConstraint locations) {
         return new TPCDSDataGeneratorFunction(locations, tableName, 
scalingFactor);
     }
+
+    @Override
+    protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) {
+        return new 
AlgebricksAbsolutePartitionConstraint(csm.getClusterLocations().getLocations());
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java
index 4c52bca..c328a9b 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java
@@ -35,7 +35,7 @@
     private final double scalingFactor;
     private final int parallelism;

-    public TPCDSDataGeneratorFunction(AlgebricksAbsolutePartitionConstraint 
locations, String tableName,
+    TPCDSDataGeneratorFunction(AlgebricksAbsolutePartitionConstraint 
locations, String tableName,
             double scalingFactor) {
         super(locations);
         this.tableName = tableName;
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
index 626dee3..b1f5d4d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.app.function;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -50,11 +51,15 @@
     private final int chunkNumber;
     private final String tableName;
     private final double scalingFactor;
-    private Table selectedTable;
+    private List<Table> selectedTables;
     private final StringBuilder builder = new StringBuilder();
-    private final Iterator<List<List<String>>> dataGeneratorIterator;
+    private final List<Iterator<List<List<String>>>> tableIterators;

-    public TPCDSDataGeneratorReader(String tableName, double scalingFactor, 
int parallelism, int partitionNumber)
+    private Table currentTable;
+    private int tableCount = 0;
+    private int currentTableIndex = 0;
+
+    TPCDSDataGeneratorReader(String tableName, double scalingFactor, int 
parallelism, int partitionNumber)
             throws HyracksDataException {
         this.tableName = tableName;
         this.scalingFactor = scalingFactor;
@@ -70,18 +75,33 @@
          */
         chunkNumber = partitionNumber + 1;

-        dataGeneratorIterator = getDataGeneratorIterator();
+        tableIterators = getTableGeneratorIterators();
     }

     @Override
     public boolean hasNext() {
-        return dataGeneratorIterator.hasNext();
+
+        // Current table still has more
+        if (tableIterators.get(currentTableIndex).hasNext()) {
+            return true;
+        }
+
+        // We went over all the tables
+        if (currentTableIndex == tableCount - 1) {
+            return false;
+        }
+
+        // Go to the next table
+        currentTableIndex++;
+        currentTable = selectedTables.get(currentTableIndex);
+
+        return tableIterators.get(currentTableIndex).hasNext();
     }

     @Override
     public IRawRecord<char[]> next() throws IOException, InterruptedException {
         CharArrayRecord record = new CharArrayRecord();
-        
record.append((formatRecord(dataGeneratorIterator.next())).toCharArray());
+        
record.append((formatRecord(tableIterators.get(currentTableIndex).next())).toCharArray());
         record.endRecord();
         return record;
     }
@@ -91,16 +111,25 @@
      *
      * @return A lazy iterator to generate the data based on the specified 
properties.
      */
-    private Iterator<List<List<String>>> getDataGeneratorIterator() throws 
HyracksDataException {
-        selectedTable = getTableFromStringTableName(tableName);
+    private List<Iterator<List<List<String>>>> getTableGeneratorIterators() 
throws HyracksDataException {
+        selectedTables = getTableFromStringTableName(tableName);
+
+        currentTableIndex = 0;
+        tableCount = selectedTables.size();
+        currentTable = selectedTables.get(currentTableIndex);

         // Create the session with the specified properties, the sessions also 
specifies the chunk to be generated
-        Session session = 
Session.getDefaultSession().withTable(selectedTable).withScale(scalingFactor)
-                .withParallelism(parallelism).withChunkNumber(chunkNumber);
+        Session session = 
Session.getDefaultSession().withScale(scalingFactor).withParallelism(parallelism)
+                .withChunkNumber(chunkNumber);

-        // Construct the Results and Results iterator
-        Results results = Results.constructResults(selectedTable, session);
-        return results.iterator();
+        // Create an iterator for each passed table
+        List<Iterator<List<List<String>>>> tableIterators = new ArrayList<>();
+        for (Table table : selectedTables) {
+            Results result = Results.constructResults(table, session);
+            tableIterators.add(result.iterator());
+        }
+
+        return tableIterators;
     }

     /**
@@ -109,8 +138,14 @@
      * @param tableName String table name to search for.
      * @return Table if found, throws an exception otherwise.
      */
-    private Table getTableFromStringTableName(String tableName) throws 
HyracksDataException {
+    private List<Table> getTableFromStringTableName(String tableName) throws 
HyracksDataException {

+        // Use all the tables
+        if 
(tableName.equalsIgnoreCase(TPCDSDataGeneratorRewriter.TPCDS_ALL_TABLES)) {
+            return Table.getBaseTables();
+        }
+
+        // Search for the table
         List<Table> matchedTables = Table.getBaseTables().stream()
                 .filter(table -> 
tableName.equalsIgnoreCase(table.getName())).collect(Collectors.toList());

@@ -119,7 +154,7 @@
             throw new RuntimeDataException(ErrorCode.TPCDS_INVALID_TABLE_NAME, 
getIdentifier().getName(), tableName);
         }

-        return matchedTables.get(0);
+        return matchedTables;
     }

     /**
@@ -134,25 +169,22 @@
         // Clear the builder (This is faster than re-creating the builder each 
iteration)
         builder.setLength(0);

-        int counter;
-        builder.append("{");
+        builder.append("{\"partition\":\"");
+        builder.append(chunkNumber - 1);
+        builder.append("\"");

         // We loop only to the item before the last, then add the last item 
manually to avoid appending the ","
         // at the end, this way we avoid constantly checking if we're at the 
last item or substring the whole record
-        for (counter = 0; counter < values.get(0).size() - 1; counter++) {
-            builder.append("\"");
-            builder.append(selectedTable.getColumns()[counter].getName());
+        for (int counter = 0; counter < values.get(0).size(); counter++) {
+            builder.append(",\"");
+            builder.append(currentTable.getColumns()[counter].getName());
             builder.append("\":\"");
             builder.append(values.get(0).get(counter));
-            builder.append("\",");
+            builder.append("\"");
         }

         // This is the last item to be appended, don't append the "," after 
appending the field
-        builder.append("\"");
-        builder.append(selectedTable.getColumns()[counter].getName());
-        builder.append("\":\"");
-        builder.append(values.get(0).get(counter));
-        builder.append("\"}");
+        builder.append("}");

         return builder.toString();
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java
index fb871b9..7bcc6c2 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java
@@ -49,6 +49,7 @@
  */
 public class TPCDSDataGeneratorRewriter extends FunctionRewriter {

+    public static final String TPCDS_ALL_TABLES = "ALL";
     public static final FunctionIdentifier TPCDS_DATA_GENERATOR =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, 
"tpcds-datagen", 2);
     public static final TPCDSDataGeneratorRewriter INSTANCE = new 
TPCDSDataGeneratorRewriter(TPCDS_DATA_GENERATOR);
@@ -61,7 +62,6 @@
     protected FunctionDataSource toDatasource(IOptimizationContext context, 
AbstractFunctionCallExpression function)
             throws AlgebricksException {

-        // TODO(Hussain) check if it is safe to assume that we're receiving 
constant expressions only as arguments.
         UnnestingFunctionCallExpression functionCall = 
(UnnestingFunctionCallExpression) function;
         ConstantExpression tableNameArgument = (ConstantExpression) 
functionCall.getArguments().get(0).getValue();
         ConstantExpression scalingFactorArgument = (ConstantExpression) 
functionCall.getArguments().get(1).getValue();
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 6eac23b..4f5adec 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -75,7 +75,7 @@
     public static final String DEFAULT_CONF_FILE = joinPath(RESOURCES_PATH, 
"cc.conf");
     private static final String DEFAULT_STORAGE_PATH = joinPath("target", 
"io", "dir");
     private static String storagePath = DEFAULT_STORAGE_PATH;
-    private static final long RESULT_TTL = TimeUnit.MINUTES.toMillis(5);
+    private static final long RESULT_TTL = TimeUnit.MINUTES.toMillis(30); // 
TODO(Hussain): Revert back to 5 mins

     static {
         System.setProperty("java.util.logging.manager", 
org.apache.logging.log4j.jul.LogManager.class.getName());
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index 3b5cf2e..1635507 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -91,7 +91,7 @@
     protected abstract IDatasourceFunction createFunction(MetadataProvider 
metadataProvider,
             AlgebricksAbsolutePartitionConstraint locations);

-    protected static AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) {
+    protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) {
         String[] allPartitions = csm.getClusterLocations().getLocations();
         Set<String> ncs = new HashSet<>(Arrays.asList(allPartitions));
         return new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new 
String[ncs.size()]));

--
To view, visit https://asterix-gerrit.ics.uci.edu/3437
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iff199b0c533d22bcae1caf5057788b257ba4e486
Gerrit-Change-Number: 3437
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <hussai...@gmail.com>

Reply via email to