This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 48de384bb3 [MINOR] Update cocode algorithms for CLA
48de384bb3 is described below

commit 48de384bb3dca3e63f35b654e907e9ecaf5d747c
Author: Sebastian Baunsgaard <baunsga...@apache.org>
AuthorDate: Tue Apr 9 20:16:50 2024 +0200

    [MINOR] Update cocode algorithms for CLA
    
    This commit adds a new memorizer that relies on an array sized to the
    number of columns to compress, instead of a single hashmap holding all
    column groups. The memory footprint is the same, but performance is much
    improved because it allows constant-time deletion of the memorized column
    groups stored under the lowest column index of each combined column group.
    
    The technique first allocates an array with one entry per column; each
    index gets its own hashmap containing the column groups associated with
    it (those whose lowest column index is that index). When combining column
    groups, the lowest index among all combined columns determines which
    array entry's hashmap the combined group is added to. Once a combination
    is chosen, the buckets of the lowest index of each combined column group
    are reset, and the combined column group is inserted.
    
    The result is constant-time O(1) deletion and insertion in the memorizer.
---
 .../runtime/compress/cocode/AColumnCoCoder.java    |  7 +-
 .../runtime/compress/cocode/CoCodeGreedy.java      | 36 +++++++---
 .../runtime/compress/cocode/CoCodeHybrid.java      | 33 ++++++----
 .../runtime/compress/cocode/CoCodePriorityQue.java | 43 ++++++------
 .../runtime/compress/cocode/CoCoderFactory.java    | 23 +++++--
 .../sysds/runtime/compress/cocode/ColIndexes.java  |  4 +-
 .../sysds/runtime/compress/cocode/Memorizer.java   | 13 ++--
 .../cocode/{Memorizer.java => MemorizerV2.java}    | 53 ++++++++-------
 .../sysds/runtime/compress/estim/AComEst.java      | 76 +++++++++++++---------
 .../compress/estim/CompressedSizeInfoColGroup.java | 21 ++++++
 10 files changed, 196 insertions(+), 113 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
index fc13e16f65..cfe1b1b55e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
@@ -26,6 +26,10 @@ import org.apache.sysds.runtime.compress.cost.ACostEstimate;
 import org.apache.sysds.runtime.compress.estim.AComEst;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
 
+/**
+ * Main abstract class for the co-coding of columns to combine different 
compression statistics and calculate the
+ * combinations of columns
+ */
 public abstract class AColumnCoCoder {
 
        protected static final Log LOG = 
LogFactory.getLog(AColumnCoCoder.class.getName());
@@ -34,8 +38,7 @@ public abstract class AColumnCoCoder {
        protected final ACostEstimate _cest;
        protected final CompressionSettings _cs;
 
-       protected AColumnCoCoder(AComEst sizeEstimator, ACostEstimate 
costEstimator,
-               CompressionSettings cs) {
+       protected AColumnCoCoder(AComEst sizeEstimator, ACostEstimate 
costEstimator, CompressionSettings cs) {
                _sest = sizeEstimator;
                _cest = costEstimator;
                _cs = cs;
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
index d5d6c6936e..45f5654ab2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
@@ -37,14 +37,14 @@ import org.apache.sysds.runtime.util.CommonThreadPool;
 
 public class CoCodeGreedy extends AColumnCoCoder {
 
-       private final Memorizer mem;
+       private final MemorizerV2 mem;
 
        protected CoCodeGreedy(AComEst sizeEstimator, ACostEstimate 
costEstimator, CompressionSettings cs) {
                super(sizeEstimator, costEstimator, cs);
-               mem = new Memorizer(sizeEstimator);
+               mem = new MemorizerV2(sizeEstimator, 
sizeEstimator.getNumColumns());
        }
 
-       protected CoCodeGreedy(AComEst sizeEstimator, ACostEstimate 
costEstimator, CompressionSettings cs, Memorizer mem) {
+       protected CoCodeGreedy(AComEst sizeEstimator, ACostEstimate 
costEstimator, CompressionSettings cs, MemorizerV2 mem) {
                super(sizeEstimator, costEstimator, cs);
                this.mem = mem;
        }
@@ -93,16 +93,22 @@ public class CoCodeGreedy extends AColumnCoCoder {
                                        for(int j = i + 1; j < workSet.size(); 
j++) {
                                                final ColIndexes c1 = 
workSet.get(i);
                                                final ColIndexes c2 = 
workSet.get(j);
-                                               final double costC1 = 
_cest.getCost(mem.get(c1));
-                                               final double costC2 = 
_cest.getCost(mem.get(c2));
+                                               final 
CompressedSizeInfoColGroup c1i = mem.get(c1);
+                                               final 
CompressedSizeInfoColGroup c2i = mem.get(c2);
+
+                                               final double costC1 = 
_cest.getCost(c1i);
+                                               final double costC2 = 
_cest.getCost(c2i);
 
                                                mem.incst1();
+                                               final int maxCombined = 
c1i.getNumVals() * c2i.getNumVals();
 
                                                // Pruning filter : skip 
dominated candidates
                                                // Since even if the entire 
size of one of the column lists is removed,
                                                // it still does not improve 
compression.
                                                // In the case of workload we 
relax the requirement for the filter.
-                                               if(-Math.min(costC1, costC2) > 
changeInCost)
+                                               if(-Math.min(costC1, costC2) > 
changeInCost // change in cost cannot possibly be better.
+                                                       || (maxCombined < 0) // 
int overflow
+                                                       || (maxCombined > 
c1i.getNumRows())) // higher combined number of rows.
                                                        continue;
 
                                                // Combine the two column 
groups.
@@ -206,10 +212,20 @@ public class CoCodeGreedy extends AColumnCoCoder {
                }
 
                @Override
-               public Object call() {
-                       final IColIndex c = _c1._indexes.combine(_c2._indexes);
-                       final ColIndexes cI = new ColIndexes(c);
-                       mem.getOrCreate(cI, _c1, _c2);
+               public Object call() throws Exception {
+                       final CompressedSizeInfoColGroup c1i = mem.get(_c1);
+                       final CompressedSizeInfoColGroup c2i = mem.get(_c2);
+                       if(c1i != null && c2i != null) {
+                               final int maxCombined = c1i.getNumVals() * 
c2i.getNumVals();
+
+                               if(maxCombined < 0 // int overflow
+                                       || maxCombined > c1i.getNumRows()) // 
higher combined than number of rows.
+                                       return null;
+
+                               final IColIndex c = 
_c1._indexes.combine(_c2._indexes);
+                               final ColIndexes cI = new ColIndexes(c);
+                               mem.getOrCreate(cI, _c1, _c2);
+                       }
                        return null;
                }
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java
index 6dc53739d2..c2d3dc9667 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java
@@ -21,6 +21,7 @@ package org.apache.sysds.runtime.compress.cocode;
 
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.cost.ACostEstimate;
+import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
 import org.apache.sysds.runtime.compress.estim.AComEst;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
@@ -37,33 +38,43 @@ public class CoCodeHybrid extends AColumnCoCoder {
        @Override
        protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, 
int k) {
                final int startSize = colInfos.getInfo().size();
+               final int pqColumnThreashold = Math.max(128, 
(_sest.getNumColumns() / startSize) * 100);
+               LOG.error(pqColumnThreashold);
+
                if(startSize == 1)
                        return colInfos; // nothing to join when there only is 
one column
                else if(startSize <= 16) {// Greedy all compare all if small 
number of columns
-                       LOG.debug("Hybrid chose to do greedy cocode because of 
few columns");
+                       LOG.debug("Hybrid chose to do greedy CoCode because of 
few columns");
                        CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs);
                        return colInfos.setInfo(gd.combine(colInfos.getInfo(), 
k));
                }
-               else if(startSize > 1000)
-                       return 
colInfos.setInfo(CoCodePriorityQue.join(colInfos.getInfo(), _sest, _cest, 1, 
k));
-               LOG.debug("Using Hybrid Cocode Strategy: ");
+               else if(startSize > 1000) {
+                       CoCodePriorityQue pq = new CoCodePriorityQue(_sest, 
_cest, _cs, pqColumnThreashold);
+
+                       return colInfos.setInfo(pq.join(colInfos.getInfo(), 1, 
k));
+               }
+               LOG.debug("Using Hybrid CoCode Strategy: ");
 
                final int PriorityQueGoal = startSize / 5;
                if(PriorityQueGoal > 30) { // hybrid if there is a large number 
of columns to begin with
                        Timing time = new Timing(true);
-                       
colInfos.setInfo(CoCodePriorityQue.join(colInfos.getInfo(), _sest, _cest, 
PriorityQueGoal, k));
-                       LOG.debug("Que based time: " + time.stop());
+                       CoCodePriorityQue pq = new CoCodePriorityQue(_sest, 
_cest, _cs, pqColumnThreashold);
+                       colInfos.setInfo(pq.join(colInfos.getInfo(), 
PriorityQueGoal, k));
                        final int pqSize = colInfos.getInfo().size();
-                       if(pqSize <= PriorityQueGoal * 2) {
-                               time = new Timing(true);
+
+                       LOG.debug("Que based time: " + time.stop());
+                       if(pqSize < PriorityQueGoal || (pqSize < startSize && 
_cest instanceof ComputationCostEstimator)) {
                                CoCodeGreedy gd = new CoCodeGreedy(_sest, 
_cest, _cs);
                                colInfos.setInfo(gd.combine(colInfos.getInfo(), 
k));
                                LOG.debug("Greedy time:     " + time.stop());
                        }
                        return colInfos;
                }
-               else // If somewhere in between use the que based approach only.
-                       return 
colInfos.setInfo(CoCodePriorityQue.join(colInfos.getInfo(), _sest, _cest, 1, 
k));
-
+               else {
+                       LOG.debug("Using only Greedy based since Nr Column 
groups: " + startSize + " is not large enough");
+                       CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs);
+                       colInfos.setInfo(gd.combine(colInfos.getInfo(), k));
+                       return colInfos;
+               }
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java
index b600c697db..ca7135c262 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java
@@ -48,27 +48,30 @@ public class CoCodePriorityQue extends AColumnCoCoder {
 
        private static final int COL_COMBINE_THRESHOLD = 1024;
 
-       protected CoCodePriorityQue(AComEst sizeEstimator, ACostEstimate 
costEstimator, CompressionSettings cs) {
+       private final int lastCombineThreshold;
+
+       protected CoCodePriorityQue(AComEst sizeEstimator, ACostEstimate 
costEstimator, CompressionSettings cs,
+               int lastCombineThreshold) {
                super(sizeEstimator, costEstimator, cs);
+               this.lastCombineThreshold = lastCombineThreshold;
        }
 
        @Override
        protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, 
int k) {
-               colInfos.setInfo(join(colInfos.getInfo(), _sest, _cest, 1, k));
+               colInfos.setInfo(join(colInfos.getInfo(), 1, k));
                return colInfos;
        }
 
-       protected static List<CompressedSizeInfoColGroup> 
join(List<CompressedSizeInfoColGroup> groups, AComEst sEst,
-               ACostEstimate cEst, int minNumGroups, int k) {
+       protected List<CompressedSizeInfoColGroup> 
join(List<CompressedSizeInfoColGroup> groups, int minNumGroups, int k) {
 
                if(groups.size() > COL_COMBINE_THRESHOLD && k > 1)
-                       return combineMultiThreaded(groups, sEst, cEst, 
minNumGroups, k);
+                       return combineMultiThreaded(groups, _sest, _cest, 
minNumGroups, k);
                else
-                       return combineSingleThreaded(groups, sEst, cEst, 
minNumGroups);
+                       return combineSingleThreaded(groups, _sest, _cest, 
minNumGroups);
        }
 
-       private static List<CompressedSizeInfoColGroup> 
combineMultiThreaded(List<CompressedSizeInfoColGroup> groups,
-               AComEst sEst, ACostEstimate cEst, int minNumGroups, int k) {
+       private List<CompressedSizeInfoColGroup> 
combineMultiThreaded(List<CompressedSizeInfoColGroup> groups, AComEst sEst,
+               ACostEstimate cEst, int minNumGroups, int k) {
                final ExecutorService pool = CommonThreadPool.get(k);
                try {
                        final List<PQTask> tasks = new ArrayList<>();
@@ -90,18 +93,18 @@ public class CoCodePriorityQue extends AColumnCoCoder {
                catch(Exception e) {
                        throw new DMLCompressionException("Failed parallel 
priority que cocoding", e);
                }
-               finally{
+               finally {
                        pool.shutdown();
                }
        }
 
-       private static List<CompressedSizeInfoColGroup> 
combineSingleThreaded(List<CompressedSizeInfoColGroup> groups,
-               AComEst sEst, ACostEstimate cEst, int minNumGroups) {
+       private List<CompressedSizeInfoColGroup> 
combineSingleThreaded(List<CompressedSizeInfoColGroup> groups, AComEst sEst,
+               ACostEstimate cEst, int minNumGroups) {
                return combineBlock(groups, 0, groups.size(), sEst, cEst, 
minNumGroups);
        }
 
-       private static List<CompressedSizeInfoColGroup> 
combineBlock(List<CompressedSizeInfoColGroup> groups, int start,
-               int end, AComEst sEst, ACostEstimate cEst, int minNumGroups) {
+       private List<CompressedSizeInfoColGroup> 
combineBlock(List<CompressedSizeInfoColGroup> groups, int start, int end,
+               AComEst sEst, ACostEstimate cEst, int minNumGroups) {
                Queue<CompressedSizeInfoColGroup> que = getQue(end - start, 
cEst);
 
                for(int i = start; i < end; i++) {
@@ -113,7 +116,7 @@ public class CoCodePriorityQue extends AColumnCoCoder {
                return combineBlock(que, sEst, cEst, minNumGroups);
        }
 
-       private static List<CompressedSizeInfoColGroup> 
combineBlock(Queue<CompressedSizeInfoColGroup> que, AComEst sEst,
+       private List<CompressedSizeInfoColGroup> 
combineBlock(Queue<CompressedSizeInfoColGroup> que, AComEst sEst,
                ACostEstimate cEst, int minNumGroups) {
 
                List<CompressedSizeInfoColGroup> ret = new ArrayList<>();
@@ -133,21 +136,21 @@ public class CoCodePriorityQue extends AColumnCoCoder {
                                if(costOfJoin < costIndividual) {
                                        que.poll();
                                        int numColumns = g.getColumns().size();
-                                       if(numColumns > 128){
+                                       if(numColumns > lastCombineThreshold) {
                                                lastCombine++;
                                                ret.add(g);
                                        }
-                                       else{
+                                       else {
                                                lastCombine = 0;
                                                que.add(g);
                                        }
                                }
-                               else{
+                               else {
                                        lastCombine++;
                                        ret.add(l);
                                }
                        }
-                       else{
+                       else {
                                lastCombine++;
                                ret.add(l);
                        }
@@ -155,7 +158,7 @@ public class CoCodePriorityQue extends AColumnCoCoder {
                        l = que.poll();
                        groupNr = ret.size() + que.size();
                }
-               while(que.peek() != null){
+               while(que.peek() != null) {
                        // empty que
                        ret.add(l);
                        l = que.poll();
@@ -180,7 +183,7 @@ public class CoCodePriorityQue extends AColumnCoCoder {
                return cEst.getCost(x) + x.getColumns().avgOfIndex() / 100000;
        }
 
-       protected static class PQTask implements 
Callable<List<CompressedSizeInfoColGroup>> {
+       protected class PQTask implements 
Callable<List<CompressedSizeInfoColGroup>> {
 
                private final List<CompressedSizeInfoColGroup> _groups;
                private final int _start;
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCoderFactory.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCoderFactory.java
index abd12d3f6a..6c560fb979 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCoderFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCoderFactory.java
@@ -63,16 +63,18 @@ public interface CoCoderFactory {
                AColumnCoCoder co = 
createColumnGroupPartitioner(cs.columnPartitioner, est, costEstimator, cs);
 
                // Find out if any of the groups are empty.
-               final boolean containsEmptyOrConst = 
containsEmptyOrConst(colInfos);
+               final boolean containsEmptyConstOrIncompressable = 
containsEmptyConstOrIncompressable(colInfos);
 
                // if there are no empty or const columns then try cocode 
algorithms for all columns
-               if(!containsEmptyOrConst)
+               if(!containsEmptyConstOrIncompressable)
                        return co.coCodeColumns(colInfos, k);
                else {
                        // filtered empty groups
                        final List<IColIndex> emptyCols = new ArrayList<>();
                        // filtered const groups
                        final List<IColIndex> constCols = new ArrayList<>();
+                       // incompressable groups
+                       final List<IColIndex> incompressable = new 
ArrayList<>();
                        // filtered groups -- in the end starting with all 
groups
                        final List<CompressedSizeInfoColGroup> groups = new 
ArrayList<>();
 
@@ -85,13 +87,15 @@ public interface CoCoderFactory {
                                        emptyCols.add(g.getColumns());
                                else if(g.isConst())
                                        constCols.add(g.getColumns());
+                               else if(g.isIncompressable())
+                                       incompressable.add(g.getColumns());
                                else
                                        groups.add(g);
                        }
 
                        // overwrite groups.
                        colInfos.compressionInfo = groups;
-                       
+
                        // cocode remaining groups
                        if(!groups.isEmpty()) {
                                colInfos = co.coCodeColumns(colInfos, k);
@@ -109,14 +113,19 @@ public interface CoCoderFactory {
                                colInfos.compressionInfo.add(new 
CompressedSizeInfoColGroup(idx, nRow, CompressionType.CONST));
                        }
 
+                       if(incompressable.size() > 0) {
+                               final IColIndex idx = 
ColIndexFactory.combineIndexes(incompressable);
+                               colInfos.compressionInfo.add(new 
CompressedSizeInfoColGroup(idx, nRow, CompressionType.UNCOMPRESSED));
+                       }
+
                        return colInfos;
 
                }
        }
 
-       private static boolean containsEmptyOrConst(CompressedSizeInfo 
colInfos) {
+       private static boolean 
containsEmptyConstOrIncompressable(CompressedSizeInfo colInfos) {
                for(CompressedSizeInfoColGroup g : colInfos.compressionInfo)
-                       if(g.isEmpty() || g.isConst())
+                       if(g.isEmpty() || g.isConst() || g.isIncompressable())
                                return true;
                return false;
        }
@@ -133,9 +142,9 @@ public interface CoCoderFactory {
                        case STATIC:
                                return new CoCodeStatic(est, costEstimator, cs);
                        case PRIORITY_QUE:
-                               return new CoCodePriorityQue(est, 
costEstimator, cs);
+                               return new CoCodePriorityQue(est, 
costEstimator, cs, 128);
                        default:
-                               throw new RuntimeException("Unsupported column 
group partitioner: " + type.toString());
+                               throw new RuntimeException("Unsupported column 
group partition technique: " + type.toString());
                }
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java
index dcdcbe464c..910c640d72 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java
@@ -42,9 +42,9 @@ public class ColIndexes {
        }
 
        public boolean contains(ColIndexes a, ColIndexes b) {
-
                if(a == null || b == null)
                        return false;
-               return _indexes.contains(a._indexes.get(0)) ||  
_indexes.contains(b._indexes.get(0));
+               return _indexes.contains(a._indexes.get(0)) //
+                       || _indexes.contains(b._indexes.get(0));
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java
index db77a32bf6..9ac0d5c948 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.estim.AComEst;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
 
@@ -50,8 +51,6 @@ public class Memorizer {
        }
 
        public void remove(ColIndexes c1, ColIndexes c2) {
-               mem.remove(c1);
-               mem.remove(c2);
                Iterator<Entry<ColIndexes, CompressedSizeInfoColGroup>> i = 
mem.entrySet().iterator();
                while(i.hasNext()) {
                        final ColIndexes eci = i.next().getKey();
@@ -60,7 +59,7 @@ public class Memorizer {
                }
        }
 
-       public CompressedSizeInfoColGroup getOrCreate(ColIndexes cI, ColIndexes 
c1, ColIndexes c2){
+       public CompressedSizeInfoColGroup getOrCreate(ColIndexes cI, ColIndexes 
c1, ColIndexes c2) {
                CompressedSizeInfoColGroup g = mem.get(cI);
                st2++;
                if(g == null) {
@@ -69,7 +68,11 @@ public class Memorizer {
                        if(left != null && right != null) {
                                st3++;
                                g = _sEst.combine(cI._indexes, left, right);
-
+                               if(g != null) {
+                                       if(g.getNumVals() < 0)
+                                               throw new 
DMLCompressionException(
+                                                       "Combination returned 
less distinct values on: \n" + left + "\nand\n" + right + "\nEq\n" + g);
+                               }
                                synchronized(this) {
                                        mem.put(cI, g);
                                }
@@ -88,7 +91,7 @@ public class Memorizer {
        }
 
        public String stats() {
-               return " possible: " + st1 + " requests: " + st2 + " combined: 
" + st3  + " outSecond: "+ st4;
+               return " possible: " + st1 + " requests: " + st2 + " combined: 
" + st3 + " outSecond: " + st4;
        }
 
        public void resetStats() {
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/MemorizerV2.java
similarity index 63%
copy from src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java
copy to src/main/java/org/apache/sysds/runtime/compress/cocode/MemorizerV2.java
index db77a32bf6..b63a3657fc 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/MemorizerV2.java
@@ -20,58 +20,63 @@
 package org.apache.sysds.runtime.compress.cocode;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
 import org.apache.sysds.runtime.compress.estim.AComEst;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
 
-public class Memorizer {
+public class MemorizerV2 {
        private final AComEst _sEst;
-       private final Map<ColIndexes, CompressedSizeInfoColGroup> mem;
+
+       private final Map<ColIndexes, CompressedSizeInfoColGroup>[] mem;
        private int st1 = 0, st2 = 0, st3 = 0, st4 = 0;
 
-       public Memorizer(AComEst sEst) {
+       @SuppressWarnings("unchecked")
+       public MemorizerV2(AComEst sEst, int nCol) {
                _sEst = sEst;
-               mem = new HashMap<>();
+               mem = new Map[nCol];
        }
 
        public void put(CompressedSizeInfoColGroup g) {
-               mem.put(new ColIndexes(g.getColumns()), g);
+               put(new ColIndexes(g.getColumns()), g);
        }
 
        public void put(ColIndexes key, CompressedSizeInfoColGroup val) {
-               mem.put(key, val);
+               final IColIndex gi = key._indexes;
+               final int bucketID = gi.get(0);
+               Map<ColIndexes, CompressedSizeInfoColGroup> bucket = 
mem[bucketID];
+               if(bucket == null)
+                       bucket = mem[bucketID] = new HashMap<>();
+               bucket.put(key, val);
        }
 
        public CompressedSizeInfoColGroup get(ColIndexes c) {
-               return mem.get(c);
+               return mem[c._indexes.get(0)].get(c);
        }
 
        public void remove(ColIndexes c1, ColIndexes c2) {
-               mem.remove(c1);
-               mem.remove(c2);
-               Iterator<Entry<ColIndexes, CompressedSizeInfoColGroup>> i = 
mem.entrySet().iterator();
-               while(i.hasNext()) {
-                       final ColIndexes eci = i.next().getKey();
-                       if(eci.contains(c1, c2))
-                               i.remove();
-               }
+               mem[c1._indexes.get(0)] = null;
+               mem[c2._indexes.get(0)] = null;
        }
 
-       public CompressedSizeInfoColGroup getOrCreate(ColIndexes cI, ColIndexes 
c1, ColIndexes c2){
-               CompressedSizeInfoColGroup g = mem.get(cI);
+       public CompressedSizeInfoColGroup getOrCreate(ColIndexes cI, ColIndexes 
c1, ColIndexes c2) {
+               CompressedSizeInfoColGroup g = get(cI);
                st2++;
                if(g == null) {
-                       final CompressedSizeInfoColGroup left = mem.get(c1);
-                       final CompressedSizeInfoColGroup right = mem.get(c2);
+                       final CompressedSizeInfoColGroup left = get(c1);
+                       final CompressedSizeInfoColGroup right = get(c2);
                        if(left != null && right != null) {
                                st3++;
                                g = _sEst.combine(cI._indexes, left, right);
-
+                               if(g != null) {
+                                       if(g.getNumVals() < 0)
+                                               throw new 
DMLCompressionException(
+                                                       "Combination returned 
less distinct values on: \n" + left + "\nand\n" + right + "\nEq\n" + g);
+                               }
                                synchronized(this) {
-                                       mem.put(cI, g);
+                                       put(cI, g);
                                }
                        }
 
@@ -88,7 +93,7 @@ public class Memorizer {
        }
 
        public String stats() {
-               return " possible: " + st1 + " requests: " + st2 + " combined: 
" + st3  + " outSecond: "+ st4;
+               return " possible: " + st1 + " requests: " + st2 + " combined: 
" + st3 + " outSecond: " + st4;
        }
 
        public void resetStats() {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/AComEst.java 
b/src/main/java/org/apache/sysds/runtime/compress/estim/AComEst.java
index 03cf173a13..832725f328 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/AComEst.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/AComEst.java
@@ -22,7 +22,6 @@ package org.apache.sysds.runtime.compress.estim;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -63,11 +62,21 @@ public abstract class AComEst {
                _cs = cs;
        }
 
-       protected int getNumRows() {
+       /**
+        * Get the number of rows in the overall compressing block.
+        * 
+        * @return The number of rows
+        */
+       public int getNumRows() {
                return _cs.transposed ? _data.getNumColumns() : 
_data.getNumRows();
        }
 
-       protected int getNumColumns() {
+       /**
+        * Get the number of columns in the overall compressing block.
+        * 
+        * @return The number of cols
+        */
+       public int getNumColumns() {
                return _cs.transposed ? _data.getNumRows() : 
_data.getNumColumns();
        }
 
@@ -221,6 +230,13 @@ public abstract class AComEst {
        protected abstract CompressedSizeInfoColGroup combine(IColIndex 
combinedColumns, CompressedSizeInfoColGroup g1,
                CompressedSizeInfoColGroup g2, int maxDistinct);
 
+       /**
+        * Collect the compressed size for all individual columns using the 
available k parallelism degree.
+        * 
+        * @param clen The number of total columns
+        * @param k    The parallelization degree
+        * @return A list of the individual columns compressibility.
+        */
        protected List<CompressedSizeInfoColGroup> 
CompressedSizeInfoColGroup(int clen, int k) {
                if(k <= 1)
                        return CompressedSizeInfoColGroupSingleThread(clen);
@@ -228,6 +244,12 @@ public abstract class AComEst {
                        return CompressedSizeInfoColGroupParallel(clen, k);
        }
 
+       /**
+        * Compress the column groups using a single thread.
+        * 
+        * @param clen the number of total columns
+        * @return A list of the individual columns compressibility.
+        */
        private List<CompressedSizeInfoColGroup> 
CompressedSizeInfoColGroupSingleThread(int clen) {
                List<CompressedSizeInfoColGroup> ret = new ArrayList<>(clen);
                if(!_cs.transposed && !_data.isEmpty() && 
_data.isInSparseFormat())
@@ -237,6 +259,13 @@ public abstract class AComEst {
                return ret;
        }
 
+       /**
+        * Collect the compressed size for all individual columns using the 
available k parallelism degree.
+        * 
+        * @param clen The number of total columns
+        * @param k    The parallelization degree
+        * @return A list of the individual columns compressibility.
+        */
        private List<CompressedSizeInfoColGroup> 
CompressedSizeInfoColGroupParallel(int clen, int k) {
                final ExecutorService pool = CommonThreadPool.get(k);
                try {
@@ -249,15 +278,22 @@ public abstract class AComEst {
 
                        CompressedSizeInfoColGroup[] res = new 
CompressedSizeInfoColGroup[clen];
                        final int blkz = Math.max(1, clen / (k * 10));
-                       final ArrayList<SizeEstimationTask> tasks = new 
ArrayList<>(clen / blkz + 1);
+                       final ArrayList<Future<Object>> tasks = new 
ArrayList<>(clen / blkz + 1);
 
                        if(blkz != 1)
                                LOG.debug("Extracting column samples in blocks 
of " + blkz);
 
-                       for(int col = 0; col < clen; col += blkz)
-                               tasks.add(new SizeEstimationTask(res, col, 
Math.min(clen, col + blkz)));
+                       for(int col = 0; col < clen; col += blkz) {
+                               final int start = col;
+                               final int end = Math.min(clen, col + blkz);
+                               tasks.add(pool.submit(() -> {
+                                       for(int c = start; c < end; c++)
+                                               res[c] = getColGroupInfo(new 
SingleIndex(c));
+                                       return null;
+                               }));
+                       }
 
-                       for(Future<Object> f : pool.invokeAll(tasks))
+                       for(Future<Object> f : tasks)
                                f.get();
 
                        return Arrays.asList(res);
@@ -265,35 +301,11 @@ public abstract class AComEst {
                catch(Exception e) {
                        throw new DMLCompressionException("Multithreaded first 
extraction failed", e);
                }
-               finally{
+               finally {
                        pool.shutdown();
                }
        }
 
-       private class SizeEstimationTask implements Callable<Object> {
-               final CompressedSizeInfoColGroup[] _res;
-               final int _cs;
-               final int _ce;
-
-               private SizeEstimationTask(CompressedSizeInfoColGroup[] res, 
int cs, int ce) {
-                       _res = res;
-                       _cs = cs;
-                       _ce = ce;
-               }
-
-               @Override
-               public Object call() {
-                       try {
-                               for(int c = _cs; c < _ce; c++)
-                                       _res[c] = getColGroupInfo(new 
SingleIndex(c));
-                               return null;
-                       }
-                       catch(Exception e) {
-                               throw new DMLCompressionException("ColGroup 
extraction failed", e);
-                       }
-               }
-       }
-
        @Override
        public String toString() {
                return this.getClass().getSimpleName();
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
index 1168147b3d..4fbf9b0ee4 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
@@ -129,6 +129,10 @@ public class CompressedSizeInfoColGroup {
                                _sizes.put(ct,
                                        (double) 
ColGroupSizes.estimateInMemorySizeCONST(columns.size(), columns.isContiguous(), 
1.0, false));
                                break;
+                       case UNCOMPRESSED:
+                               _sizes.put(ct, (double) 
ColGroupSizes.estimateInMemorySizeUncompressed(nRows, columns.isContiguous(),
+                                       columns.size(), 1.0));
+                               break;
                        default:
                                throw new DMLCompressionException("Invalid 
instantiation of const Cost");
                }
@@ -206,6 +210,10 @@ public class CompressedSizeInfoColGroup {
                return _map;
        }
 
+       public void setMap(IEncode map) {
+               _map = map;
+       }
+
        public boolean containsZeros() {
                return _facts.numOffs < _facts.numRows;
        }
@@ -229,6 +237,10 @@ public class CompressedSizeInfoColGroup {
                return _bestCompressionType == CompressionType.CONST || 
_sizes.containsKey(CompressionType.CONST);
        }
 
+       public boolean isIncompressable() {
+               return _bestCompressionType == CompressionType.UNCOMPRESSED;
+       }
+
        private static double getCompressionSize(IColIndex cols, 
CompressionType ct, EstimationFactors fact) {
                int nv;
                final int numCols = cols.size();
@@ -284,6 +296,15 @@ public class CompressedSizeInfoColGroup {
                sb.append(" Sizes: " + _sizes);
                sb.append(" facts: " + _facts);
                sb.append(" mapIsNull: " + (_map == null));
+               if(_map != null) {
+                       String s = _map.toString();
+                       if(s.length() > 1000) {
+                               sb.append(s, 0, 1000);
+                       }
+                       else {
+                               sb.append(s);
+                       }
+               }
                return sb.toString();
        }
 

Reply via email to