http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java
index f25899f..2b6575a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java
@@ -40,6 +40,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.CMOperator;
 import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
+import org.apache.sysml.utils.IntUtils;
 
 public class CentralMomentSPInstruction extends UnarySPInstruction {
 
@@ -104,7 +105,7 @@ public class CentralMomentSPInstruction extends UnarySPInstruction {
                ScalarObject order = ec.getScalarInput(scalarInput.getName(), scalarInput.getValueType(), scalarInput.isLiteral()); 
                CMOperator cop = ((CMOperator)_optr); 
                if ( cop.getAggOpType() == AggregateOperationTypes.INVALID ) {
-                       cop.setCMAggOp((int)order.getLongValue());
+                       cop.setCMAggOp(IntUtils.toInt(order.getLongValue()));
                }
                
                //get input
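
Note: the recurring change throughout this commit is replacing raw (int) narrowing casts with IntUtils.toInt(...). The new org.apache.sysml.utils.IntUtils class itself is not included in this excerpt; as a rough, hypothetical sketch only (not the actual implementation), such a helper would perform a checked long-to-int conversion that fails fast instead of silently truncating:

    package org.apache.sysml.utils;

    public class IntUtils {
        // Checked narrowing: throws instead of silently wrapping around
        // when the value does not fit into a 32-bit int.
        public static int toInt(long value) {
            if (value > Integer.MAX_VALUE || value < Integer.MIN_VALUE)
                throw new RuntimeException("Value " + value + " does not fit into an int.");
            return (int) value;
        }

        // Assumed overload, since some call sites below pass double
        // expressions (e.g. Double.parseDouble(...)) into toInt.
        public static int toInt(double value) {
            return toInt((long) value);
        }
    }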

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
index 308e60f..d83a6ed 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
@@ -49,6 +49,7 @@ import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+import org.apache.sysml.utils.IntUtils;
 
 /**
  * Cpmm: cross-product matrix multiplication operation (distributed matrix multiply
@@ -165,9 +166,9 @@ public class CpmmSPInstruction extends BinarySPInstruction {
        }
        
        private static int getMaxParJoin(MatrixCharacteristics mc1, MatrixCharacteristics mc2) {
-               return mc1.colsKnown() ? (int)mc1.getNumColBlocks() :
-                       mc2.rowsKnown() ? (int)mc2.getNumRowBlocks() :
-                       Integer.MAX_VALUE;
+               return IntUtils.toInt(mc1.colsKnown() ? mc1.getNumColBlocks() :
+                       mc2.rowsKnown() ? mc2.getNumRowBlocks() :
+                       Integer.MAX_VALUE);
        }
 
        private static class CpmmIndexFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, Long, IndexedMatrixValue>

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
index 68cc6db..a0dfb85 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
@@ -39,6 +39,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysml.utils.IntUtils;
 
 public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstruction {
 
@@ -76,7 +77,7 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio
                //merge partial aggregates, adjusting for correct number of partitions
                //as size can significant shrink (1K) but also grow (sparse-dense)
                int numParts = SparkUtils.getNumPreferredPartitions(mcOut);
-               int minPar = (int)Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks());
+               int minPar = IntUtils.toInt(Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks()));
                out = RDDAggregateUtils.mergeByKey(out, Math.max(numParts, minPar), false);
                
                //put output handle in symbol table
@@ -133,9 +134,9 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio
                        //cumsum expand partial aggregates
                        long rlenOut = (long)Math.ceil((double)_rlen/_brlen);
                        long rixOut = (long)Math.ceil((double)ixIn.getRowIndex()/_brlen);
-                       int rlenBlk = (int) Math.min(rlenOut-(rixOut-1)*_brlen, _brlen);
+                       int rlenBlk = IntUtils.toInt( Math.min(rlenOut-(rixOut-1)*_brlen, _brlen));
                        int clenBlk = blkOut.getNumColumns();
-                       int posBlk = (int) ((ixIn.getRowIndex()-1) % _brlen);
+                       int posBlk = IntUtils.toInt((ixIn.getRowIndex()-1) % _brlen);
                        MatrixBlock blkOut2 = new MatrixBlock(rlenBlk, clenBlk, false);
                        blkOut2.copy(posBlk, posBlk, 0, clenBlk-1, blkOut, true);
                        ixOut.setIndexes(rixOut, ixOut.getColumnIndex());

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
index c32a57b..952a6d0 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
@@ -45,6 +45,7 @@ import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
 import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.IntUtils;
 
 public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
        private BinaryOperator _bop = null;
@@ -209,7 +210,7 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
                        
                        //lookup offset row and return joined output
                        MatrixBlock off = (ixIn.getRowIndex() == 1) ? new MatrixBlock(1, blkIn.getNumColumns(), _initValue) :
-                               _pbc.getBlock((int)brix, (int)ixIn.getColumnIndex()).slice(rix, rix);
+                               _pbc.getBlock(IntUtils.toInt(brix), IntUtils.toInt(ixIn.getColumnIndex())).slice(rix, rix);
                        return new Tuple2<MatrixIndexes, Tuple2<MatrixBlock,MatrixBlock>>(ixIn, new Tuple2<>(blkIn,off));
                }
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/DnnSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/DnnSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/DnnSPInstruction.java
index fbb214e..e734fb9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/DnnSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/DnnSPInstruction.java
@@ -47,6 +47,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 import org.apache.sysml.runtime.util.DnnUtils;
+import org.apache.sysml.utils.IntUtils;
 import org.apache.sysml.utils.NativeHelper;
 
 import scala.Tuple2;
@@ -214,7 +215,7 @@ public class DnnSPInstruction extends UnarySPInstruction {
                MatrixCharacteristics mcRdd = sec.getMatrixCharacteristics(name);
                if(mcRdd.getColsPerBlock() < mcRdd.getCols() || mcRdd.getRowsPerBlock() != 1) {
                        MatrixCharacteristics mcOut = new MatrixCharacteristics(mcRdd);
-                       mcOut.setColsPerBlock((int)mcRdd.getCols());
+                       mcOut.setColsPerBlock(IntUtils.toInt(mcRdd.getCols()));
                        mcOut.setRowsPerBlock(numRowsPerBlock); 
                        in1 = RDDAggregateUtils.mergeByKey(in1.flatMapToPair(new ExtractBlockForBinaryReblock(mcRdd, mcOut)));
                        // TODO: Inject checkpoint to avoid doing this repeated for validation set
@@ -266,8 +267,8 @@ public class DnnSPInstruction extends UnarySPInstruction {
                        int K = getScalarInput(ec, _filter_shape, 0);
                        int R = getScalarInput(ec, _filter_shape, 2);
                        int S = getScalarInput(ec, _filter_shape, 3);
-                       int P = (int) DnnUtils.getP(H, R, stride_h, pad_h);
-                       int Q = (int) DnnUtils.getQ(W, S, stride_w, pad_w);
+                       int P = IntUtils.toInt(DnnUtils.getP(H, R, stride_h, pad_h));
+                       int Q = IntUtils.toInt(DnnUtils.getQ(W, S, stride_w, pad_w));
 
                        DnnParameters params = new DnnParameters(numRowsPerBlock, C, H, W, K, R, S, stride_h, stride_w, pad_h, pad_w, 1);
                        boolean enableNativeBLAS = NativeHelper.isNativeLibraryLoaded(); 
@@ -286,7 +287,7 @@ public class DnnSPInstruction extends UnarySPInstruction {
                                throw new DMLRuntimeException("The current operator doesnot support large outputs.");
                        }
                        sec.setMetaData(output.getName(), 
-                                       new MetaDataFormat(new MatrixCharacteristics(mcRdd.getRows(), numCols, numRowsPerBlock, (int)numCols, nnz), OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo));
+                                       new MetaDataFormat(new MatrixCharacteristics(mcRdd.getRows(), numCols, numRowsPerBlock, IntUtils.toInt(numCols), nnz), OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo));
                }
                else {
                        throw new DMLRuntimeException("Not implemented: " + instOpcode);
@@ -294,8 +295,8 @@ public class DnnSPInstruction extends UnarySPInstruction {
        }
 
        private static int getScalarInput(ExecutionContext ec, ArrayList<CPOperand> aL, int index) {
-               return (int) ec.getScalarInput(aL.get(index).getName(),
-                       aL.get(index).getValueType(), aL.get(index).isLiteral()).getLongValue();
+               return IntUtils.toInt( ec.getScalarInput(aL.get(index).getName(),
+                       aL.get(index).getValueType(), aL.get(index).isLiteral()).getLongValue());
        }
        
        private static class RDDConv2dMapMMFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes, MatrixBlock>>, MatrixIndexes, MatrixBlock> {

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
index f74f293..64567d1 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
@@ -47,6 +47,7 @@ import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.util.IndexRange;
 import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.IntUtils;
 
 /**
  * This class implements the frame indexing functionality inside Spark.
@@ -103,7 +104,7 @@ public class FrameIndexingSPInstruction extends IndexingSPInstruction {
                        
                        //update schema of output with subset of input schema
                        sec.getFrameObject(output.getName()).setSchema(
-                               sec.getFrameObject(input1.getName()).getSchema((int)cl, (int)cu));
+                               sec.getFrameObject(input1.getName()).getSchema(IntUtils.toInt(cl), IntUtils.toInt(cu)));
                }
                //left indexing
                else if ( opcode.equalsIgnoreCase(LeftIndex.OPCODE) || opcode.equalsIgnoreCase("mapLeftIndex"))
@@ -186,8 +187,8 @@ public class FrameIndexingSPInstruction extends IndexingSPInstruction {
                        _ixrange = ixrange;
                        _rlen = mcLeft.getRows();
                        _clen = mcLeft.getCols();
-                       _brlen = (int) Math.min(OptimizerUtils.getDefaultFrameSize(), _rlen);
-                       _bclen = (int) mcLeft.getCols();
+                       _brlen = IntUtils.toInt( Math.min(OptimizerUtils.getDefaultFrameSize(), _rlen));
+                       _bclen = IntUtils.toInt(mcLeft.getCols());
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java
index c9566a2..f5321f2 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java
@@ -38,6 +38,7 @@ import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.utils.IntUtils;
 
 public class MapmmChainSPInstruction extends SPInstruction {
        private ChainType _chainType = null;
@@ -168,7 +169,7 @@ public class MapmmChainSPInstruction extends SPInstruction {
                        MatrixBlock pmV = _pmV.getBlock(1, 1);
                        MatrixIndexes ixIn = arg0._1();
                        MatrixBlock blkIn = arg0._2();
-                       int rowIx = (int)ixIn.getRowIndex();
+                       int rowIx = IntUtils.toInt(ixIn.getRowIndex());
                        //execute mapmmchain operation
                        return blkIn.chainMatrixMultOperations(pmV, 
                                        _pmW.getBlock(rowIx,1), new MatrixBlock(), _chainType);

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 7c8d606..4f6fd8d 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -54,6 +54,7 @@ import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.utils.IntUtils;
 
 public class MapmmSPInstruction extends BinarySPInstruction {
        private CacheType _type = null;
@@ -238,7 +239,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
                                isLeft?mcRdd.getCols():mcBc.getCols(), isLeft?mcBc.getRowsPerBlock():mcRdd.getRowsPerBlock(),
                                isLeft?mcRdd.getColsPerBlock():mcBc.getColsPerBlock(), 1.0)); 
                long numParts = sizeOutput / InfrastructureAnalyzer.getHDFSBlockSize();
-               return (int)Math.min(numParts, (isLeft?mcRdd.getNumColBlocks():mcRdd.getNumRowBlocks()));
+               return IntUtils.toInt(Math.min(numParts, (isLeft?mcRdd.getNumColBlocks():mcRdd.getNumRowBlocks())));
        }
 
        private static class RDDMapMMFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> 
@@ -272,7 +273,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
                        if( _type == CacheType.LEFT )
                        {
                                //get the right hand side matrix
-                               MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex());
+                               MatrixBlock left = _pbc.getBlock(1, IntUtils.toInt(ixIn.getRowIndex()));
                                
                                //execute matrix-vector mult
                                OperationsOnMatrixValues.matMult(new MatrixIndexes(1,ixIn.getRowIndex()),
@@ -281,7 +282,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
                        else //if( _type == CacheType.RIGHT )
                        {
                                //get the right hand side matrix
-                               MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1);
+                               MatrixBlock right = _pbc.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), 1);
                                
                                //execute matrix-vector mult
                                OperationsOnMatrixValues.matMult(ixIn, blkIn,
@@ -324,7 +325,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
                        if( _type == CacheType.LEFT )
                        {
                                //get the right hand side matrix
-                               MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex());
+                               MatrixBlock left = _pbc.getBlock(1, IntUtils.toInt(ixIn.getRowIndex()));
                                
                                //execute matrix-vector mult
                                return OperationsOnMatrixValues.matMult( 
@@ -333,7 +334,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
                        else //if( _type == CacheType.RIGHT )
                        {
                                //get the right hand side matrix
-                               MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1);
+                               MatrixBlock right = _pbc.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), 1);
                                
                                //execute matrix-vector mult
                                return OperationsOnMatrixValues.matMult(
@@ -389,7 +390,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
                                if( _type == CacheType.LEFT )
                                {
                                        //get the right hand side matrix
-                                       MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex());
+                                       MatrixBlock left = _pbc.getBlock(1, IntUtils.toInt(ixIn.getRowIndex()));
                                        
                                        //execute index preserving matrix multiplication
                                        OperationsOnMatrixValues.matMult(left, blkIn, blkOut, _op);
@@ -397,7 +398,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
                                else //if( _type == CacheType.RIGHT )
                                {
                                        //get the right hand side matrix
-                                       MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1);
+                                       MatrixBlock right = _pbc.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), 1);
 
                                        //execute index preserving matrix multiplication
                                        OperationsOnMatrixValues.matMult(blkIn, right, blkOut, _op);
@@ -437,14 +438,14 @@ public class MapmmSPInstruction extends BinarySPInstruction {
                                //for all matching left-hand-side blocks, returned as lazy iterator
                                return IntStream.range(1, _pbc.getNumRowBlocks()+1).mapToObj(i ->
                                        new Tuple2<>(new MatrixIndexes(i, ixIn.getColumnIndex()),
-                                       OperationsOnMatrixValues.matMult(_pbc.getBlock(i, (int)ixIn.getRowIndex()), blkIn,
+                                       OperationsOnMatrixValues.matMult(_pbc.getBlock(i, IntUtils.toInt(ixIn.getRowIndex())), blkIn,
                                                new MatrixBlock(), _op))).iterator();
                        }
                        else { //RIGHT
                                //for all matching right-hand-side blocks, returned as lazy iterator
                                return IntStream.range(1, _pbc.getNumColumnBlocks()+1).mapToObj(j ->
                                        new Tuple2<>(new MatrixIndexes(ixIn.getRowIndex(), j),
-                                       OperationsOnMatrixValues.matMult(blkIn, _pbc.getBlock((int)ixIn.getColumnIndex(), j),
+                                       OperationsOnMatrixValues.matMult(blkIn, _pbc.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), j),
                                                new MatrixBlock(), _op))).iterator();
                        }
                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
index 3019a78..da1b975 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
@@ -39,6 +39,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.utils.IntUtils;
 
 public class MatrixAppendMSPInstruction extends AppendMSPInstruction {
 
@@ -138,11 +139,11 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction {
                                //output shallow copy of rhs block
                                if( _cbind ) {
                                        ret.add( new Tuple2<>(new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1),
-                                               _pm.getBlock((int)ix.getRowIndex(), 1)) );
+                                               _pm.getBlock(IntUtils.toInt(ix.getRowIndex()), 1)) );
                                }
                                else { //rbind
                                        ret.add( new Tuple2<>(new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()),
-                                               _pm.getBlock(1, (int)ix.getColumnIndex())) );
+                                               _pm.getBlock(1, IntUtils.toInt(ix.getColumnIndex()))) );
                                }
                        }
                        //case 3: append operation on boundary block
@@ -155,7 +156,7 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction {
 
                                MatrixBlock value_in2 = null;
                                if( _cbind ) {
-                                       value_in2 = _pm.getBlock((int)ix.getRowIndex(), 1);
+                                       value_in2 = _pm.getBlock(IntUtils.toInt(ix.getRowIndex()), 1);
                                        if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) {
                                                IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
                                                second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1);
@@ -163,7 +164,7 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction {
                                        }
                                }
                                else { //rbind
-                                       value_in2 = _pm.getBlock(1, (int)ix.getColumnIndex());
+                                       value_in2 = _pm.getBlock(1, IntUtils.toInt(ix.getColumnIndex()));
                                        if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) {
                                                IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
                                                second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex());
@@ -228,8 +229,8 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction {
                                }
                                //case 3: append operation on boundary block
                                else {
-                                       int rowix = _cbind ? (int)ix.getRowIndex() : 1;
-                                       int colix = _cbind ? 1 : (int)ix.getColumnIndex();
+                                       int rowix = _cbind ? IntUtils.toInt(ix.getRowIndex()) : 1;
+                                       int colix = _cbind ? 1 : IntUtils.toInt(ix.getColumnIndex());
                                        MatrixBlock in2 = _pm.getBlock(rowix, colix);
                                        MatrixBlock out = in1.append(in2, new MatrixBlock(), _cbind);
                                        return new Tuple2<>(ix, out);

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index d30f330..2c4a01e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -58,6 +58,7 @@ import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.util.IndexRange;
 import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.IntUtils;
 
 /**
  * This class implements the matrix indexing functionality inside CP.
@@ -203,8 +204,8 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction {
                                 .mapToPair(new SliceBlock2(ixrange, mcOut));       //slice relevant blocks
 
                //collect output without shuffle to avoid side-effects with custom PartitionPruningRDD
-               MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, (int)mcOut.getRows(), 
-                               (int)mcOut.getCols(), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
+               MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, IntUtils.toInt(mcOut.getRows()), 
+                               IntUtils.toInt(mcOut.getCols()), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
                return mbout;
        }
        
@@ -410,11 +411,11 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction {
                                        MatrixBlock right = arg._2();
 
                                        int rl = UtilFunctions.computeCellInBlock(_ixrange.rowStart, _brlen);
-                                       int ru = (int)Math.min(_ixrange.rowEnd, rl+right.getNumRows())-1;
+                                       int ru = IntUtils.toInt(Math.min(_ixrange.rowEnd, rl+right.getNumRows())-1);
                                        int cl = UtilFunctions.computeCellInBlock(_ixrange.colStart, _brlen);
-                                       int cu = (int)Math.min(_ixrange.colEnd, cl+right.getNumColumns())-1;
+                                       int cu = IntUtils.toInt(Math.min(_ixrange.colEnd, cl+right.getNumColumns())-1);
 
-                                       MatrixBlock left = _binput.getBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex());
+                                       MatrixBlock left = _binput.getBlock(IntUtils.toInt(ix.getRowIndex()), IntUtils.toInt(ix.getColumnIndex()));
                                        MatrixBlock tmp = left.leftIndexingOperations(right, 
                                                        rl, ru, cl, cu, new MatrixBlock(), UpdateType.COPY);
                                        
@@ -474,10 +475,10 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction {
                        //compute local index range 
                        long grix = UtilFunctions.computeCellIndex(ix.getRowIndex(), _brlen, 0);
                        long gcix = UtilFunctions.computeCellIndex(ix.getColumnIndex(), _bclen, 0);
-                       int lrl = (int)((_ixrange.rowStart<grix) ? 0 : _ixrange.rowStart - grix);
-                       int lcl = (int)((_ixrange.colStart<gcix) ? 0 : _ixrange.colStart - gcix);
-                       int lru = (int)Math.min(block.getNumRows()-1, _ixrange.rowEnd - grix);
-                       int lcu = (int)Math.min(block.getNumColumns()-1, _ixrange.colEnd - gcix);
+                       int lrl = IntUtils.toInt((_ixrange.rowStart<grix) ? 0 : _ixrange.rowStart - grix);
+                       int lcl = IntUtils.toInt((_ixrange.colStart<gcix) ? 0 : _ixrange.colStart - gcix);
+                       int lru = IntUtils.toInt(Math.min(block.getNumRows()-1, _ixrange.rowEnd - grix));
+                       int lcu = IntUtils.toInt(Math.min(block.getNumColumns()-1, _ixrange.colEnd - gcix));
                        
                        //compute output index
                        MatrixIndexes ixOut = new MatrixIndexes(

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index cd5e2ba..3d66b20 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -64,6 +64,7 @@ import org.apache.sysml.runtime.transform.encode.EncoderRecode;
 import org.apache.sysml.runtime.transform.encode.EncoderMVImpute.MVMethod;
 import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.transform.meta.TfOffsetMap;
+import org.apache.sysml.utils.IntUtils;
 
 import scala.Tuple2;
 
@@ -115,7 +116,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 
                        //step 1: build transform meta data
                        Encoder encoderBuild = EncoderFactory.createEncoder(spec, colnames,
-                                       fo.getSchema(), (int)fo.getNumColumns(), null);
+                                       fo.getSchema(), IntUtils.toInt(fo.getNumColumns()), null);
 
                        MaxLongAccumulator accMax = registerMaxLongAccumulator(sec.getSparkContext()); 
                        JavaRDD<String> rcMaps = in
@@ -146,7 +147,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 
                        //create encoder broadcast (avoiding replication per task) 
                        Encoder encoder = EncoderFactory.createEncoder(spec, colnames,
-                                       fo.getSchema(), (int)fo.getNumColumns(), meta);
+                                       fo.getSchema(), IntUtils.toInt(fo.getNumColumns()), meta);
                        mcOut.setDimension(mcIn.getRows()-((omap!=null)?omap.getNumRmRows():0), encoder.getNumCols()); 
                        Broadcast<Encoder> bmeta = sec.getSparkContext().broadcast(encoder);
                        Broadcast<TfOffsetMap> bomap = (omap!=null) ? sec.getSparkContext().broadcast(omap) : null;

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
index 1b6435b..4ddafcc 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
@@ -49,6 +49,7 @@ import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.utils.IntUtils;
 
 /**
  * This pmapmm matrix multiplication instruction is still experimental
@@ -105,8 +106,8 @@ public class PMapmmSPInstruction extends BinarySPInstruction {
                                        .filter(new IsBlockInRange(i+1, i+NUM_ROWBLOCKS*mc1.getRowsPerBlock(), 1, mc1.getCols(), mc1))
                                        .mapToPair(new PMapMMRebaseBlocksFunction(i/mc1.getRowsPerBlock()));
 
-                       int rlen = (int)Math.min(mc1.getRows()-i, NUM_ROWBLOCKS*mc1.getRowsPerBlock());
-                       PartitionedBlock<MatrixBlock> pmb = SparkExecutionContext.toPartitionedMatrixBlock(rdd, rlen, (int)mc1.getCols(), mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1L);
+                       int rlen = IntUtils.toInt(Math.min(mc1.getRows()-i, NUM_ROWBLOCKS*mc1.getRowsPerBlock()));
+                       PartitionedBlock<MatrixBlock> pmb = SparkExecutionContext.toPartitionedMatrixBlock(rdd, rlen, IntUtils.toInt(mc1.getCols()), mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1L);
                        Broadcast<PartitionedBlock<MatrixBlock>> bpmb = sec.getSparkContext().broadcast(pmb);
                        
                        //matrix multiplication
@@ -190,7 +191,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction {
 
                        //get the right hand side matrix
                        for( int i=1; i<=pm.getNumRowBlocks(); i++ ) {
-                               MatrixBlock left = pm.getBlock(i, (int)ixIn.getRowIndex());
+                               MatrixBlock left = pm.getBlock(i, IntUtils.toInt(ixIn.getRowIndex()));
 
                                //execute matrix-vector mult
                                OperationsOnMatrixValues.matMult(new MatrixIndexes(i,ixIn.getRowIndex()),

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 4a1c710..bd0344a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -83,6 +83,7 @@ import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.transform.meta.TfOffsetMap;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.IntUtils;
 
 public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction {
        protected HashMap<String, String> params;
@@ -240,7 +241,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                        MatrixCharacteristics mc1 = sec.getMatrixCharacteristics( targetVar );
                        MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
 
-                       int ngroups = (int) getLongParam(ec, Statement.GAGG_NUM_GROUPS);
+                       int ngroups = IntUtils.toInt(getLongParam(ec, Statement.GAGG_NUM_GROUPS));
                        
                        //single-block aggregation
                        if( ngroups <= mc1.getRowsPerBlock() && mc1.getCols() <= mc1.getColsPerBlock() ) {
@@ -401,12 +402,12 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
 
                                //update output statistics (required for correctness)
                                MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
-                               mcOut.set(rows?maxDim:mcIn.getRows(), rows?mcIn.getCols():maxDim, (int)brlen, (int)bclen, mcIn.getNonZeros());
+                               mcOut.set(rows?maxDim:mcIn.getRows(), rows?mcIn.getCols():maxDim, IntUtils.toInt(brlen), IntUtils.toInt(bclen), mcIn.getNonZeros());
                        }
                        else //special case: empty output (ensure valid dims)
                        {
                                int n = emptyReturn ? 1 : 0;
-                               MatrixBlock out = new MatrixBlock(rows?n:(int)mcIn.getRows(), rows?(int)mcIn.getCols():n, true); 
+                               MatrixBlock out = new MatrixBlock(rows?n:IntUtils.toInt(mcIn.getRows()), rows?IntUtils.toInt(mcIn.getCols()):n, true); 
                                sec.setMatrixOutput(output.getName(), out, getExtendedOpcode());
                        }
                }
@@ -466,8 +467,8 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                        //repartition input vector for higher degree of parallelism 
                        //(avoid scenarios where few input partitions create huge outputs)
                        MatrixCharacteristics mcTmp = new MatrixCharacteristics(dirRows?lmaxVal:mcIn.getRows(),
-                                       dirRows?mcIn.getRows():lmaxVal, (int)brlen, (int)bclen, mcIn.getRows());
-                       int numParts = (int)Math.min(SparkUtils.getNumPreferredPartitions(mcTmp, in), mcIn.getNumBlocks());
+                                       dirRows?mcIn.getRows():lmaxVal, IntUtils.toInt(brlen), IntUtils.toInt(bclen), mcIn.getRows());
+                       int numParts = IntUtils.toInt(Math.min(SparkUtils.getNumPreferredPartitions(mcTmp, in), mcIn.getNumBlocks()));
                        if( numParts > in.getNumPartitions()*2 )
                                in = in.repartition(numParts);
 
@@ -482,7 +483,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
 
                        //update output statistics (required for correctness)
                        MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
-                       mcOut.set(dirRows?lmaxVal:mcIn.getRows(), dirRows?mcIn.getRows():lmaxVal, (int)brlen, (int)bclen, -1);
+                       mcOut.set(dirRows?lmaxVal:mcIn.getRows(), dirRows?mcIn.getRows():lmaxVal, IntUtils.toInt(brlen), IntUtils.toInt(bclen), -1);
                }
                else if ( opcode.equalsIgnoreCase("transformapply") ) 
                {
@@ -505,7 +506,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
 
                        //create encoder broadcast (avoiding replication per task) 
                        Encoder encoder = EncoderFactory.createEncoder(params.get("spec"), colnames,
-                               fo.getSchema(), (int)fo.getNumColumns(), meta);
+                               fo.getSchema(), IntUtils.toInt(fo.getNumColumns()), meta);
                        mcOut.setDimension(mcIn.getRows()-((omap!=null)?omap.getNumRmRows():0), encoder.getNumCols()); 
                        Broadcast<Encoder> bmeta = sec.getSparkContext().broadcast(encoder);
                        Broadcast<TfOffsetMap> bomap = (omap!=null) ? sec.getSparkContext().broadcast(omap) : null;
@@ -532,7 +533,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                        //reblock if necessary (clen > bclen)
                        if( mc.getCols() > mc.getNumColBlocks() ) {
                                in = in.mapToPair(new RDDTransformDecodeExpandFunction(
-                                               (int)mc.getCols(), mc.getColsPerBlock()));
+                                               IntUtils.toInt(mc.getCols()), mc.getColsPerBlock()));
                                in = RDDAggregateUtils.mergeByKey(in, false);
                        }
                        
@@ -679,8 +680,8 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                        //prepare inputs (for internal api compatibility)
                        IndexedMatrixValue data = SparkUtils.toIndexedMatrixBlock(arg0._1(),arg0._2());
                        IndexedMatrixValue offsets = _rmRows ?
-                               SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getBlock((int)arg0._1().getRowIndex(), 1)) :
-                               SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getBlock(1, (int)arg0._1().getColumnIndex()));
+                               SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getBlock(IntUtils.toInt(arg0._1().getRowIndex()), 1)) :
+                               SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getBlock(1, IntUtils.toInt(arg0._1().getColumnIndex())));
                        
                        //execute remove empty operations
                        ArrayList<IndexedMatrixValue> out = new ArrayList<>();
@@ -754,7 +755,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                        //get all inputs
                        MatrixIndexes ix = arg0._1();
                        MatrixBlock target = arg0._2();         
-                       MatrixBlock groups = _pbm.getBlock((int)ix.getRowIndex(), 1);
+                       MatrixBlock groups = _pbm.getBlock(IntUtils.toInt(ix.getRowIndex()), 1);
                        
                        //execute map grouped aggregate operations
                        IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(ix, target);
@@ -790,7 +791,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                        //get all inputs
                        MatrixIndexes ix = arg0._1();
                        MatrixBlock target = arg0._2();
-                       MatrixBlock groups = _pbm.getBlock((int)ix.getRowIndex(), 1);
+                       MatrixBlock groups = _pbm.getBlock(IntUtils.toInt(ix.getRowIndex()), 1);
                        
                        //execute map grouped aggregate operations
                        return groups.groupedAggOperations(target, null, new MatrixBlock(), _ngroups, _op);
@@ -965,8 +966,8 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                        MatrixBlock inBlk = in._2();
 
                        //construct expanded block via leftindexing
-                       int cl = (int)UtilFunctions.computeCellIndex(inIx.getColumnIndex(), _bclen, 0)-1;
-                       int cu = (int)UtilFunctions.computeCellIndex(inIx.getColumnIndex(), _bclen, inBlk.getNumColumns()-1)-1;
+                       int cl = IntUtils.toInt(UtilFunctions.computeCellIndex(inIx.getColumnIndex(), _bclen, 0)-1);
+                       int cu = IntUtils.toInt(UtilFunctions.computeCellIndex(inIx.getColumnIndex(), _bclen, inBlk.getNumColumns()-1)-1);
                        MatrixBlock out = new MatrixBlock(inBlk.getNumRows(), _clen, false);
                        out = out.leftIndexingOperations(inBlk, 0, inBlk.getNumRows()-1, cl, cu, null, UpdateType.INPLACE_PINNED);
                        
@@ -981,7 +982,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                        }
 
                        if ( params.get(Statement.GAGG_NUM_GROUPS) != null) {
-                               int ngroups = (int) Double.parseDouble(params.get(Statement.GAGG_NUM_GROUPS));
+                               int ngroups = IntUtils.toInt( Double.parseDouble(params.get(Statement.GAGG_NUM_GROUPS)));
                                mcOut.set(ngroups, mc1.getCols(), -1, -1); //grouped aggregate with cell output
                        }
                        else {

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
index 3914c55..e9928ad 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
@@ -47,6 +47,7 @@ import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.IntUtils;
 
 public class PmmSPInstruction extends BinarySPInstruction {
        private CacheType _type = null;
@@ -125,7 +126,7 @@ public class PmmSPInstruction extends BinarySPInstruction {
                        MatrixBlock mb2 = arg0._2();
                        
                        //get the right hand side matrix
-                       MatrixBlock mb1 = _pmV.getBlock((int)ixIn.getRowIndex(), 1);
+                       MatrixBlock mb1 = _pmV.getBlock(IntUtils.toInt(ixIn.getRowIndex()), 1);
                        
                        //compute target block indexes
                        long minPos = UtilFunctions.toLong( mb1.minNonZero() );

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
index 18f0bef..196f270 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
@@ -47,6 +47,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.IntUtils;
 
 public class QuantilePickSPInstruction extends BinarySPInstruction {
        private OperationTypes _type = null;
@@ -220,7 +221,7 @@ public class QuantilePickSPInstruction extends BinarySPInstruction {
                if( tmp.getNumRows() <= pos )
                        throw new DMLRuntimeException("Invalid key lookup for " +
                                pos + " in block of size " + tmp.getNumRows()+"x"+tmp.getNumColumns());
-               return val.get(0).quickGetValue((int)pos, 0);
+               return val.get(0).quickGetValue(IntUtils.toInt(pos), 0);
        }
        
        private static class FilterFunction implements Function<Tuple2<MatrixIndexes,MatrixBlock>, Boolean> 
@@ -337,7 +338,7 @@ public class QuantilePickSPInstruction extends BinarySPInstruction {
                                return Collections.emptyIterator();
                        
                        //determine which quantiles are active
-                       int qlen = (int)Arrays.stream(_qPIDs).filter(i -> i==v1).count();
+                       int qlen = IntUtils.toInt(Arrays.stream(_qPIDs).filter(i -> i==v1).count());
                        int[] qix = new int[qlen];
                        for(int i=0, pos=0; i<_qPIDs.length; i++)
                                if( _qPIDs[i]==v1 )

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
index a5a5d94..d717a8e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
@@ -58,6 +58,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.QuaternaryOperator;
+import org.apache.sysml.utils.IntUtils;
 
 public class QuaternarySPInstruction extends ComputationSPInstruction {
        private CPOperand _input4 = null;
@@ -374,8 +375,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction {
                                MatrixIndexes ixIn = arg._1();
                                MatrixBlock blkIn = arg._2();
                                MatrixBlock blkOut = new MatrixBlock();
-                               MatrixBlock mbU = _pmU.getBlock((int)ixIn.getRowIndex(), 1);
-                               MatrixBlock mbV = _pmV.getBlock((int)ixIn.getColumnIndex(), 1);
+                               MatrixBlock mbU = _pmU.getBlock(IntUtils.toInt(ixIn.getRowIndex()), 1);
+                               MatrixBlock mbV = _pmV.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), 1);
                                
                                //execute core operation
                                blkIn.quaternaryOperations(_qop, mbU, mbV, 
null, blkOut);
@@ -402,8 +403,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction {
                        MatrixBlock blkIn1 = arg0._2()._1();
                        MatrixBlock blkIn2 = arg0._2()._2();
                        MatrixBlock blkOut = new MatrixBlock();
-                       MatrixBlock mbU = (_pmU!=null)?_pmU.getBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
-                       MatrixBlock mbV = (_pmV!=null)?_pmV.getBlock((int)ixIn.getColumnIndex(), 1) : blkIn2;
+                       MatrixBlock mbU = (_pmU!=null)?_pmU.getBlock(IntUtils.toInt(ixIn.getRowIndex()), 1) : blkIn2;
+                       MatrixBlock mbV = (_pmV!=null)?_pmV.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), 1) : blkIn2;
                        MatrixBlock mbW = (_qop.hasFourInputs()) ? blkIn2 : null;
                        
                        //execute core operation
@@ -431,8 +432,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction {
                        MatrixBlock blkIn2 = arg0._2()._1()._2();
                        MatrixBlock blkIn3 = arg0._2()._2();
                        MatrixBlock blkOut = new MatrixBlock();
-                       MatrixBlock mbU = (_pmU!=null)?_pmU.getBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
-                       MatrixBlock mbV = (_pmV!=null)?_pmV.getBlock((int)ixIn.getColumnIndex(), 1) : 
+                       MatrixBlock mbU = (_pmU!=null)?_pmU.getBlock(IntUtils.toInt(ixIn.getRowIndex()), 1) : blkIn2;
+                       MatrixBlock mbV = (_pmV!=null)?_pmV.getBlock(IntUtils.toInt(ixIn.getColumnIndex()), 1) : 
                                              (_pmU!=null)? blkIn2 : blkIn3;
                        MatrixBlock mbW = (_qop.hasFourInputs())? blkIn3 : null;
                        

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
index 2b9adcb..8386e2c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
@@ -65,6 +65,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.RandomMatrixGenerator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.IntUtils;
 import org.apache.sysml.utils.Statistics;
 
 public class RandSPInstruction extends UnarySPInstruction {
@@ -258,7 +259,7 @@ public class RandSPInstruction extends UnarySPInstruction {
                        &&  ConfigurationManager.getExecutionMode() != RUNTIME_PLATFORM.SPARK )
                {
                        RandomMatrixGenerator rgen = LibMatrixDatagen.createRandomMatrixGenerator(
-                               pdf, (int)lrows, (int)lcols, rowsInBlock, colsInBlock, 
+                               pdf, IntUtils.toInt(lrows), IntUtils.toInt(lcols), rowsInBlock, colsInBlock, 
                                sparsity, minValue, maxValue, pdfParams);
                        MatrixBlock mb = MatrixBlock.randOperations(rgen, lSeed);
                        
@@ -290,7 +291,7 @@ public class RandSPInstruction extends UnarySPInstruction {
                        }
                        
                        //for load balancing: degree of parallelism such that ~128MB per partition
-                       int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1);
+                       int numPartitions = IntUtils.toInt( Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1));
                        
                        //create seeds rdd 
                        seedsRDD = sec.getSparkContext().parallelizePairs(seeds, numPartitions);
@@ -323,7 +324,7 @@ public class RandSPInstruction extends UnarySPInstruction {
                        }
                        
                        //for load balancing: degree of parallelism such that ~128MB per partition
-                       int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1);
+                       int numPartitions = IntUtils.toInt( Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1));
                        
                        //create seeds rdd 
                        seedsRDD = sec.getSparkContext()
@@ -381,7 +382,7 @@ public class RandSPInstruction extends UnarySPInstruction {
                        }
                        
                        //for load balancing: degree of parallelism such that ~128MB per partition
-                       int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1);
+                       int numPartitions = IntUtils.toInt( Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1));
                        
                        //create offset rdd
                        offsetsRDD = sec.getSparkContext().parallelize(offsets, numPartitions);
@@ -408,7 +409,7 @@ public class RandSPInstruction extends UnarySPInstruction {
                        }
                        
                        //for load balancing: degree of parallelism such that ~128MB per partition
-                       int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1);
+                       int numPartitions = IntUtils.toInt( Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1));
                        
                        //create seeds rdd 
                        offsetsRDD = sec.getSparkContext()
@@ -442,14 +443,14 @@ public class RandSPInstruction extends UnarySPInstruction {
                        LOG.trace("Process RandSPInstruction sample with range="+ maxValue +", size="+ lrows +", replace="+ replace + ", seed=" + seed);
                
                // sampling rate that guarantees a sample of size >= sampleSizeLowerBound 99.99% of the time.
-               double fraction = SamplingUtils.computeFractionForSampleSize((int)lrows, UtilFunctions.toLong(maxValue), replace);
+               double fraction = SamplingUtils.computeFractionForSampleSize(IntUtils.toInt(lrows), UtilFunctions.toLong(maxValue), replace);
                
                Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(seed);
 
                // divide the population range across numPartitions by creating SampleTasks
                double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
                long outputSize = MatrixBlock.estimateSizeDenseInMemory(lrows,1);
-               int numPartitions = (int) Math.ceil((double)outputSize/hdfsBlockSize);
+               int numPartitions = IntUtils.toInt( Math.ceil((double)outputSize/hdfsBlockSize));
                long partitionSize = (long) Math.ceil(maxValue/numPartitions);
 
                ArrayList<SampleTask> offsets = new ArrayList<>();

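The numPartitions expression that recurs in the hunks above targets roughly one HDFS block (~128MB by default) of generated data per partition, bounded above by the number of matrix blocks and below by 1. A standalone illustration with invented sizes follows; a plain (int) cast stands in for IntUtils.toInt.

public class PartitioningExample {
    public static void main(String[] args) {
        long totalSize   = 1_000_000_000L; // assumed estimated output size in bytes
        long hdfsBlkSize = 134_217_728L;   // assumed 128MB HDFS block size
        long numBlocks   = 40L;            // assumed number of matrix blocks to generate
        // ~one HDFS block per partition, capped by the block count, floored at 1
        int numPartitions = (int) Math.max(Math.min(totalSize / hdfsBlkSize, numBlocks), 1);
        System.out.println(numPartitions); // prints 7 for these sizes
    }
}
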
http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
index 3e09d17..d725883 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
@@ -58,6 +58,7 @@ import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.IndexRange;
 import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.IntUtils;
 
 public class ReorgSPInstruction extends UnarySPInstruction {
        // sort-specific attributes (to enable variable attributes)
@@ -169,7 +170,7 @@ public class ReorgSPInstruction extends UnarySPInstruction {
                                // extract column (if necessary) and sort 
                                if( !singleCol )
                                        out = out.filter(new IsBlockInRange(1, mcIn.getRows(), cols[0], cols[0], mcIn))
-                                               .mapValues(new ExtractColumn((int)UtilFunctions.computeCellInBlock(cols[0], mcIn.getColsPerBlock())));
+                                               .mapValues(new ExtractColumn(IntUtils.toInt(UtilFunctions.computeCellInBlock(cols[0], mcIn.getColsPerBlock()))));
                                
                                //actual index/data sort operation
                                if( ixret ) //sort indexes 
@@ -268,7 +269,7 @@ public class ReorgSPInstruction extends UnarySPInstruction {
                        ret.add(new Tuple2<>(ixOut,blkOut));
                        
                        // insert newly created empty blocks for entire row
-                       int numBlocks = (int) Math.ceil((double)_mcIn.getRows()/_mcIn.getRowsPerBlock());
+                       int numBlocks = IntUtils.toInt( Math.ceil((double)_mcIn.getRows()/_mcIn.getRowsPerBlock()));
                        for(int i = 1; i <= numBlocks; i++) {
                                if(i != ixOut.getColumnIndex()) {
                                        int lrlen = UtilFunctions.computeBlockSize(_mcIn.getRows(), rix, _mcIn.getRowsPerBlock());

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
index 90e5396..6be3e99 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
@@ -48,6 +48,7 @@ import org.apache.sysml.runtime.matrix.data.TripleIndexes;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.utils.IntUtils;
 
 public class RmmSPInstruction extends BinarySPInstruction {
 
@@ -114,7 +115,7 @@ public class RmmSPInstruction extends BinarySPInstruction {
                        * ((long) Math.ceil((double)mc2.getCols()/mc2.getColsPerBlock()));
                double matrix2PSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc2)
                        * ((long) Math.ceil((double)mc1.getRows()/mc1.getRowsPerBlock()));
-               return (int) Math.max(Math.ceil((matrix1PSize+matrix2PSize)/hdfsBlockSize), 1);
+               return IntUtils.toInt( Math.max(Math.ceil((matrix1PSize+matrix2PSize)/hdfsBlockSize), 1));
        }
 
        private static class RmmReplicateFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes, MatrixBlock>, TripleIndexes, MatrixBlock> 

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
index 8f63427..dd5c062 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
@@ -65,6 +65,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.utils.IntUtils;
 
 import scala.Tuple2;
 
@@ -144,7 +145,7 @@ public class SpoofSPInstruction extends SPInstruction {
                                        long numBlocks = (op.getCellType()==CellType.ROW_AGG ) ? 
                                                mcIn.getNumRowBlocks() : mcIn.getNumColBlocks();
                                        out = RDDAggregateUtils.aggByKeyStable(out, aggop,
-                                               (int)Math.min(out.getNumPartitions(), numBlocks), false);
+                                                       IntUtils.toInt(Math.min(out.getNumPartitions(), numBlocks)), false);
                                }
                                sec.setRDDHandleForVariable(_out.getName(), out);
                                
@@ -183,7 +184,7 @@ public class SpoofSPInstruction extends SPInstruction {
                                if(type == OutProdType.LEFT_OUTER_PRODUCT || type == OutProdType.RIGHT_OUTER_PRODUCT ) {
                                        long numBlocks = mcOut.getNumRowBlocks() * mcOut.getNumColBlocks();
                                        out = RDDAggregateUtils.sumByKeyStable(out,
-                                               (int)Math.min(out.getNumPartitions(), numBlocks), false);
+                                                       IntUtils.toInt(Math.min(out.getNumPartitions(), numBlocks)), false);
                                }
                                sec.setRDDHandleForVariable(_out.getName(), out);
                                
@@ -206,7 +207,7 @@ public class SpoofSPInstruction extends SPInstruction {
                        long clen2 = op.getRowType().isConstDim2(op.getConstDim2()) ? op.getConstDim2() :
                                op.getRowType().isRowTypeB1() ? sec.getMatrixCharacteristics(_in[1].getName()).getCols() : -1;
                        RowwiseFunction fmmc = new RowwiseFunction(_class.getName(), _classBytes, bcVect2,
-                               bcMatrices, scalars, mcIn.getRowsPerBlock(), (int)mcIn.getCols(), (int)clen2);
+                               bcMatrices, scalars, mcIn.getRowsPerBlock(), IntUtils.toInt(mcIn.getCols()), IntUtils.toInt(clen2));
                        out = in.mapPartitionsToPair(fmmc, op.getRowType()==RowType.ROW_AGG
                                        || op.getRowType() == RowType.NO_AGG);
                        
@@ -222,7 +223,7 @@ public class SpoofSPInstruction extends SPInstruction {
                        {
                                if( op.getRowType()==RowType.ROW_AGG && mcIn.getCols() > mcIn.getColsPerBlock() ) {
                                        out = RDDAggregateUtils.sumByKeyStable(out,
-                                               (int)Math.min(out.getNumPartitions(), mcIn.getNumRowBlocks()), false);
+                                                       IntUtils.toInt(Math.min(out.getNumPartitions(), mcIn.getNumRowBlocks())), false);
                                }
                                sec.setRDDHandleForVariable(_out.getName(), out);
                                
@@ -263,8 +264,8 @@ public class SpoofSPInstruction extends SPInstruction {
        }
        
        private static boolean[] getMatrixBroadcastVector(SparkExecutionContext sec, CPOperand[] inputs, boolean[] bcVect) {
-               int numMtx = (int) Arrays.stream(inputs)
-                       .filter(in -> in.getDataType().isMatrix()).count();
+               int numMtx = IntUtils.toInt(Arrays.stream(inputs)
+                       .filter(in -> in.getDataType().isMatrix()).count());
                boolean[] ret = new boolean[numMtx];
                for(int i=0, pos=0; i<inputs.length; i++)
                        if( inputs[i].getDataType().isMatrix() )
@@ -380,9 +381,9 @@ public class SpoofSPInstruction extends SPInstruction {
                        for( int i=0, posRdd=0, posBc=0; i<_bcInd.length; i++ ) {
                                if( _bcInd[i] ) {
                                        PartitionedBroadcast<MatrixBlock> pb = _inputs.get(posBc++);
-                                       int rowIndex = (int) ((outer && i==2) ? ixIn.getColumnIndex() : 
+                                       int rowIndex = IntUtils.toInt((outer && i==2) ? ixIn.getColumnIndex() : 
                                                (pb.getNumRowBlocks()>=ixIn.getRowIndex())?ixIn.getRowIndex():1);
-                                       int colIndex = (int) ((outer && i==2) ? 1 : 
+                                       int colIndex = IntUtils.toInt((outer && i==2) ? 1 : 
                                                (pb.getNumColumnBlocks()>=ixIn.getColumnIndex())?ixIn.getColumnIndex():1);
                                        ret.add(pb.getBlock(rowIndex, colIndex));
                                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
index 5bb686b..16ed0dd 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
@@ -50,6 +50,7 @@ import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.utils.IntUtils;
 
 import scala.Tuple2;
 
@@ -88,17 +89,17 @@ public class Tsmm2SPInstruction extends UnarySPInstruction {
                                        _type.isLeft() ? mc.getColsPerBlock()+1 : 1, mc.getCols(), mc))
                          .mapToPair(new ShiftTSMMIndexesFunction(_type));
        
                PartitionedBlock<MatrixBlock> pmb = SparkExecutionContext.toPartitionedMatrixBlock(tmp1, 
-                               (int)(_type.isLeft() ? mc.getRows() : mc.getRows() - mc.getRowsPerBlock()), 
-                               (int)(_type.isLeft() ? mc.getCols()-mc.getColsPerBlock() : mc.getCols()), 
+                               IntUtils.toInt(_type.isLeft() ? mc.getRows() : mc.getRows() - mc.getRowsPerBlock()), 
+                               IntUtils.toInt(_type.isLeft() ? mc.getCols()-mc.getColsPerBlock() : mc.getCols()), 
                                mc.getRowsPerBlock(), mc.getColsPerBlock(), -1L);
                Broadcast<PartitionedBlock<MatrixBlock>> bpmb = sec.getSparkContext().broadcast(pmb);
                
                //step 2: second pass of X, compute tsmm/mapmm and aggregate result blocks
-               int outputDim = (int) (_type.isLeft() ? mc.getCols() : mc.getRows());
+               int outputDim = IntUtils.toInt(_type.isLeft() ? mc.getCols() : mc.getRows());
                if( OptimizerUtils.estimateSize(outputDim, outputDim) <= 32*1024*1024 ) { //default: <=32MB
                        //output large blocks and reduceAll to avoid skew on combineByKey
                        JavaRDD<MatrixBlock> tmp2 = in.map(
-                                       new RDDTSMM2ExtFunction(bpmb, _type, outputDim, (int)mc.getRowsPerBlock()));
+                                       new RDDTSMM2ExtFunction(bpmb, _type, outputDim, IntUtils.toInt(mc.getRowsPerBlock())));
                        MatrixBlock out = RDDAggregateUtils.sumStable(tmp2);
                      
                        //put output block into symbol table (no lineage because single block)
@@ -150,8 +151,8 @@ public class Tsmm2SPInstruction extends UnarySPInstruction {
                        if( _type.isLeft() ? ixin.getColumnIndex() == 1 : ixin.getRowIndex() == 1 ) {
                                //execute block mapmm operation for full block only (output two blocks, due to symmetry)
                                MatrixBlock mbin2 = _pb.getValue().getBlock( //lookup broadcast block
-                                               (int)(_type.isLeft()?ixin.getRowIndex():1), 
-                                               (int)(_type.isLeft()?1:ixin.getColumnIndex()));
+                                               IntUtils.toInt(_type.isLeft()?ixin.getRowIndex():1), 
+                                               IntUtils.toInt(_type.isLeft()?1:ixin.getColumnIndex()));
                                MatrixBlock mbin2t = transpose(mbin2, new MatrixBlock()); //prep for transpose rewrite mm
                                
                                MatrixBlock out2 = (MatrixBlock) OperationsOnMatrixValues.matMult( //mm
@@ -205,25 +206,25 @@ public class Tsmm2SPInstruction extends UnarySPInstruction {
                        
                        //execute block tsmm operation
                        MatrixBlock out1 = mbin.transposeSelfMatrixMultOperations(new MatrixBlock(), _type);
-                       int ix = (int) ((_type.isLeft() ? ixin.getColumnIndex() : ixin.getRowIndex())-1) * _blen;
+                       int ix = IntUtils.toInt((_type.isLeft() ? ixin.getColumnIndex() : ixin.getRowIndex())-1) * _blen;
                        out.copy(ix, ix+out1.getNumRows()-1, ix, ix+out1.getNumColumns()-1, out1, true);
                        
                        if( fullBlock ) {
                                //execute block mapmm operation for full block only (output two blocks, due to symmetry)
                                MatrixBlock mbin2 = _pb.getValue().getBlock( //lookup broadcast block
-                                               (int)(_type.isLeft()?ixin.getRowIndex():1), 
-                                               (int)(_type.isLeft()?1:ixin.getColumnIndex()));
+                                               IntUtils.toInt(_type.isLeft()?ixin.getRowIndex():1), 
+                                               IntUtils.toInt(_type.isLeft()?1:ixin.getColumnIndex()));
                                MatrixBlock mbin2t = transpose(mbin2, new MatrixBlock()); //prep for transpose rewrite mm
                                
                                MatrixBlock out2 = OperationsOnMatrixValues.matMult( //mm
                                                _type.isLeft() ? mbin2t : mbin, _type.isLeft() ? mbin : mbin2t, new MatrixBlock(), _op);
                                
                                MatrixIndexes ixout2 = _type.isLeft() ? new MatrixIndexes(2,1) : new MatrixIndexes(1,2);
-                               out.copy((int)(ixout2.getRowIndex()-1)*_blen, (int)(ixout2.getRowIndex()-1)*_blen+out2.getNumRows()-1, 
-                                               (int)(ixout2.getColumnIndex()-1)*_blen, (int)(ixout2.getColumnIndex()-1)*_blen+out2.getNumColumns()-1, out2, true);
+                               out.copy(IntUtils.toInt(ixout2.getRowIndex()-1)*_blen, IntUtils.toInt(ixout2.getRowIndex()-1)*_blen+out2.getNumRows()-1, 
+                                               IntUtils.toInt(ixout2.getColumnIndex()-1)*_blen, IntUtils.toInt(ixout2.getColumnIndex()-1)*_blen+out2.getNumColumns()-1, out2, true);
                                MatrixBlock out3 = transpose(out2, new MatrixBlock()); 
-                               out.copy((int)(ixout2.getColumnIndex()-1)*_blen, (int)(ixout2.getColumnIndex()-1)*_blen+out3.getNumRows()-1, 
-                                               (int)(ixout2.getRowIndex()-1)*_blen, (int)(ixout2.getRowIndex()-1)*_blen+out3.getNumColumns()-1, out3, true);
+                               out.copy(IntUtils.toInt(ixout2.getColumnIndex()-1)*_blen, IntUtils.toInt(ixout2.getColumnIndex()-1)*_blen+out3.getNumRows()-1, 
+                                               IntUtils.toInt(ixout2.getRowIndex()-1)*_blen, IntUtils.toInt(ixout2.getRowIndex()-1)*_blen+out3.getNumColumns()-1, out3, true);
                        }
                        
                        return out;

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BlockPartitioner.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BlockPartitioner.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BlockPartitioner.java
index 677fbbf..4428e2d 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BlockPartitioner.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BlockPartitioner.java
@@ -25,6 +25,7 @@ import org.apache.spark.Partitioner;
 
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.utils.IntUtils;
 
 /**
  * Default partitioner used for all binary block rdd operations in order
@@ -75,7 +76,7 @@ public class BlockPartitioner extends Partitioner
                }
                
                //compute meta data for runtime
-               _ncparts = (int)Math.ceil((double)ncblks/_cbPerPart);
+               _ncparts = IntUtils.toInt(Math.ceil((double)ncblks/_cbPerPart));
                _numParts = numParts;
        }
        
@@ -90,8 +91,8 @@ public class BlockPartitioner extends Partitioner
                        
                //get partition id
                MatrixIndexes ix = (MatrixIndexes) arg0;
-               int ixr = (int)((ix.getRowIndex()-1)/_rbPerPart);
-               int ixc = (int)((ix.getColumnIndex()-1)/_cbPerPart);
+               int ixr = IntUtils.toInt((ix.getRowIndex()-1)/_rbPerPart);
+               int ixc = IntUtils.toInt((ix.getColumnIndex()-1)/_cbPerPart);
                int id = ixr * _ncparts + ixc;
                
                //ensure valid range

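For context, getPartition above maps a 1-based block index to a partition id by laying _rbPerPart x _cbPerPart tiles of blocks out row-major. A small illustration with invented sizes follows; plain casts stand in for IntUtils.toInt.

public class BlockPartitionerExample {
    public static void main(String[] args) {
        long rbPerPart = 4, cbPerPart = 4;              // assumed blocks per partition tile
        int ncparts = (int) Math.ceil(10d / cbPerPart); // assumed 10 column blocks -> 3 tiles per row
        long rowIndex = 6, colIndex = 9;                // assumed 1-based block index
        int ixr = (int) ((rowIndex - 1) / rbPerPart);   // tile row 1
        int ixc = (int) ((colIndex - 1) / cbPerPart);   // tile column 2
        System.out.println(ixr * ncparts + ixc);        // partition id 5
    }
}
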
http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
index 8a4999b..62501e7 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
@@ -34,6 +34,7 @@ import 
org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlockFactory;
 import org.apache.sysml.runtime.util.FastBufferedDataInputStream;
 import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
+import org.apache.sysml.utils.IntUtils;
 
 /**
 * This class is for partitioned matrix/frame blocks, to be used as broadcasts. 
@@ -132,11 +133,11 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
        }
 
        public int getNumRowBlocks() {
-               return (int)Math.ceil((double)_rlen/_brlen);
+               return IntUtils.toInt(Math.ceil((double)_rlen/_brlen));
        }
 
        public int getNumColumnBlocks() {
-               return (int)Math.ceil((double)_clen/_bclen);
+               return IntUtils.toInt(Math.ceil((double)_clen/_bclen));
        }
 
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
index ab29c4c..6eec144 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
@@ -30,6 +30,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.util.IndexRange;
+import org.apache.sysml.utils.IntUtils;
 
 /**
 * This class is a wrapper around an array of broadcasts of partitioned matrix/frame blocks,
@@ -71,11 +72,11 @@ public class PartitionedBroadcast<T extends CacheBlock> implements Serializable
        }
 
        public int getNumRowBlocks() {
-               return (int)_mc.getNumRowBlocks();
+               return IntUtils.toInt(_mc.getNumRowBlocks());
        }
        
        public int getNumColumnBlocks() {
-               return (int)_mc.getNumColBlocks();
+               return IntUtils.toInt(_mc.getNumColBlocks());
        }
 
        public static int computeBlocksPerPartition(long rlen, long clen, long brlen, long bclen) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
index 0ce654b..a2414bc 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
@@ -31,6 +31,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.IntUtils;
 
 public class ExtractBlockForBinaryReblock implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock> 
 {
@@ -96,8 +97,8 @@ public class ExtractBlockForBinaryReblock implements PairFlatMapFunction<Tuple2<
                                        blk.setNonZeros(in.getNonZeros());
                                }
                                else { //general case
-                                       for(int i2 = 0; i2 <= (int)(rowUpper-rowLower); i2++)
-                                               for(int j2 = 0; j2 <= (int)(colUpper-colLower); j2++)
+                                       for(int i2 = 0; i2 <= IntUtils.toInt(rowUpper-rowLower); i2++)
+                                               for(int j2 = 0; j2 <= IntUtils.toInt(colUpper-colLower); j2++)
                                                        blk.appendValue(cixi+i2, cixj+j2, in.quickGetValue(aixi+i2, aixj+j2));
                                }
                                retVal.add(new Tuple2<>(indx, blk));

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
index 3e6908b..b05c0bf 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
@@ -35,6 +35,7 @@ import org.apache.sysml.runtime.matrix.data.WeightedCell;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.IntUtils;
 
 public abstract class ExtractGroup implements Serializable 
 {
@@ -65,7 +66,7 @@ public abstract class ExtractGroup implements Serializable
                if(_op instanceof AggregateOperator && _ngroups > 0 
                        && OptimizerUtils.isValidCPDimensions(_ngroups, target.getNumColumns()) ) 
                {
-                       MatrixBlock tmp = group.groupedAggOperations(target, null, new MatrixBlock(), (int)_ngroups, _op);
+                       MatrixBlock tmp = group.groupedAggOperations(target, null, new MatrixBlock(), IntUtils.toInt(_ngroups), _op);
                        
                        for(int i=0; i<tmp.getNumRows(); i++) {
                                for( int j=0; j<tmp.getNumColumns(); j++ ) {
@@ -139,7 +140,7 @@ public abstract class ExtractGroup implements Serializable
                                throws Exception 
                {
                        MatrixIndexes ix = arg._1;
-                       MatrixBlock group = _pbm.getBlock((int)ix.getRowIndex(), 1);
+                       MatrixBlock group = _pbm.getBlock(IntUtils.toInt(ix.getRowIndex()), 1);
                        MatrixBlock target = arg._2;
                        
                        return execute(ix, group, target).iterator();

http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
index afd2e1a..84d161a 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
@@ -31,6 +31,7 @@ import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysml.utils.IntUtils;
 
 public class MatrixVectorBinaryOpPartitionFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixBlock>>, MatrixIndexes,MatrixBlock>
 {
@@ -74,8 +75,8 @@ public class MatrixVectorBinaryOpPartitionFunction implements PairFlatMapFunctio
                        MatrixBlock in1 = arg._2();
                        
                        //get the rhs block 
-                       int rix= (int)((_vtype==VectorType.COL_VECTOR) ? ix.getRowIndex() : 1);
-                       int cix= (int)((_vtype==VectorType.COL_VECTOR) ? 1 : ix.getColumnIndex());
+                       int rix= IntUtils.toInt((_vtype==VectorType.COL_VECTOR) ? ix.getRowIndex() : 1);
+                       int cix= IntUtils.toInt((_vtype==VectorType.COL_VECTOR) ? 1 : ix.getColumnIndex());
                        MatrixBlock in2 = _pmV.getBlock(rix, cix);
                                
                        //execute the binary operation

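The rix/cix lookup above pins one dimension of the broadcast vector to block 1, since a column vector is blocked along rows only (and a row vector along columns only). A minimal illustration with invented indexes; the plain casts stand in for IntUtils.toInt.

public class VectorBlockLookupExample {
    enum VectorType { COL_VECTOR, ROW_VECTOR }
    public static void main(String[] args) {
        long rowIndex = 3, colIndex = 7;            // assumed 1-based block index of the lhs block
        VectorType vtype = VectorType.COL_VECTOR;
        int rix = (int) ((vtype == VectorType.COL_VECTOR) ? rowIndex : 1);
        int cix = (int) ((vtype == VectorType.COL_VECTOR) ? 1 : colIndex);
        System.out.println(rix + "," + cix);        // prints 3,1 -> row-aligned vector block
    }
}
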
http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index c0fc34a..69be559 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -67,6 +67,7 @@ import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
 import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.IntUtils;
 
 import scala.Tuple2;
 
@@ -101,7 +102,7 @@ public class FrameRDDConverterUtils
                
                //prepare default schema if needed
                if( schema == null || schema.length==1 )
-                       schema = UtilFunctions.nCopies((int)mc.getCols(), ValueType.STRING);
+                       schema = UtilFunctions.nCopies(IntUtils.toInt(mc.getCols()), ValueType.STRING);
 
                //convert csv rdd to binary block rdd (w/ partial blocks)
                JavaPairRDD<Long, FrameBlock> out = prepinput.mapPartitionsToPair(
@@ -153,7 +154,7 @@ public class FrameRDDConverterUtils
                        JavaPairRDD<Long, Text> input, MatrixCharacteristics mc, ValueType[] schema ) {
                //prepare default schema if needed
                if( schema == null || schema.length==1 ) {
-                       schema = UtilFunctions.nCopies((int)mc.getCols(), 
+                       schema = UtilFunctions.nCopies(IntUtils.toInt(mc.getCols()), 
                                (schema!=null) ? schema[0] : ValueType.STRING);
                }
                
@@ -189,7 +190,7 @@ public class FrameRDDConverterUtils
                if(mcIn.getCols() > mcIn.getColsPerBlock()) {
                        //split matrix blocks into extended matrix blocks 
                        in = in.flatMapToPair(new MatrixFrameReblockFunction(mcIn));
-                       mc.setBlockSize(MatrixFrameReblockFunction.computeBlockSize(mc), (int)mc.getCols());
+                       mc.setBlockSize(MatrixFrameReblockFunction.computeBlockSize(mc), IntUtils.toInt(mc.getCols()));
                        
                        //shuffle matrix blocks (instead of frame blocks) in order to exploit 
                        //sparse formats (for sparse or wide matrices) during shuffle
@@ -238,8 +239,8 @@ public class FrameRDDConverterUtils
                                df.javaRDD().zipWithIndex(); //zip row index
 
                //convert data frame to frame schema (prepare once)
-               String[] colnames = new String[(int)mc.getCols()];
-               ValueType[] fschema = new ValueType[(int)mc.getCols()];
+               String[] colnames = new String[IntUtils.toInt(mc.getCols())];
+               ValueType[] fschema = new ValueType[IntUtils.toInt(mc.getCols())];
                int colVect = convertDFSchemaToFrameSchema(df.schema(), colnames, fschema, containsID);
                out.set(colnames, fschema); //make schema available
                
@@ -260,7 +261,7 @@ public class FrameRDDConverterUtils
                                
                //create data frame schema
                if( schema == null )
-                       schema = UtilFunctions.nCopies((int)mc.getCols(), ValueType.STRING);
+                       schema = UtilFunctions.nCopies(IntUtils.toInt(mc.getCols()), ValueType.STRING);
                StructType dfSchema = convertFrameSchemaToDFSchema(schema, true);
        
                //rdd to data frame conversion
@@ -552,7 +553,7 @@ public class FrameRDDConverterUtils
                        _schema = schema;
                        _hasHeader = hasHeader;
                        _delim = delim;
-                       _maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1);
+                       _maxRowsPerBlock = Math.max(IntUtils.toInt(FrameBlock.BUFFER_SIZE/_clen), 1);
                }
 
                @Override
@@ -563,7 +564,7 @@ public class FrameRDDConverterUtils
 
                        long ix = -1;
                        FrameBlock fb = null;
-                       String[] tmprow = new String[(int)_clen]; 
+                       String[] tmprow = new String[IntUtils.toInt(_clen)]; 
                        
                        while( arg0.hasNext() )
                        {
@@ -575,11 +576,11 @@ public class FrameRDDConverterUtils
                                        continue;
                                }
                                if( row.startsWith(TfUtils.TXMTD_MVPREFIX) ) {
-                                       _mvMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1));
+                                       _mvMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, IntUtils.toInt(_clen+1)));
                                        continue;
                                }
                                else if( row.startsWith(TfUtils.TXMTD_NDPREFIX) ) {
-                                       _ndMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1));
+                                       _ndMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, IntUtils.toInt(_clen+1)));
                                        continue;
                                }
                                
@@ -716,7 +717,7 @@ public class FrameRDDConverterUtils
                        _schema = schema;
                        _containsID = containsID;
                        _colVect = colVect;
-                       _maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1);
+                       _maxRowsPerBlock = Math.max(IntUtils.toInt(FrameBlock.BUFFER_SIZE/_clen), 1);
                }
                
                @Override
@@ -727,7 +728,7 @@ public class FrameRDDConverterUtils
 
                        long ix = -1;
                        FrameBlock fb = null;
-                       Object[] tmprow = new Object[(int)_clen];
+                       Object[] tmprow = new Object[IntUtils.toInt(_clen)];
                        
                        while( arg0.hasNext() )
                        {
@@ -815,7 +816,7 @@ public class FrameRDDConverterUtils
                        _clen = mc.getCols();
                        
                        //determine upper bounded buffer len
-                       _bufflen = (int) Math.min(_rlen*_clen, FrameBlock.BUFFER_SIZE);
+                       _bufflen = IntUtils.toInt(Math.min(_rlen*_clen, FrameBlock.BUFFER_SIZE));
                }
 
                protected void flushBufferToList( FrameReblockBuffer rbuff,  ArrayList<Tuple2<Long,FrameBlock>> ret ) 
@@ -857,7 +858,7 @@ public class FrameRDDConverterUtils
                                st.reset( strVal );
                                long row = st.nextLong();
                                long col = st.nextLong();
-                               Object val = UtilFunctions.stringToObject(_schema[(int)col-1], st.nextToken());
+                               Object val = UtilFunctions.stringToObject(_schema[IntUtils.toInt(col-1)], st.nextToken());
                                
                                //flush buffer if necessary
                                if( rbuff.getSize() >= rbuff.getCapacity() )
@@ -908,7 +909,7 @@ public class FrameRDDConverterUtils
                        long rowix = (ix.getRowIndex()-1)*_brlen+1;
 
                        //global index within frame block (0-based)
-                       long cl = (int)((ix.getColumnIndex()-1)*_bclen);
+                       long cl = IntUtils.toInt((ix.getColumnIndex()-1)*_bclen);
                        long cu = Math.min(cl+mb.getNumColumns()-1, _clen);
 
                        //prepare output frame blocks 
@@ -916,8 +917,8 @@ public class FrameRDDConverterUtils
                                int ru = Math.min(i+_maxRowsPerBlock, mb.getNumRows())-1;
                                long rix = UtilFunctions.computeBlockIndex(rowix+i, _maxRowsPerBlock);
                                MatrixIndexes ixout = new MatrixIndexes(rix, 1);
-                               MatrixBlock out = new MatrixBlock(ru-i+1, (int)_clen, sparse);
-                               out.copy(0, out.getNumRows()-1, (int)cl, (int)cu, 
+                               MatrixBlock out = new MatrixBlock(ru-i+1, IntUtils.toInt(_clen), sparse);
+                               out.copy(0, out.getNumRows()-1, IntUtils.toInt(cl), IntUtils.toInt(cu), 
                                        mb.slice(i, ru, 0, mb.getNumColumns()-1, mbreuse), true);
                                out.examSparsity();
                                ret.add(new Tuple2<>(ixout,out));
@@ -935,8 +936,8 @@ public class FrameRDDConverterUtils
                 */
                public static int computeBlockSize(MatrixCharacteristics mc) {
                        int brlen = mc.getRowsPerBlock();
-                       int basic = Math.max((int)(FrameBlock.BUFFER_SIZE/mc.getCols()), 1);
-                       int div = (int)Math.ceil((double)brlen/basic);
+                       int basic = Math.max(IntUtils.toInt(FrameBlock.BUFFER_SIZE/mc.getCols()), 1);
+                       int div = IntUtils.toInt(Math.ceil((double)brlen/basic));
                        while( brlen % div != 0 ) 
                                div++;
                        return brlen / div;
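computeBlockSize above shrinks the matrix row block size to a divisor of brlen whose rows still fit into the frame buffer. A worked illustration with invented numbers follows; plain casts stand in for IntUtils.toInt and a constant for FrameBlock.BUFFER_SIZE.

public class BlockSizeExample {
    public static void main(String[] args) {
        int brlen = 1000;            // assumed rows per matrix block
        long bufferSize = 350_000L;  // assumed stand-in for FrameBlock.BUFFER_SIZE
        long cols = 500L;            // assumed number of frame columns
        int basic = Math.max((int) (bufferSize / cols), 1); // 700 rows fit the buffer
        int div = (int) Math.ceil((double) brlen / basic);  // 2
        while (brlen % div != 0)
            div++;                   // 2 already divides 1000 evenly here
        System.out.println(brlen / div); // resulting block size: 500
    }
}
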
@@ -995,8 +996,8 @@ public class FrameRDDConverterUtils
                        for( long rix=rstartix; rix<=rendix; rix++ ) { //for all row blocks
                                long rpos = UtilFunctions.computeCellIndex(rix, brlen, 0);
                                int lrlen = UtilFunctions.computeBlockSize(rlen, rix, brlen);
-                               int fix = (int)((rpos-rowIndex>=0) ? rpos-rowIndex : 0);
-                               int fix2 = (int)Math.min(rpos+lrlen-rowIndex-1,blk.getNumRows()-1);
+                               int fix = IntUtils.toInt((rpos-rowIndex>=0) ? rpos-rowIndex : 0);
+                               int fix2 = IntUtils.toInt(Math.min(rpos+lrlen-rowIndex-1,blk.getNumRows()-1));
                                int mix = UtilFunctions.computeCellInBlock(rowIndex+fix, brlen);
                                int mix2 = mix + (fix2-fix);
                                for( long cix=1; cix<=cendix; cix++ ) { //for all column blocks
@@ -1004,7 +1005,7 @@ public class FrameRDDConverterUtils
                                        int lclen = UtilFunctions.computeBlockSize(clen, cix, bclen);
                                        MatrixBlock matrix = new MatrixBlock(lrlen, lclen, false);
                                        FrameBlock frame = blk.slice(fix, fix2, 
-                                                       (int)cpos-1, (int)cpos+lclen-2, new FrameBlock());
+                                                       IntUtils.toInt(cpos-1), IntUtils.toInt(cpos+lclen-2), new FrameBlock());
                                        MatrixBlock mframe = DataConverter.convertToMatrixBlock(frame);
                                        ret.add(new Tuple2<>(new MatrixIndexes(rix, cix), 
                                                        matrix.leftIndexingOperations(mframe, mix, mix2, 0, lclen-1, 
