http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java index f25899f..2b6575a 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java @@ -40,6 +40,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.CMOperator; import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes; +import org.apache.sysml.utils.IntUtils; public class CentralMomentSPInstruction extends UnarySPInstruction { @@ -104,7 +105,7 @@ public class CentralMomentSPInstruction extends UnarySPInstruction { ScalarObject order = ec.getScalarInput(scalarInput.getName(), scalarInput.getValueType(), scalarInput.isLiteral()); CMOperator cop = ((CMOperator)_optr); if ( cop.getAggOpType() == AggregateOperationTypes.INVALID ) { - cop.setCMAggOp((int)order.getLongValue()); + cop.setCMAggOp(IntUtils.toInt(order.getLongValue())); } //get input
http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java index 308e60f..d83a6ed 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java @@ -49,6 +49,7 @@ import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateOperator; import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.matrix.operators.ReorgOperator; +import org.apache.sysml.utils.IntUtils; /** * Cpmm: cross-product matrix multiplication operation (distributed matrix multiply @@ -165,9 +166,9 @@ public class CpmmSPInstruction extends BinarySPInstruction { } private static int getMaxParJoin(MatrixCharacteristics mc1, MatrixCharacteristics mc2) { - return mc1.colsKnown() ? (int)mc1.getNumColBlocks() : - mc2.rowsKnown() ? (int)mc2.getNumRowBlocks() : - Integer.MAX_VALUE; + return IntUtils.toInt(mc1.colsKnown() ? mc1.getNumColBlocks() : + mc2.rowsKnown() ? mc2.getNumRowBlocks() : + Integer.MAX_VALUE); } private static class CpmmIndexFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, Long, IndexedMatrixValue> http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java index 68cc6db..a0dfb85 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java @@ -39,6 +39,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysml.runtime.matrix.operators.UnaryOperator; +import org.apache.sysml.utils.IntUtils; public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstruction { @@ -76,7 +77,7 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio //merge partial aggregates, adjusting for correct number of partitions //as size can significant shrink (1K) but also grow (sparse-dense) int numParts = SparkUtils.getNumPreferredPartitions(mcOut); - int minPar = (int)Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks()); + int minPar = IntUtils.toInt(Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks())); out = RDDAggregateUtils.mergeByKey(out, Math.max(numParts, minPar), false); //put output handle in symbol table @@ -133,9 +134,9 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio //cumsum expand partial aggregates long rlenOut = (long)Math.ceil((double)_rlen/_brlen); long rixOut = (long)Math.ceil((double)ixIn.getRowIndex()/_brlen); - int rlenBlk = (int) Math.min(rlenOut-(rixOut-1)*_brlen, _brlen); + int rlenBlk = IntUtils.toInt( Math.min(rlenOut-(rixOut-1)*_brlen, _brlen)); int clenBlk = blkOut.getNumColumns(); - int posBlk = (int) ((ixIn.getRowIndex()-1) % _brlen); + int posBlk = IntUtils.toInt((ixIn.getRowIndex()-1) % _brlen); MatrixBlock blkOut2 = new MatrixBlock(rlenBlk, clenBlk, false); blkOut2.copy(posBlk, posBlk, 0, clenBlk-1, blkOut, true); ixOut.setIndexes(rixOut, ixOut.getColumnIndex()); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java index c32a57b..952a6d0 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java @@ -45,6 +45,7 @@ import org.apache.sysml.runtime.matrix.operators.BinaryOperator; import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.matrix.operators.UnaryOperator; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; public class CumulativeOffsetSPInstruction extends BinarySPInstruction { private BinaryOperator _bop = null; @@ -209,7 +210,7 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction { //lookup offset row and return joined output MatrixBlock off = (ixIn.getRowIndex() == 1) ? new MatrixBlock(1, blkIn.getNumColumns(), _initValue) : - _pbc.getBlock((int)brix, (int)ixIn.getColumnIndex()).slice(rix, rix); + _pbc.getBlock(IntUtils.toInt(brix), IntUtils.toInt(ixIn.getColumnIndex())).slice(rix, rix); return new Tuple2<MatrixIndexes, Tuple2<MatrixBlock,MatrixBlock>>(ixIn, new Tuple2<>(blkIn,off)); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/DnnSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/DnnSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/DnnSPInstruction.java index fbb214e..e734fb9 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/DnnSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/DnnSPInstruction.java @@ -47,6 +47,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.operators.ReorgOperator; import org.apache.sysml.runtime.util.DnnUtils; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.NativeHelper; import scala.Tuple2; @@ -214,7 +215,7 @@ public class DnnSPInstruction extends UnarySPInstruction { MatrixCharacteristics mcRdd = sec.getMatrixCharacteristics(name); if(mcRdd.getColsPerBlock() < mcRdd.getCols() || mcRdd.getRowsPerBlock() != 1) { MatrixCharacteristics mcOut = new MatrixCharacteristics(mcRdd); - mcOut.setColsPerBlock((int)mcRdd.getCols()); + mcOut.setColsPerBlock(IntUtils.toInt(mcRdd.getCols())); mcOut.setRowsPerBlock(numRowsPerBlock); in1 = RDDAggregateUtils.mergeByKey(in1.flatMapToPair(new ExtractBlockForBinaryReblock(mcRdd, mcOut))); // TODO: Inject checkpoint to avoid doing this repeated for validation set @@ -266,8 +267,8 @@ public class DnnSPInstruction extends UnarySPInstruction { int K = getScalarInput(ec, _filter_shape, 0); int R = getScalarInput(ec, _filter_shape, 2); int S = getScalarInput(ec, _filter_shape, 3); - int P = (int) DnnUtils.getP(H, R, stride_h, pad_h); - int Q = (int) DnnUtils.getQ(W, S, stride_w, pad_w); + int P = IntUtils.toInt(DnnUtils.getP(H, R, stride_h, pad_h)); + int Q = IntUtils.toInt(DnnUtils.getQ(W, S, stride_w, pad_w)); DnnParameters params = new DnnParameters(numRowsPerBlock, C, H, W, K, R, S, stride_h, stride_w, pad_h, pad_w, 1); boolean enableNativeBLAS = NativeHelper.isNativeLibraryLoaded(); @@ -286,7 +287,7 @@ public class DnnSPInstruction extends UnarySPInstruction { throw new DMLRuntimeException("The current operator doesnot support large outputs."); } sec.setMetaData(output.getName(), - new MetaDataFormat(new MatrixCharacteristics(mcRdd.getRows(), numCols, numRowsPerBlock, (int)numCols, nnz), OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo)); + new MetaDataFormat(new MatrixCharacteristics(mcRdd.getRows(), numCols, numRowsPerBlock, IntUtils.toInt(numCols), nnz), OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo)); } else { throw new DMLRuntimeException("Not implemented: " + instOpcode); @@ -294,8 +295,8 @@ public class DnnSPInstruction extends UnarySPInstruction { } private static int getScalarInput(ExecutionContext ec, ArrayList<CPOperand> aL, int index) { - return (int) ec.getScalarInput(aL.get(index).getName(), - aL.get(index).getValueType(), aL.get(index).isLiteral()).getLongValue(); + return IntUtils.toInt( ec.getScalarInput(aL.get(index).getName(), + aL.get(index).getValueType(), aL.get(index).isLiteral()).getLongValue()); } private static class RDDConv2dMapMMFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes, MatrixBlock>>, MatrixIndexes, MatrixBlock> { http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java index f74f293..64567d1 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java @@ -47,6 +47,7 @@ import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.util.IndexRange; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; /** * This class implements the frame indexing functionality inside Spark. @@ -103,7 +104,7 @@ public class FrameIndexingSPInstruction extends IndexingSPInstruction { //update schema of output with subset of input schema sec.getFrameObject(output.getName()).setSchema( - sec.getFrameObject(input1.getName()).getSchema((int)cl, (int)cu)); + sec.getFrameObject(input1.getName()).getSchema(IntUtils.toInt(cl), IntUtils.toInt(cu))); } //left indexing else if ( opcode.equalsIgnoreCase(LeftIndex.OPCODE) || opcode.equalsIgnoreCase("mapLeftIndex")) @@ -186,8 +187,8 @@ public class FrameIndexingSPInstruction extends IndexingSPInstruction { _ixrange = ixrange; _rlen = mcLeft.getRows(); _clen = mcLeft.getCols(); - _brlen = (int) Math.min(OptimizerUtils.getDefaultFrameSize(), _rlen); - _bclen = (int) mcLeft.getCols(); + _brlen = IntUtils.toInt( Math.min(OptimizerUtils.getDefaultFrameSize(), _rlen)); + _bclen = IntUtils.toInt(mcLeft.getCols()); } @Override http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java index c9566a2..f5321f2 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java @@ -38,6 +38,7 @@ import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.utils.IntUtils; public class MapmmChainSPInstruction extends SPInstruction { private ChainType _chainType = null; @@ -168,7 +169,7 @@ public class MapmmChainSPInstruction extends SPInstruction { MatrixBlock pmV = _pmV.getBlock(1, 1); MatrixIndexes ixIn = arg0._1(); MatrixBlock blkIn = arg0._2(); - int rowIx = (int)ixIn.getRowIndex(); + int rowIx = IntUtils.toInt(ixIn.getRowIndex()); //execute mapmmchain operation return blkIn.chainMatrixMultOperations(pmV, _pmW.getBlock(rowIx,1), new MatrixBlock(), _chainType); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java index 7c8d606..4f6fd8d 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java @@ -54,6 +54,7 @@ import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateOperator; import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.utils.IntUtils; public class MapmmSPInstruction extends BinarySPInstruction { private CacheType _type = null; @@ -238,7 +239,7 @@ public class MapmmSPInstruction extends BinarySPInstruction { isLeft?mcRdd.getCols():mcBc.getCols(), isLeft?mcBc.getRowsPerBlock():mcRdd.getRowsPerBlock(), isLeft?mcRdd.getColsPerBlock():mcBc.getColsPerBlock(), 1.0)); long numParts = sizeOutput / InfrastructureAnalyzer.getHDFSBlockSize(); - return (int)Math.min(numParts, (isLeft?mcRdd.getNumColBlocks():mcRdd.getNumRowBlocks())); + return IntUtils.toInt(Math.min(numParts, (isLeft?mcRdd.getNumColBlocks():mcRdd.getNumRowBlocks()))); } private static class RDDMapMMFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> @@ -272,7 +273,7 @@ public class MapmmSPInstruction extends BinarySPInstruction { if( _type == CacheType.LEFT ) { //get the right hand side matrix - MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex()); + MatrixBlock left = _pbc.getBlock(1, IntUtils.toInt(ixIn.getRowIndex())); //execute matrix-vector mult OperationsOnMatrixValues.matMult(new MatrixIndexes(1,ixIn.getRowIndex()), @@ -281,7 +282,7 @@ public class MapmmSPInstruction extends BinarySPInstruction { else //if( _type == CacheType.RIGHT ) { //get the right hand side matrix - MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1); + MatrixBlock right = _pbc.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), 1); //execute matrix-vector mult OperationsOnMatrixValues.matMult(ixIn, blkIn, @@ -324,7 +325,7 @@ public class MapmmSPInstruction extends BinarySPInstruction { if( _type == CacheType.LEFT ) { //get the right hand side matrix - MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex()); + MatrixBlock left = _pbc.getBlock(1, IntUtils.toInt(ixIn.getRowIndex())); //execute matrix-vector mult return OperationsOnMatrixValues.matMult( @@ -333,7 +334,7 @@ public class MapmmSPInstruction extends BinarySPInstruction { else //if( _type == CacheType.RIGHT ) { //get the right hand side matrix - MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1); + MatrixBlock right = _pbc.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), 1); //execute matrix-vector mult return OperationsOnMatrixValues.matMult( @@ -389,7 +390,7 @@ public class MapmmSPInstruction extends BinarySPInstruction { if( _type == CacheType.LEFT ) { //get the right hand side matrix - MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex()); + MatrixBlock left = _pbc.getBlock(1, IntUtils.toInt(ixIn.getRowIndex())); //execute index preserving matrix multiplication OperationsOnMatrixValues.matMult(left, blkIn, blkOut, _op); @@ -397,7 +398,7 @@ public class MapmmSPInstruction extends BinarySPInstruction { else //if( _type == CacheType.RIGHT ) { //get the right hand side matrix - MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1); + MatrixBlock right = _pbc.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), 1); //execute index preserving matrix multiplication OperationsOnMatrixValues.matMult(blkIn, right, blkOut, _op); @@ -437,14 +438,14 @@ public class MapmmSPInstruction extends BinarySPInstruction { //for all matching left-hand-side blocks, returned as lazy iterator return IntStream.range(1, _pbc.getNumRowBlocks()+1).mapToObj(i -> new Tuple2<>(new MatrixIndexes(i, ixIn.getColumnIndex()), - OperationsOnMatrixValues.matMult(_pbc.getBlock(i, (int)ixIn.getRowIndex()), blkIn, + OperationsOnMatrixValues.matMult(_pbc.getBlock(i, IntUtils.toInt(ixIn.getRowIndex())), blkIn, new MatrixBlock(), _op))).iterator(); } else { //RIGHT //for all matching right-hand-side blocks, returned as lazy iterator return IntStream.range(1, _pbc.getNumColumnBlocks()+1).mapToObj(j -> new Tuple2<>(new MatrixIndexes(ixIn.getRowIndex(), j), - OperationsOnMatrixValues.matMult(blkIn, _pbc.getBlock((int)ixIn.getColumnIndex(), j), + OperationsOnMatrixValues.matMult(blkIn, _pbc.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), j), new MatrixBlock(), _op))).iterator(); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java index 3019a78..da1b975 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java @@ -39,6 +39,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.utils.IntUtils; public class MatrixAppendMSPInstruction extends AppendMSPInstruction { @@ -138,11 +139,11 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction { //output shallow copy of rhs block if( _cbind ) { ret.add( new Tuple2<>(new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1), - _pm.getBlock((int)ix.getRowIndex(), 1)) ); + _pm.getBlock(IntUtils.toInt(ix.getRowIndex()), 1)) ); } else { //rbind ret.add( new Tuple2<>(new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()), - _pm.getBlock(1, (int)ix.getColumnIndex())) ); + _pm.getBlock(1, IntUtils.toInt(ix.getColumnIndex()))) ); } } //case 3: append operation on boundary block @@ -155,7 +156,7 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction { MatrixBlock value_in2 = null; if( _cbind ) { - value_in2 = _pm.getBlock((int)ix.getRowIndex(), 1); + value_in2 = _pm.getBlock(IntUtils.toInt(ix.getRowIndex()), 1); if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) { IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock()); second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1); @@ -163,7 +164,7 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction { } } else { //rbind - value_in2 = _pm.getBlock(1, (int)ix.getColumnIndex()); + value_in2 = _pm.getBlock(1, IntUtils.toInt(ix.getColumnIndex())); if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) { IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock()); second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex()); @@ -228,8 +229,8 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction { } //case 3: append operation on boundary block else { - int rowix = _cbind ? (int)ix.getRowIndex() : 1; - int colix = _cbind ? 1 : (int)ix.getColumnIndex(); + int rowix = _cbind ? IntUtils.toInt(ix.getRowIndex()) : 1; + int colix = _cbind ? 1 : IntUtils.toInt(ix.getColumnIndex()); MatrixBlock in2 = _pm.getBlock(rowix, colix); MatrixBlock out = in1.append(in2, new MatrixBlock(), _cbind); return new Tuple2<>(ix, out); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java index d30f330..2c4a01e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java @@ -58,6 +58,7 @@ import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; import org.apache.sysml.runtime.util.IndexRange; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; /** * This class implements the matrix indexing functionality inside CP. @@ -203,8 +204,8 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction { .mapToPair(new SliceBlock2(ixrange, mcOut)); //slice relevant blocks //collect output without shuffle to avoid side-effects with custom PartitionPruningRDD - MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, (int)mcOut.getRows(), - (int)mcOut.getCols(), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1); + MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, IntUtils.toInt(mcOut.getRows()), + IntUtils.toInt(mcOut.getCols()), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1); return mbout; } @@ -410,11 +411,11 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction { MatrixBlock right = arg._2(); int rl = UtilFunctions.computeCellInBlock(_ixrange.rowStart, _brlen); - int ru = (int)Math.min(_ixrange.rowEnd, rl+right.getNumRows())-1; + int ru = IntUtils.toInt(Math.min(_ixrange.rowEnd, rl+right.getNumRows())-1); int cl = UtilFunctions.computeCellInBlock(_ixrange.colStart, _brlen); - int cu = (int)Math.min(_ixrange.colEnd, cl+right.getNumColumns())-1; + int cu = IntUtils.toInt(Math.min(_ixrange.colEnd, cl+right.getNumColumns())-1); - MatrixBlock left = _binput.getBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex()); + MatrixBlock left = _binput.getBlock(IntUtils.toInt(ix.getRowIndex()), IntUtils.toInt(ix.getColumnIndex())); MatrixBlock tmp = left.leftIndexingOperations(right, rl, ru, cl, cu, new MatrixBlock(), UpdateType.COPY); @@ -474,10 +475,10 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction { //compute local index range long grix = UtilFunctions.computeCellIndex(ix.getRowIndex(), _brlen, 0); long gcix = UtilFunctions.computeCellIndex(ix.getColumnIndex(), _bclen, 0); - int lrl = (int)((_ixrange.rowStart<grix) ? 0 : _ixrange.rowStart - grix); - int lcl = (int)((_ixrange.colStart<gcix) ? 0 : _ixrange.colStart - gcix); - int lru = (int)Math.min(block.getNumRows()-1, _ixrange.rowEnd - grix); - int lcu = (int)Math.min(block.getNumColumns()-1, _ixrange.colEnd - gcix); + int lrl = IntUtils.toInt((_ixrange.rowStart<grix) ? 0 : _ixrange.rowStart - grix); + int lcl = IntUtils.toInt((_ixrange.colStart<gcix) ? 0 : _ixrange.colStart - gcix); + int lru = IntUtils.toInt(Math.min(block.getNumRows()-1, _ixrange.rowEnd - grix)); + int lcu = IntUtils.toInt(Math.min(block.getNumColumns()-1, _ixrange.colEnd - gcix)); //compute output index MatrixIndexes ixOut = new MatrixIndexes( http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java index cd5e2ba..3d66b20 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java @@ -64,6 +64,7 @@ import org.apache.sysml.runtime.transform.encode.EncoderRecode; import org.apache.sysml.runtime.transform.encode.EncoderMVImpute.MVMethod; import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.transform.meta.TfOffsetMap; +import org.apache.sysml.utils.IntUtils; import scala.Tuple2; @@ -115,7 +116,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI //step 1: build transform meta data Encoder encoderBuild = EncoderFactory.createEncoder(spec, colnames, - fo.getSchema(), (int)fo.getNumColumns(), null); + fo.getSchema(), IntUtils.toInt(fo.getNumColumns()), null); MaxLongAccumulator accMax = registerMaxLongAccumulator(sec.getSparkContext()); JavaRDD<String> rcMaps = in @@ -146,7 +147,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI //create encoder broadcast (avoiding replication per task) Encoder encoder = EncoderFactory.createEncoder(spec, colnames, - fo.getSchema(), (int)fo.getNumColumns(), meta); + fo.getSchema(), IntUtils.toInt(fo.getNumColumns()), meta); mcOut.setDimension(mcIn.getRows()-((omap!=null)?omap.getNumRmRows():0), encoder.getNumCols()); Broadcast<Encoder> bmeta = sec.getSparkContext().broadcast(encoder); Broadcast<TfOffsetMap> bomap = (omap!=null) ? sec.getSparkContext().broadcast(omap) : null; http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java index 1b6435b..4ddafcc 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java @@ -49,6 +49,7 @@ import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateOperator; import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.utils.IntUtils; /** * This pmapmm matrix multiplication instruction is still experimental @@ -105,8 +106,8 @@ public class PMapmmSPInstruction extends BinarySPInstruction { .filter(new IsBlockInRange(i+1, i+NUM_ROWBLOCKS*mc1.getRowsPerBlock(), 1, mc1.getCols(), mc1)) .mapToPair(new PMapMMRebaseBlocksFunction(i/mc1.getRowsPerBlock())); - int rlen = (int)Math.min(mc1.getRows()-i, NUM_ROWBLOCKS*mc1.getRowsPerBlock()); - PartitionedBlock<MatrixBlock> pmb = SparkExecutionContext.toPartitionedMatrixBlock(rdd, rlen, (int)mc1.getCols(), mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1L); + int rlen = IntUtils.toInt(Math.min(mc1.getRows()-i, NUM_ROWBLOCKS*mc1.getRowsPerBlock())); + PartitionedBlock<MatrixBlock> pmb = SparkExecutionContext.toPartitionedMatrixBlock(rdd, rlen, IntUtils.toInt(mc1.getCols()), mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1L); Broadcast<PartitionedBlock<MatrixBlock>> bpmb = sec.getSparkContext().broadcast(pmb); //matrix multiplication @@ -190,7 +191,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction { //get the right hand side matrix for( int i=1; i<=pm.getNumRowBlocks(); i++ ) { - MatrixBlock left = pm.getBlock(i, (int)ixIn.getRowIndex()); + MatrixBlock left = pm.getBlock(i, IntUtils.toInt(ixIn.getRowIndex())); //execute matrix-vector mult OperationsOnMatrixValues.matMult(new MatrixIndexes(i,ixIn.getRowIndex()), http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java index 4a1c710..bd0344a 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java @@ -83,6 +83,7 @@ import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.transform.meta.TfOffsetMap; import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction { protected HashMap<String, String> params; @@ -240,7 +241,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction MatrixCharacteristics mc1 = sec.getMatrixCharacteristics( targetVar ); MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); - int ngroups = (int) getLongParam(ec, Statement.GAGG_NUM_GROUPS); + int ngroups = IntUtils.toInt(getLongParam(ec, Statement.GAGG_NUM_GROUPS)); //single-block aggregation if( ngroups <= mc1.getRowsPerBlock() && mc1.getCols() <= mc1.getColsPerBlock() ) { @@ -401,12 +402,12 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //update output statistics (required for correctness) MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); - mcOut.set(rows?maxDim:mcIn.getRows(), rows?mcIn.getCols():maxDim, (int)brlen, (int)bclen, mcIn.getNonZeros()); + mcOut.set(rows?maxDim:mcIn.getRows(), rows?mcIn.getCols():maxDim, IntUtils.toInt(brlen), IntUtils.toInt(bclen), mcIn.getNonZeros()); } else //special case: empty output (ensure valid dims) { int n = emptyReturn ? 1 : 0; - MatrixBlock out = new MatrixBlock(rows?n:(int)mcIn.getRows(), rows?(int)mcIn.getCols():n, true); + MatrixBlock out = new MatrixBlock(rows?n:IntUtils.toInt(mcIn.getRows()), rows?IntUtils.toInt(mcIn.getCols()):n, true); sec.setMatrixOutput(output.getName(), out, getExtendedOpcode()); } } @@ -466,8 +467,8 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //repartition input vector for higher degree of parallelism //(avoid scenarios where few input partitions create huge outputs) MatrixCharacteristics mcTmp = new MatrixCharacteristics(dirRows?lmaxVal:mcIn.getRows(), - dirRows?mcIn.getRows():lmaxVal, (int)brlen, (int)bclen, mcIn.getRows()); - int numParts = (int)Math.min(SparkUtils.getNumPreferredPartitions(mcTmp, in), mcIn.getNumBlocks()); + dirRows?mcIn.getRows():lmaxVal, IntUtils.toInt(brlen), IntUtils.toInt(bclen), mcIn.getRows()); + int numParts = IntUtils.toInt(Math.min(SparkUtils.getNumPreferredPartitions(mcTmp, in), mcIn.getNumBlocks())); if( numParts > in.getNumPartitions()*2 ) in = in.repartition(numParts); @@ -482,7 +483,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //update output statistics (required for correctness) MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); - mcOut.set(dirRows?lmaxVal:mcIn.getRows(), dirRows?mcIn.getRows():lmaxVal, (int)brlen, (int)bclen, -1); + mcOut.set(dirRows?lmaxVal:mcIn.getRows(), dirRows?mcIn.getRows():lmaxVal, IntUtils.toInt(brlen), IntUtils.toInt(bclen), -1); } else if ( opcode.equalsIgnoreCase("transformapply") ) { @@ -505,7 +506,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //create encoder broadcast (avoiding replication per task) Encoder encoder = EncoderFactory.createEncoder(params.get("spec"), colnames, - fo.getSchema(), (int)fo.getNumColumns(), meta); + fo.getSchema(), IntUtils.toInt(fo.getNumColumns()), meta); mcOut.setDimension(mcIn.getRows()-((omap!=null)?omap.getNumRmRows():0), encoder.getNumCols()); Broadcast<Encoder> bmeta = sec.getSparkContext().broadcast(encoder); Broadcast<TfOffsetMap> bomap = (omap!=null) ? sec.getSparkContext().broadcast(omap) : null; @@ -532,7 +533,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //reblock if necessary (clen > bclen) if( mc.getCols() > mc.getNumColBlocks() ) { in = in.mapToPair(new RDDTransformDecodeExpandFunction( - (int)mc.getCols(), mc.getColsPerBlock())); + IntUtils.toInt(mc.getCols()), mc.getColsPerBlock())); in = RDDAggregateUtils.mergeByKey(in, false); } @@ -679,8 +680,8 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //prepare inputs (for internal api compatibility) IndexedMatrixValue data = SparkUtils.toIndexedMatrixBlock(arg0._1(),arg0._2()); IndexedMatrixValue offsets = _rmRows ? - SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getBlock((int)arg0._1().getRowIndex(), 1)) : - SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getBlock(1, (int)arg0._1().getColumnIndex())); + SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getBlock(IntUtils.toInt(arg0._1().getRowIndex()), 1)) : + SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getBlock(1, IntUtils.toInt(arg0._1().getColumnIndex()))); //execute remove empty operations ArrayList<IndexedMatrixValue> out = new ArrayList<>(); @@ -754,7 +755,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //get all inputs MatrixIndexes ix = arg0._1(); MatrixBlock target = arg0._2(); - MatrixBlock groups = _pbm.getBlock((int)ix.getRowIndex(), 1); + MatrixBlock groups = _pbm.getBlock(IntUtils.toInt(ix.getRowIndex()), 1); //execute map grouped aggregate operations IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(ix, target); @@ -790,7 +791,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //get all inputs MatrixIndexes ix = arg0._1(); MatrixBlock target = arg0._2(); - MatrixBlock groups = _pbm.getBlock((int)ix.getRowIndex(), 1); + MatrixBlock groups = _pbm.getBlock(IntUtils.toInt(ix.getRowIndex()), 1); //execute map grouped aggregate operations return groups.groupedAggOperations(target, null, new MatrixBlock(), _ngroups, _op); @@ -965,8 +966,8 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction MatrixBlock inBlk = in._2(); //construct expanded block via leftindexing - int cl = (int)UtilFunctions.computeCellIndex(inIx.getColumnIndex(), _bclen, 0)-1; - int cu = (int)UtilFunctions.computeCellIndex(inIx.getColumnIndex(), _bclen, inBlk.getNumColumns()-1)-1; + int cl = IntUtils.toInt(UtilFunctions.computeCellIndex(inIx.getColumnIndex(), _bclen, 0)-1); + int cu = IntUtils.toInt(UtilFunctions.computeCellIndex(inIx.getColumnIndex(), _bclen, inBlk.getNumColumns()-1)-1); MatrixBlock out = new MatrixBlock(inBlk.getNumRows(), _clen, false); out = out.leftIndexingOperations(inBlk, 0, inBlk.getNumRows()-1, cl, cu, null, UpdateType.INPLACE_PINNED); @@ -981,7 +982,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction } if ( params.get(Statement.GAGG_NUM_GROUPS) != null) { - int ngroups = (int) Double.parseDouble(params.get(Statement.GAGG_NUM_GROUPS)); + int ngroups = IntUtils.toInt( Double.parseDouble(params.get(Statement.GAGG_NUM_GROUPS))); mcOut.set(ngroups, mc1.getCols(), -1, -1); //grouped aggregate with cell output } else { http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java index 3914c55..e9928ad 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java @@ -47,6 +47,7 @@ import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateOperator; import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; public class PmmSPInstruction extends BinarySPInstruction { private CacheType _type = null; @@ -125,7 +126,7 @@ public class PmmSPInstruction extends BinarySPInstruction { MatrixBlock mb2 = arg0._2(); //get the right hand side matrix - MatrixBlock mb1 = _pmV.getBlock((int)ixIn.getRowIndex(), 1); + MatrixBlock mb1 = _pmV.getBlock(IntUtils.toInt(ixIn.getRowIndex()), 1); //compute target block indexes long minPos = UtilFunctions.toLong( mb1.minNonZero() ); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java index 18f0bef..196f270 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java @@ -47,6 +47,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; public class QuantilePickSPInstruction extends BinarySPInstruction { private OperationTypes _type = null; @@ -220,7 +221,7 @@ public class QuantilePickSPInstruction extends BinarySPInstruction { if( tmp.getNumRows() <= pos ) throw new DMLRuntimeException("Invalid key lookup for " + pos + " in block of size " + tmp.getNumRows()+"x"+tmp.getNumColumns()); - return val.get(0).quickGetValue((int)pos, 0); + return val.get(0).quickGetValue(IntUtils.toInt(pos), 0); } private static class FilterFunction implements Function<Tuple2<MatrixIndexes,MatrixBlock>, Boolean> @@ -337,7 +338,7 @@ public class QuantilePickSPInstruction extends BinarySPInstruction { return Collections.emptyIterator(); //determine which quantiles are active - int qlen = (int)Arrays.stream(_qPIDs).filter(i -> i==v1).count(); + int qlen = IntUtils.toInt(Arrays.stream(_qPIDs).filter(i -> i==v1).count()); int[] qix = new int[qlen]; for(int i=0, pos=0; i<_qPIDs.length; i++) if( _qPIDs[i]==v1 ) http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java index a5a5d94..d717a8e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java @@ -58,6 +58,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.matrix.operators.QuaternaryOperator; +import org.apache.sysml.utils.IntUtils; public class QuaternarySPInstruction extends ComputationSPInstruction { private CPOperand _input4 = null; @@ -374,8 +375,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction { MatrixIndexes ixIn = arg._1(); MatrixBlock blkIn = arg._2(); MatrixBlock blkOut = new MatrixBlock(); - MatrixBlock mbU = _pmU.getBlock((int)ixIn.getRowIndex(), 1); - MatrixBlock mbV = _pmV.getBlock((int)ixIn.getColumnIndex(), 1); + MatrixBlock mbU = _pmU.getBlock(IntUtils.toInt(ixIn.getRowIndex()), 1); + MatrixBlock mbV = _pmV.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), 1); //execute core operation blkIn.quaternaryOperations(_qop, mbU, mbV, null, blkOut); @@ -402,8 +403,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction { MatrixBlock blkIn1 = arg0._2()._1(); MatrixBlock blkIn2 = arg0._2()._2(); MatrixBlock blkOut = new MatrixBlock(); - MatrixBlock mbU = (_pmU!=null)?_pmU.getBlock((int)ixIn.getRowIndex(), 1) : blkIn2; - MatrixBlock mbV = (_pmV!=null)?_pmV.getBlock((int)ixIn.getColumnIndex(), 1) : blkIn2; + MatrixBlock mbU = (_pmU!=null)?_pmU.getBlock(IntUtils.toInt(ixIn.getRowIndex()), 1) : blkIn2; + MatrixBlock mbV = (_pmV!=null)?_pmV.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), 1) : blkIn2; MatrixBlock mbW = (_qop.hasFourInputs()) ? blkIn2 : null; //execute core operation @@ -431,8 +432,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction { MatrixBlock blkIn2 = arg0._2()._1()._2(); MatrixBlock blkIn3 = arg0._2()._2(); MatrixBlock blkOut = new MatrixBlock(); - MatrixBlock mbU = (_pmU!=null)?_pmU.getBlock((int)ixIn.getRowIndex(), 1) : blkIn2; - MatrixBlock mbV = (_pmV!=null)?_pmV.getBlock((int)ixIn.getColumnIndex(), 1) : + MatrixBlock mbU = (_pmU!=null)?_pmU.getBlock(IntUtils.toInt(ixIn.getRowIndex()), 1) : blkIn2; + MatrixBlock mbV = (_pmV!=null)?_pmV.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), 1) : (_pmU!=null)? blkIn2 : blkIn3; MatrixBlock mbW = (_qop.hasFourInputs())? blkIn3 : null; http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java index 2b9adcb..8386e2c 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java @@ -65,6 +65,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.RandomMatrixGenerator; import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.Statistics; public class RandSPInstruction extends UnarySPInstruction { @@ -258,7 +259,7 @@ public class RandSPInstruction extends UnarySPInstruction { && ConfigurationManager.getExecutionMode() != RUNTIME_PLATFORM.SPARK ) { RandomMatrixGenerator rgen = LibMatrixDatagen.createRandomMatrixGenerator( - pdf, (int)lrows, (int)lcols, rowsInBlock, colsInBlock, + pdf, IntUtils.toInt(lrows), IntUtils.toInt(lcols), rowsInBlock, colsInBlock, sparsity, minValue, maxValue, pdfParams); MatrixBlock mb = MatrixBlock.randOperations(rgen, lSeed); @@ -290,7 +291,7 @@ public class RandSPInstruction extends UnarySPInstruction { } //for load balancing: degree of parallelism such that ~128MB per partition - int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1); + int numPartitions = IntUtils.toInt( Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1)); //create seeds rdd seedsRDD = sec.getSparkContext().parallelizePairs(seeds, numPartitions); @@ -323,7 +324,7 @@ public class RandSPInstruction extends UnarySPInstruction { } //for load balancing: degree of parallelism such that ~128MB per partition - int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1); + int numPartitions = IntUtils.toInt( Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1)); //create seeds rdd seedsRDD = sec.getSparkContext() @@ -381,7 +382,7 @@ public class RandSPInstruction extends UnarySPInstruction { } //for load balancing: degree of parallelism such that ~128MB per partition - int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1); + int numPartitions = IntUtils.toInt( Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1)); //create offset rdd offsetsRDD = sec.getSparkContext().parallelize(offsets, numPartitions); @@ -408,7 +409,7 @@ public class RandSPInstruction extends UnarySPInstruction { } //for load balancing: degree of parallelism such that ~128MB per partition - int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1); + int numPartitions = IntUtils.toInt( Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1)); //create seeds rdd offsetsRDD = sec.getSparkContext() @@ -442,14 +443,14 @@ public class RandSPInstruction extends UnarySPInstruction { LOG.trace("Process RandSPInstruction sample with range="+ maxValue +", size="+ lrows +", replace="+ replace + ", seed=" + seed); // sampling rate that guarantees a sample of size >= sampleSizeLowerBound 99.99% of the time. - double fraction = SamplingUtils.computeFractionForSampleSize((int)lrows, UtilFunctions.toLong(maxValue), replace); + double fraction = SamplingUtils.computeFractionForSampleSize(IntUtils.toInt(lrows), UtilFunctions.toLong(maxValue), replace); Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(seed); // divide the population range across numPartitions by creating SampleTasks double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize(); long outputSize = MatrixBlock.estimateSizeDenseInMemory(lrows,1); - int numPartitions = (int) Math.ceil((double)outputSize/hdfsBlockSize); + int numPartitions = IntUtils.toInt( Math.ceil((double)outputSize/hdfsBlockSize)); long partitionSize = (long) Math.ceil(maxValue/numPartitions); ArrayList<SampleTask> offsets = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java index 3e09d17..d725883 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java @@ -58,6 +58,7 @@ import org.apache.sysml.runtime.matrix.operators.ReorgOperator; import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.IndexRange; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; public class ReorgSPInstruction extends UnarySPInstruction { // sort-specific attributes (to enable variable attributes) @@ -169,7 +170,7 @@ public class ReorgSPInstruction extends UnarySPInstruction { // extract column (if necessary) and sort if( !singleCol ) out = out.filter(new IsBlockInRange(1, mcIn.getRows(), cols[0], cols[0], mcIn)) - .mapValues(new ExtractColumn((int)UtilFunctions.computeCellInBlock(cols[0], mcIn.getColsPerBlock()))); + .mapValues(new ExtractColumn(IntUtils.toInt(UtilFunctions.computeCellInBlock(cols[0], mcIn.getColsPerBlock())))); //actual index/data sort operation if( ixret ) //sort indexes @@ -268,7 +269,7 @@ public class ReorgSPInstruction extends UnarySPInstruction { ret.add(new Tuple2<>(ixOut,blkOut)); // insert newly created empty blocks for entire row - int numBlocks = (int) Math.ceil((double)_mcIn.getRows()/_mcIn.getRowsPerBlock()); + int numBlocks = IntUtils.toInt( Math.ceil((double)_mcIn.getRows()/_mcIn.getRowsPerBlock())); for(int i = 1; i <= numBlocks; i++) { if(i != ixOut.getColumnIndex()) { int lrlen = UtilFunctions.computeBlockSize(_mcIn.getRows(), rix, _mcIn.getRowsPerBlock()); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java index 90e5396..6be3e99 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java @@ -48,6 +48,7 @@ import org.apache.sysml.runtime.matrix.data.TripleIndexes; import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateOperator; import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.utils.IntUtils; public class RmmSPInstruction extends BinarySPInstruction { @@ -114,7 +115,7 @@ public class RmmSPInstruction extends BinarySPInstruction { * ((long) Math.ceil((double)mc2.getCols()/mc2.getColsPerBlock())); double matrix2PSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc2) * ((long) Math.ceil((double)mc1.getRows()/mc1.getRowsPerBlock())); - return (int) Math.max(Math.ceil((matrix1PSize+matrix2PSize)/hdfsBlockSize), 1); + return IntUtils.toInt( Math.max(Math.ceil((matrix1PSize+matrix2PSize)/hdfsBlockSize), 1)); } private static class RmmReplicateFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes, MatrixBlock>, TripleIndexes, MatrixBlock> http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java index 8f63427..dd5c062 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java @@ -65,6 +65,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.AggregateOperator; +import org.apache.sysml.utils.IntUtils; import scala.Tuple2; @@ -144,7 +145,7 @@ public class SpoofSPInstruction extends SPInstruction { long numBlocks = (op.getCellType()==CellType.ROW_AGG ) ? mcIn.getNumRowBlocks() : mcIn.getNumColBlocks(); out = RDDAggregateUtils.aggByKeyStable(out, aggop, - (int)Math.min(out.getNumPartitions(), numBlocks), false); + IntUtils.toInt(Math.min(out.getNumPartitions(), numBlocks)), false); } sec.setRDDHandleForVariable(_out.getName(), out); @@ -183,7 +184,7 @@ public class SpoofSPInstruction extends SPInstruction { if(type == OutProdType.LEFT_OUTER_PRODUCT || type == OutProdType.RIGHT_OUTER_PRODUCT ) { long numBlocks = mcOut.getNumRowBlocks() * mcOut.getNumColBlocks(); out = RDDAggregateUtils.sumByKeyStable(out, - (int)Math.min(out.getNumPartitions(), numBlocks), false); + IntUtils.toInt(Math.min(out.getNumPartitions(), numBlocks)), false); } sec.setRDDHandleForVariable(_out.getName(), out); @@ -206,7 +207,7 @@ public class SpoofSPInstruction extends SPInstruction { long clen2 = op.getRowType().isConstDim2(op.getConstDim2()) ? op.getConstDim2() : op.getRowType().isRowTypeB1() ? sec.getMatrixCharacteristics(_in[1].getName()).getCols() : -1; RowwiseFunction fmmc = new RowwiseFunction(_class.getName(), _classBytes, bcVect2, - bcMatrices, scalars, mcIn.getRowsPerBlock(), (int)mcIn.getCols(), (int)clen2); + bcMatrices, scalars, mcIn.getRowsPerBlock(), IntUtils.toInt(mcIn.getCols()), IntUtils.toInt(clen2)); out = in.mapPartitionsToPair(fmmc, op.getRowType()==RowType.ROW_AGG || op.getRowType() == RowType.NO_AGG); @@ -222,7 +223,7 @@ public class SpoofSPInstruction extends SPInstruction { { if( op.getRowType()==RowType.ROW_AGG && mcIn.getCols() > mcIn.getColsPerBlock() ) { out = RDDAggregateUtils.sumByKeyStable(out, - (int)Math.min(out.getNumPartitions(), mcIn.getNumRowBlocks()), false); + IntUtils.toInt(Math.min(out.getNumPartitions(), mcIn.getNumRowBlocks())), false); } sec.setRDDHandleForVariable(_out.getName(), out); @@ -263,8 +264,8 @@ public class SpoofSPInstruction extends SPInstruction { } private static boolean[] getMatrixBroadcastVector(SparkExecutionContext sec, CPOperand[] inputs, boolean[] bcVect) { - int numMtx = (int) Arrays.stream(inputs) - .filter(in -> in.getDataType().isMatrix()).count(); + int numMtx = IntUtils.toInt(Arrays.stream(inputs) + .filter(in -> in.getDataType().isMatrix()).count()); boolean[] ret = new boolean[numMtx]; for(int i=0, pos=0; i<inputs.length; i++) if( inputs[i].getDataType().isMatrix() ) @@ -380,9 +381,9 @@ public class SpoofSPInstruction extends SPInstruction { for( int i=0, posRdd=0, posBc=0; i<_bcInd.length; i++ ) { if( _bcInd[i] ) { PartitionedBroadcast<MatrixBlock> pb = _inputs.get(posBc++); - int rowIndex = (int) ((outer && i==2) ? ixIn.getColumnIndex() : + int rowIndex = IntUtils.toInt((outer && i==2) ? ixIn.getColumnIndex() : (pb.getNumRowBlocks()>=ixIn.getRowIndex())?ixIn.getRowIndex():1); - int colIndex = (int) ((outer && i==2) ? 1 : + int colIndex = IntUtils.toInt((outer && i==2) ? 1 : (pb.getNumColumnBlocks()>=ixIn.getColumnIndex())?ixIn.getColumnIndex():1); ret.add(pb.getBlock(rowIndex, colIndex)); } http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java index 5bb686b..16ed0dd 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java @@ -50,6 +50,7 @@ import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateOperator; import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.utils.IntUtils; import scala.Tuple2; @@ -88,17 +89,17 @@ public class Tsmm2SPInstruction extends UnarySPInstruction { _type.isLeft() ? mc.getColsPerBlock()+1 : 1, mc.getCols(), mc)) .mapToPair(new ShiftTSMMIndexesFunction(_type)); PartitionedBlock<MatrixBlock> pmb = SparkExecutionContext.toPartitionedMatrixBlock(tmp1, - (int)(_type.isLeft() ? mc.getRows() : mc.getRows() - mc.getRowsPerBlock()), - (int)(_type.isLeft() ? mc.getCols()-mc.getColsPerBlock() : mc.getCols()), + IntUtils.toInt(_type.isLeft() ? mc.getRows() : mc.getRows() - mc.getRowsPerBlock()), + IntUtils.toInt(_type.isLeft() ? mc.getCols()-mc.getColsPerBlock() : mc.getCols()), mc.getRowsPerBlock(), mc.getColsPerBlock(), -1L); Broadcast<PartitionedBlock<MatrixBlock>> bpmb = sec.getSparkContext().broadcast(pmb); //step 2: second pass of X, compute tsmm/mapmm and aggregate result blocks - int outputDim = (int) (_type.isLeft() ? mc.getCols() : mc.getRows()); + int outputDim = IntUtils.toInt(_type.isLeft() ? mc.getCols() : mc.getRows()); if( OptimizerUtils.estimateSize(outputDim, outputDim) <= 32*1024*1024 ) { //default: <=32MB //output large blocks and reduceAll to avoid skew on combineByKey JavaRDD<MatrixBlock> tmp2 = in.map( - new RDDTSMM2ExtFunction(bpmb, _type, outputDim, (int)mc.getRowsPerBlock())); + new RDDTSMM2ExtFunction(bpmb, _type, outputDim, IntUtils.toInt(mc.getRowsPerBlock()))); MatrixBlock out = RDDAggregateUtils.sumStable(tmp2); //put output block into symbol table (no lineage because single block) @@ -150,8 +151,8 @@ public class Tsmm2SPInstruction extends UnarySPInstruction { if( _type.isLeft() ? ixin.getColumnIndex() == 1 : ixin.getRowIndex() == 1 ) { //execute block mapmm operation for full block only (output two blocks, due to symmetry) MatrixBlock mbin2 = _pb.getValue().getBlock( //lookup broadcast block - (int)(_type.isLeft()?ixin.getRowIndex():1), - (int)(_type.isLeft()?1:ixin.getColumnIndex())); + IntUtils.toInt(_type.isLeft()?ixin.getRowIndex():1), + IntUtils.toInt(_type.isLeft()?1:ixin.getColumnIndex())); MatrixBlock mbin2t = transpose(mbin2, new MatrixBlock()); //prep for transpose rewrite mm MatrixBlock out2 = (MatrixBlock) OperationsOnMatrixValues.matMult( //mm @@ -205,25 +206,25 @@ public class Tsmm2SPInstruction extends UnarySPInstruction { //execute block tsmm operation MatrixBlock out1 = mbin.transposeSelfMatrixMultOperations(new MatrixBlock(), _type); - int ix = (int) ((_type.isLeft() ? ixin.getColumnIndex() : ixin.getRowIndex())-1) * _blen; + int ix = IntUtils.toInt((_type.isLeft() ? ixin.getColumnIndex() : ixin.getRowIndex())-1) * _blen; out.copy(ix, ix+out1.getNumRows()-1, ix, ix+out1.getNumColumns()-1, out1, true); if( fullBlock ) { //execute block mapmm operation for full block only (output two blocks, due to symmetry) MatrixBlock mbin2 = _pb.getValue().getBlock( //lookup broadcast block - (int)(_type.isLeft()?ixin.getRowIndex():1), - (int)(_type.isLeft()?1:ixin.getColumnIndex())); + IntUtils.toInt(_type.isLeft()?ixin.getRowIndex():1), + IntUtils.toInt(_type.isLeft()?1:ixin.getColumnIndex())); MatrixBlock mbin2t = transpose(mbin2, new MatrixBlock()); //prep for transpose rewrite mm MatrixBlock out2 = OperationsOnMatrixValues.matMult( //mm _type.isLeft() ? mbin2t : mbin, _type.isLeft() ? mbin : mbin2t, new MatrixBlock(), _op); MatrixIndexes ixout2 = _type.isLeft() ? new MatrixIndexes(2,1) : new MatrixIndexes(1,2); - out.copy((int)(ixout2.getRowIndex()-1)*_blen, (int)(ixout2.getRowIndex()-1)*_blen+out2.getNumRows()-1, - (int)(ixout2.getColumnIndex()-1)*_blen, (int)(ixout2.getColumnIndex()-1)*_blen+out2.getNumColumns()-1, out2, true); + out.copy(IntUtils.toInt(ixout2.getRowIndex()-1)*_blen, IntUtils.toInt(ixout2.getRowIndex()-1)*_blen+out2.getNumRows()-1, + IntUtils.toInt(ixout2.getColumnIndex()-1)*_blen, IntUtils.toInt(ixout2.getColumnIndex()-1)*_blen+out2.getNumColumns()-1, out2, true); MatrixBlock out3 = transpose(out2, new MatrixBlock()); - out.copy((int)(ixout2.getColumnIndex()-1)*_blen, (int)(ixout2.getColumnIndex()-1)*_blen+out3.getNumRows()-1, - (int)(ixout2.getRowIndex()-1)*_blen, (int)(ixout2.getRowIndex()-1)*_blen+out3.getNumColumns()-1, out3, true); + out.copy(IntUtils.toInt(ixout2.getColumnIndex()-1)*_blen, IntUtils.toInt(ixout2.getColumnIndex()-1)*_blen+out3.getNumRows()-1, + IntUtils.toInt(ixout2.getRowIndex()-1)*_blen, IntUtils.toInt(ixout2.getRowIndex()-1)*_blen+out3.getNumColumns()-1, out3, true); } return out; http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BlockPartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BlockPartitioner.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BlockPartitioner.java index 677fbbf..4428e2d 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BlockPartitioner.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BlockPartitioner.java @@ -25,6 +25,7 @@ import org.apache.spark.Partitioner; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; +import org.apache.sysml.utils.IntUtils; /** * Default partitioner used for all binary block rdd operations in order @@ -75,7 +76,7 @@ public class BlockPartitioner extends Partitioner } //compute meta data for runtime - _ncparts = (int)Math.ceil((double)ncblks/_cbPerPart); + _ncparts = IntUtils.toInt(Math.ceil((double)ncblks/_cbPerPart)); _numParts = numParts; } @@ -90,8 +91,8 @@ public class BlockPartitioner extends Partitioner //get partition id MatrixIndexes ix = (MatrixIndexes) arg0; - int ixr = (int)((ix.getRowIndex()-1)/_rbPerPart); - int ixc = (int)((ix.getColumnIndex()-1)/_cbPerPart); + int ixr = IntUtils.toInt((ix.getRowIndex()-1)/_rbPerPart); + int ixc = IntUtils.toInt((ix.getColumnIndex()-1)/_cbPerPart); int id = ixr * _ncparts + ixc; //ensure valid range http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java index 8a4999b..62501e7 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java @@ -34,6 +34,7 @@ import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; import org.apache.sysml.runtime.controlprogram.caching.CacheBlockFactory; import org.apache.sysml.runtime.util.FastBufferedDataInputStream; import org.apache.sysml.runtime.util.FastBufferedDataOutputStream; +import org.apache.sysml.utils.IntUtils; /** * This class is for partitioned matrix/frame blocks, to be used as broadcasts. @@ -132,11 +133,11 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable } public int getNumRowBlocks() { - return (int)Math.ceil((double)_rlen/_brlen); + return IntUtils.toInt(Math.ceil((double)_rlen/_brlen)); } public int getNumColumnBlocks() { - return (int)Math.ceil((double)_clen/_bclen); + return IntUtils.toInt(Math.ceil((double)_clen/_bclen)); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java index ab29c4c..6eec144 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java @@ -30,6 +30,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.util.IndexRange; +import org.apache.sysml.utils.IntUtils; /** * This class is a wrapper around an array of broadcasts of partitioned matrix/frame blocks, @@ -71,11 +72,11 @@ public class PartitionedBroadcast<T extends CacheBlock> implements Serializable } public int getNumRowBlocks() { - return (int)_mc.getNumRowBlocks(); + return IntUtils.toInt(_mc.getNumRowBlocks()); } public int getNumColumnBlocks() { - return (int)_mc.getNumColBlocks(); + return IntUtils.toInt(_mc.getNumColBlocks()); } public static int computeBlocksPerPartition(long rlen, long clen, long brlen, long bclen) { http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java index 0ce654b..a2414bc 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java @@ -31,6 +31,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; public class ExtractBlockForBinaryReblock implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock> { @@ -96,8 +97,8 @@ public class ExtractBlockForBinaryReblock implements PairFlatMapFunction<Tuple2< blk.setNonZeros(in.getNonZeros()); } else { //general case - for(int i2 = 0; i2 <= (int)(rowUpper-rowLower); i2++) - for(int j2 = 0; j2 <= (int)(colUpper-colLower); j2++) + for(int i2 = 0; i2 <= IntUtils.toInt(rowUpper-rowLower); i2++) + for(int j2 = 0; j2 <= IntUtils.toInt(colUpper-colLower); j2++) blk.appendValue(cixi+i2, cixj+j2, in.quickGetValue(aixi+i2, aixj+j2)); } retVal.add(new Tuple2<>(indx, blk)); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java index 3e6908b..b05c0bf 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java @@ -35,6 +35,7 @@ import org.apache.sysml.runtime.matrix.data.WeightedCell; import org.apache.sysml.runtime.matrix.operators.AggregateOperator; import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; public abstract class ExtractGroup implements Serializable { @@ -65,7 +66,7 @@ public abstract class ExtractGroup implements Serializable if(_op instanceof AggregateOperator && _ngroups > 0 && OptimizerUtils.isValidCPDimensions(_ngroups, target.getNumColumns()) ) { - MatrixBlock tmp = group.groupedAggOperations(target, null, new MatrixBlock(), (int)_ngroups, _op); + MatrixBlock tmp = group.groupedAggOperations(target, null, new MatrixBlock(), IntUtils.toInt(_ngroups), _op); for(int i=0; i<tmp.getNumRows(); i++) { for( int j=0; j<tmp.getNumColumns(); j++ ) { @@ -139,7 +140,7 @@ public abstract class ExtractGroup implements Serializable throws Exception { MatrixIndexes ix = arg._1; - MatrixBlock group = _pbm.getBlock((int)ix.getRowIndex(), 1); + MatrixBlock group = _pbm.getBlock(IntUtils.toInt(ix.getRowIndex()), 1); MatrixBlock target = arg._2; return execute(ix, group, target).iterator(); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java index afd2e1a..84d161a 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java @@ -31,6 +31,7 @@ import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.BinaryOperator; +import org.apache.sysml.utils.IntUtils; public class MatrixVectorBinaryOpPartitionFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixBlock>>, MatrixIndexes,MatrixBlock> { @@ -74,8 +75,8 @@ public class MatrixVectorBinaryOpPartitionFunction implements PairFlatMapFunctio MatrixBlock in1 = arg._2(); //get the rhs block - int rix= (int)((_vtype==VectorType.COL_VECTOR) ? ix.getRowIndex() : 1); - int cix= (int)((_vtype==VectorType.COL_VECTOR) ? 1 : ix.getColumnIndex()); + int rix= IntUtils.toInt((_vtype==VectorType.COL_VECTOR) ? ix.getRowIndex() : 1); + int cix= IntUtils.toInt((_vtype==VectorType.COL_VECTOR) ? 1 : ix.getColumnIndex()); MatrixBlock in2 = _pmV.getBlock(rix, cix); //execute the binary operation http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index c0fc34a..69be559 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -67,6 +67,7 @@ import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.FastStringTokenizer; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; import scala.Tuple2; @@ -101,7 +102,7 @@ public class FrameRDDConverterUtils //prepare default schema if needed if( schema == null || schema.length==1 ) - schema = UtilFunctions.nCopies((int)mc.getCols(), ValueType.STRING); + schema = UtilFunctions.nCopies(IntUtils.toInt(mc.getCols()), ValueType.STRING); //convert csv rdd to binary block rdd (w/ partial blocks) JavaPairRDD<Long, FrameBlock> out = prepinput.mapPartitionsToPair( @@ -153,7 +154,7 @@ public class FrameRDDConverterUtils JavaPairRDD<Long, Text> input, MatrixCharacteristics mc, ValueType[] schema ) { //prepare default schema if needed if( schema == null || schema.length==1 ) { - schema = UtilFunctions.nCopies((int)mc.getCols(), + schema = UtilFunctions.nCopies(IntUtils.toInt(mc.getCols()), (schema!=null) ? schema[0] : ValueType.STRING); } @@ -189,7 +190,7 @@ public class FrameRDDConverterUtils if(mcIn.getCols() > mcIn.getColsPerBlock()) { //split matrix blocks into extended matrix blocks in = in.flatMapToPair(new MatrixFrameReblockFunction(mcIn)); - mc.setBlockSize(MatrixFrameReblockFunction.computeBlockSize(mc), (int)mc.getCols()); + mc.setBlockSize(MatrixFrameReblockFunction.computeBlockSize(mc), IntUtils.toInt(mc.getCols())); //shuffle matrix blocks (instead of frame blocks) in order to exploit //sparse formats (for sparse or wide matrices) during shuffle @@ -238,8 +239,8 @@ public class FrameRDDConverterUtils df.javaRDD().zipWithIndex(); //zip row index //convert data frame to frame schema (prepare once) - String[] colnames = new String[(int)mc.getCols()]; - ValueType[] fschema = new ValueType[(int)mc.getCols()]; + String[] colnames = new String[IntUtils.toInt(mc.getCols())]; + ValueType[] fschema = new ValueType[IntUtils.toInt(mc.getCols())]; int colVect = convertDFSchemaToFrameSchema(df.schema(), colnames, fschema, containsID); out.set(colnames, fschema); //make schema available @@ -260,7 +261,7 @@ public class FrameRDDConverterUtils //create data frame schema if( schema == null ) - schema = UtilFunctions.nCopies((int)mc.getCols(), ValueType.STRING); + schema = UtilFunctions.nCopies(IntUtils.toInt(mc.getCols()), ValueType.STRING); StructType dfSchema = convertFrameSchemaToDFSchema(schema, true); //rdd to data frame conversion @@ -552,7 +553,7 @@ public class FrameRDDConverterUtils _schema = schema; _hasHeader = hasHeader; _delim = delim; - _maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1); + _maxRowsPerBlock = Math.max(IntUtils.toInt(FrameBlock.BUFFER_SIZE/_clen), 1); } @Override @@ -563,7 +564,7 @@ public class FrameRDDConverterUtils long ix = -1; FrameBlock fb = null; - String[] tmprow = new String[(int)_clen]; + String[] tmprow = new String[IntUtils.toInt(_clen)]; while( arg0.hasNext() ) { @@ -575,11 +576,11 @@ public class FrameRDDConverterUtils continue; } if( row.startsWith(TfUtils.TXMTD_MVPREFIX) ) { - _mvMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1)); + _mvMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, IntUtils.toInt(_clen+1))); continue; } else if( row.startsWith(TfUtils.TXMTD_NDPREFIX) ) { - _ndMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1)); + _ndMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, IntUtils.toInt(_clen+1))); continue; } @@ -716,7 +717,7 @@ public class FrameRDDConverterUtils _schema = schema; _containsID = containsID; _colVect = colVect; - _maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1); + _maxRowsPerBlock = Math.max(IntUtils.toInt(FrameBlock.BUFFER_SIZE/_clen), 1); } @Override @@ -727,7 +728,7 @@ public class FrameRDDConverterUtils long ix = -1; FrameBlock fb = null; - Object[] tmprow = new Object[(int)_clen]; + Object[] tmprow = new Object[IntUtils.toInt(_clen)]; while( arg0.hasNext() ) { @@ -815,7 +816,7 @@ public class FrameRDDConverterUtils _clen = mc.getCols(); //determine upper bounded buffer len - _bufflen = (int) Math.min(_rlen*_clen, FrameBlock.BUFFER_SIZE); + _bufflen = IntUtils.toInt(Math.min(_rlen*_clen, FrameBlock.BUFFER_SIZE)); } protected void flushBufferToList( FrameReblockBuffer rbuff, ArrayList<Tuple2<Long,FrameBlock>> ret ) @@ -857,7 +858,7 @@ public class FrameRDDConverterUtils st.reset( strVal ); long row = st.nextLong(); long col = st.nextLong(); - Object val = UtilFunctions.stringToObject(_schema[(int)col-1], st.nextToken()); + Object val = UtilFunctions.stringToObject(_schema[IntUtils.toInt(col-1)], st.nextToken()); //flush buffer if necessary if( rbuff.getSize() >= rbuff.getCapacity() ) @@ -908,7 +909,7 @@ public class FrameRDDConverterUtils long rowix = (ix.getRowIndex()-1)*_brlen+1; //global index within frame block (0-based) - long cl = (int)((ix.getColumnIndex()-1)*_bclen); + long cl = IntUtils.toInt((ix.getColumnIndex()-1)*_bclen); long cu = Math.min(cl+mb.getNumColumns()-1, _clen); //prepare output frame blocks @@ -916,8 +917,8 @@ public class FrameRDDConverterUtils int ru = Math.min(i+_maxRowsPerBlock, mb.getNumRows())-1; long rix = UtilFunctions.computeBlockIndex(rowix+i, _maxRowsPerBlock); MatrixIndexes ixout = new MatrixIndexes(rix, 1); - MatrixBlock out = new MatrixBlock(ru-i+1, (int)_clen, sparse); - out.copy(0, out.getNumRows()-1, (int)cl, (int)cu, + MatrixBlock out = new MatrixBlock(ru-i+1, IntUtils.toInt(_clen), sparse); + out.copy(0, out.getNumRows()-1, IntUtils.toInt(cl), IntUtils.toInt(cu), mb.slice(i, ru, 0, mb.getNumColumns()-1, mbreuse), true); out.examSparsity(); ret.add(new Tuple2<>(ixout,out)); @@ -935,8 +936,8 @@ public class FrameRDDConverterUtils */ public static int computeBlockSize(MatrixCharacteristics mc) { int brlen = mc.getRowsPerBlock(); - int basic = Math.max((int)(FrameBlock.BUFFER_SIZE/mc.getCols()), 1); - int div = (int)Math.ceil((double)brlen/basic); + int basic = Math.max(IntUtils.toInt(FrameBlock.BUFFER_SIZE/mc.getCols()), 1); + int div = IntUtils.toInt(Math.ceil((double)brlen/basic)); while( brlen % div != 0 ) div++; return brlen / div; @@ -995,8 +996,8 @@ public class FrameRDDConverterUtils for( long rix=rstartix; rix<=rendix; rix++ ) { //for all row blocks long rpos = UtilFunctions.computeCellIndex(rix, brlen, 0); int lrlen = UtilFunctions.computeBlockSize(rlen, rix, brlen); - int fix = (int)((rpos-rowIndex>=0) ? rpos-rowIndex : 0); - int fix2 = (int)Math.min(rpos+lrlen-rowIndex-1,blk.getNumRows()-1); + int fix = IntUtils.toInt((rpos-rowIndex>=0) ? rpos-rowIndex : 0); + int fix2 = IntUtils.toInt(Math.min(rpos+lrlen-rowIndex-1,blk.getNumRows()-1)); int mix = UtilFunctions.computeCellInBlock(rowIndex+fix, brlen); int mix2 = mix + (fix2-fix); for( long cix=1; cix<=cendix; cix++ ) { //for all column blocks @@ -1004,7 +1005,7 @@ public class FrameRDDConverterUtils int lclen = UtilFunctions.computeBlockSize(clen, cix, bclen); MatrixBlock matrix = new MatrixBlock(lrlen, lclen, false); FrameBlock frame = blk.slice(fix, fix2, - (int)cpos-1, (int)cpos+lclen-2, new FrameBlock()); + IntUtils.toInt(cpos-1), IntUtils.toInt(cpos+lclen-2), new FrameBlock()); MatrixBlock mframe = DataConverter.convertToMatrixBlock(frame); ret.add(new Tuple2<>(new MatrixIndexes(rix, cix), matrix.leftIndexingOperations(mframe, mix, mix2, 0, lclen-1,