Repository: incubator-systemml Updated Branches: refs/heads/master d39865e9e -> f10fa2b8f
[SYSTEMML-881] [WIP] Added indexing, relational and boolean operators + mathematical/trigonometric builtin functions for Python DSL - Left and Right Indexing - Relational operators (<, <=, >, >=, ==, !=) - Boolean operators (&, |) - Mathematical and Trigonometric builtin functions: 'exp', 'log', 'abs', 'sqrt', 'round', 'floor', 'ceil', 'sin', 'cos', 'tan', 'asin', 'acos', 'atan', 'sign' Closes #233. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f10fa2b8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f10fa2b8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f10fa2b8 Branch: refs/heads/master Commit: f10fa2b8f94ab64395585de9826696b3846aa3b7 Parents: d39865e Author: Niketan Pansare <npan...@us.ibm.com> Authored: Tue Sep 6 16:02:47 2016 -0700 Committer: Niketan Pansare <npan...@us.ibm.com> Committed: Tue Sep 6 16:02:47 2016 -0700 ---------------------------------------------------------------------- src/main/python/systemml/defmatrix.py | 365 ++++++++++++++++++++++++----- 1 file changed, 303 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f10fa2b8/src/main/python/systemml/defmatrix.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/defmatrix.py b/src/main/python/systemml/defmatrix.py index 2994092..e1bd3b9 100644 --- a/src/main/python/systemml/defmatrix.py +++ b/src/main/python/systemml/defmatrix.py @@ -19,7 +19,9 @@ # #------------------------------------------------------------- -__all__ = [ 'setSparkContext', 'matrix', 'eval', 'solve'] +trigFn = [ 'exp', 'log', 'abs', 'sqrt', 'round', 'floor', 'ceil', 'sin', 'cos', 'tan', 'asin', 'acos', 'atan', 'sign' ] +__all__ = [ 'setSparkContext', 'matrix', 'eval', 'solve' ] + trigFn + from pyspark import 
SparkContext from pyspark.sql import DataFrame, SQLContext @@ -36,14 +38,14 @@ def setSparkContext(sc): sc: SparkContext SparkContext """ - matrix.ml = MLContext(sc) matrix.sc = sc - + matrix.ml = MLContext(matrix.sc) def checkIfMLContextIsSet(): if matrix.ml is None: raise Exception('Expected setSparkContext(sc) to be called.') +########################## AST related operations ################################## class DMLOp(object): """ @@ -55,7 +57,24 @@ class DMLOp(object): def _visit(self, execute=True): matrix.dml = matrix.dml + self.dml + +# Special object used internally to specify the placeholder which will be replaced by output ID +# This helps to provide dml containing output ID in constructIntermediateNode +OUTPUT_ID = '$$OutputID$$' +def constructIntermediateNode(inputs, dml): + """ + Convenient utility to create an intermediate node of AST. + + Parameters + ---------- + inputs = list of input matrix objects and/or DMLOp + dml = list of DML string (which will be eventually joined before execution). 
To specify out.ID, please use the placeholder + """ + dmlOp = DMLOp(inputs) + out = matrix(None, op=dmlOp) + dmlOp.dml = [out.ID if x==OUTPUT_ID else x for x in dml] + return out def reset(): """ @@ -64,6 +83,23 @@ def reset(): for m in matrix.visited: m.visited = False matrix.visited = [] + matrix.ml = MLContext(matrix.sc) + matrix.dml = [] + matrix.script = pydml('') + +def performDFS(outputs, execute): + """ + Traverses the forest of nodes rooted at outputs nodes and returns the DML script to execute + """ + for m in outputs: + m.output = True + m._visit(execute=execute) + return ''.join(matrix.dml) + +############################################################################### + + +########################## Utility functions ################################## def binaryOp(lhs, rhs, opStr): @@ -85,26 +121,133 @@ def binaryOp(lhs, rhs, opStr): rhsStr = str(rhs) else: raise TypeError('Incorrect type') - dmlOp = DMLOp(inputs) - out = matrix(None, op=dmlOp) - dmlOp.dml = [out.ID, ' = ', lhsStr, opStr, rhsStr, '\n'] - return out + return constructIntermediateNode(inputs, [OUTPUT_ID, ' = ', lhsStr, opStr, rhsStr, '\n']) +def getValue(obj): + if isinstance(obj, matrix): + return obj.ID + elif isinstance(obj, float) or isinstance(obj, int): + return str(obj) + else: + raise TypeError('Unsupported type for ' + s) def binaryMatrixFunction(X, Y, fnName): """ - Common function called by supported PyDML built-in function that has two arguments both of which are matrices. - TODO: This needs to be generalized to support arbitrary arguments of differen types. + Common function called by supported PyDML built-in function that has two arguments. """ - if not isinstance(X, matrix) or not isinstance(Y, matrix): - raise TypeError('Incorrect input type. 
Expected matrix type') - inputs = [X, Y] - dmlOp = DMLOp(inputs) - out = matrix(None, op=dmlOp) - dmlOp.dml = [out.ID, ' = ', fnName,'(', X.ID, ', ', Y.ID, ')\n'] - return out + return constructIntermediateNode([X, Y], [OUTPUT_ID, ' = ', fnName,'(', getValue(X), ', ', getValue(Y), ')\n']) + +def unaryMatrixFunction(X, fnName): + """ + Common function called by supported PyDML built-in function that has one argument. + """ + return constructIntermediateNode([X], [OUTPUT_ID, ' = ', fnName,'(', getValue(X), ')\n']) + +# utility function that converts 1:3 into DML string +def convertSeqToDML(s): + ret = [] + if s is None: + return '' + elif isinstance(s, slice): + if s.step is not None: + raise ValueError('Slicing with step is not supported.') + if s.start is None: + ret = ret + [ '0 : ' ] + else: + ret = ret + [ getValue(s.start), ':' ] + if s.start is None: + ret = ret + [ '' ] + else: + ret = ret + [ getValue(s.stop) ] + else: + ret = ret + [ getValue(s) ] + return ''.join(ret) + +# utility function that converts index (such as [1, 2:3]) into DML string +def getIndexingDML(index): + ret = [ '[' ] + if isinstance(index, tuple) and len(index) == 1: + ret = ret + [ convertSeqToDML(index[0]), ',' ] + elif isinstance(index, tuple) and len(index) == 2: + ret = ret + [ convertSeqToDML(index[0]), ',', convertSeqToDML(index[1]) ] + else: + raise TypeError('matrix indexes can only be tuple of length 2. 
For example: m[1,1], m[0:1,], m[:, 0:1]') + return ret + [ ']' ] + +def convertOutputsToList(outputs): + if isinstance(outputs, matrix): + return [ outputs ] + elif isinstance(outputs, list): + for o in outputs: + if not isinstance(o, matrix): + raise TypeError('Only matrix or list of matrix allowed') + return outputs + else: + raise TypeError('Only matrix or list of matrix allowed') + +def resetOutputFlag(outputs): + for m in outputs: + m.output = False + +def populateOutputs(outputs, results, outputDF): + """ + Set the attribute 'data' of the matrix by fetching it from MLResults class + """ + for m in outputs: + if outputDF: + m.data = results.getDataFrame(m.ID) + else: + m.data = results.getNumPyArray(m.ID) + +############################################################################### + +########################## Global user-facing functions ####################### + +def exp(X): + return unaryMatrixFunction(X, 'exp') + +def log(X, y=None): + if y is None: + return unaryMatrixFunction(X, 'log') + else: + return binaryMatrixFunction(X, y, 'log') + +def abs(X): + return unaryMatrixFunction(X, 'abs') + +def sqrt(X): + return unaryMatrixFunction(X, 'sqrt') + +def round(X): + return unaryMatrixFunction(X, 'round') + +def floor(X): + return unaryMatrixFunction(X, 'floor') +def ceil(X): + return unaryMatrixFunction(X, 'ceil') + +def sin(X): + return unaryMatrixFunction(X, 'sin') +def cos(X): + return unaryMatrixFunction(X, 'cos') + +def tan(X): + return unaryMatrixFunction(X, 'tan') + +def asin(X): + return unaryMatrixFunction(X, 'asin') + +def acos(X): + return unaryMatrixFunction(X, 'acos') + +def atan(X): + return unaryMatrixFunction(X, 'atan') + +def sign(X): + return unaryMatrixFunction(X, 'sign') + def solve(A, b): """ Computes the least squares solution for system of linear equations A %*% x = b @@ -133,39 +276,46 @@ def solve(A, b): """ return binaryMatrixFunction(A, b, 'solve') - def eval(outputs, outputDF=False, execute=True): """ Executes the 
unevaluated DML script and computes the matrices specified by outputs. Parameters ---------- - outputs: list of matrices + outputs: list of matrices or a matrix object outputDF: back the data of matrix as PySpark DataFrame """ checkIfMLContextIsSet() reset() - matrix.dml = [] - matrix.script = pydml('') - if isinstance(outputs, matrix): - outputs = [ outputs ] - elif not isinstance(outputs, list): - raise TypeError('Incorrect input type') - for m in outputs: - m.output = True - m._visit(execute=execute) + outputs = convertOutputsToList(outputs) + matrix.script.scriptString = performDFS(outputs, execute) if not execute: - return ''.join(matrix.dml) - matrix.script.scriptString = ''.join(matrix.dml) + resetOutputFlag(outputs) + return matrix.script.scriptString results = matrix.ml.execute(matrix.script) - # Note: an evaluated matrix contains a data field computed by eval method as DataFrame or NumPy array. - for m in outputs: - if outputDF: - m.data = results.getDataFrame(m.ID) - else: - m.data = results.getNumPyArray(m.ID) - - + populateOutputs(outputs, results, outputDF) + resetOutputFlag(outputs) + +############################################################################### + +# DESIGN DECISIONS: +# 1. Until the eval() method is invoked, we create an AST (not exposed to the user) that consists of unevaluated operations and the data required by those operations. +# As an analogy, a Spark user can treat the eval() method as similar to calling RDD.persist() followed by RDD.count(). +# 2. The AST consists of two kinds of nodes: either of type matrix or of type DMLOp. +# Both classes expose a _visit method that helps traverse the AST in a DFS manner. +# 3. A matrix object can either be evaluated or not. +# If evaluated, the attribute 'data' is set to one of the supported types (for example: NumPy array or DataFrame). In this case, the attribute 'op' is set to None.
+# If not evaluated, the attribute 'op' refers to an intermediate node of the AST, which is of type DMLOp. In this case, the attribute 'data' is set to None. +# 4. DMLOp has an attribute 'inputs' which contains a list of matrix objects or DMLOp. +# 5. To simplify the traversal, every matrix object is considered immutable and every matrix operation creates a new matrix object. +# As an example: +# - m1 = sml.matrix(np.ones((3,3))) creates a matrix object backed by 'data=np.ones((3,3))'. +# - m1 = m1 * 2 will create a new matrix object which is now backed by 'op=DMLOp( ... )' whose input is the previously created matrix object. +# 6. Left indexing (implemented in the __setitem__ method) is a special case, where Python expects the existing object to be mutated. +# To ensure the above property, we make a deep copy of the existing object and point any references to the left-indexed matrix to the newly created object. +# Then the left-indexed matrix is set to be backed by a DMLOp consisting of the following pydml: +# left-indexed-matrix = new-deep-copied-matrix +# left-indexed-matrix[index] = value class matrix(object): """ matrix class is a python wrapper that implements basic matrix operator.
@@ -239,16 +389,14 @@ class matrix(object): matrix.systemmlVarID += 1 self.output = False self.ID = 'mVar' + str(matrix.systemmlVarID) - if isinstance(data, SUPPORTED_TYPES): - self.data = data - elif hasattr(data, '_jdf'): - self.data = data - elif data is None and op is not None: - self.data = None - # op refers to the node of Abstract Syntax Tree created internally for lazy evaluation - self.op = op - else: + self.referenced = [] + # op refers to the node of Abstract Syntax Tree created internally for lazy evaluation + self.op = op + self.data = data + if not (isinstance(data, SUPPORTED_TYPES) or hasattr(data, '_jdf') or (data is None and op is not None)): raise TypeError('Unsupported input type') + if op is not None: + self.referenced = self.referenced + [ op ] def eval(self, outputDF=False): """ @@ -287,6 +435,27 @@ class matrix(object): self.data = sqlContext.createDataFrame(self.toPandas()) return self.data + def _markAsVisited(self): + self.visited = True + # for cleanup + matrix.visited = matrix.visited + [ self ] + return self + + def _registerAsInput(self, execute): + # TODO: Remove this when automatic registration of frame is resolved + matrix.dml = [ self.ID, ' = load(\" \", format=\"csv\")\n'] + matrix.dml + if isinstance(self.data, DataFrame) and execute: + matrix.script.input(self.ID, self.data) + elif execute: + matrix.script.input(self.ID, convertToMatrixBlock(matrix.sc, self.data)) + return self + + def _registerAsOutput(self, execute): + # TODO: Remove this when automatic registration of frame is resolved + matrix.dml = matrix.dml + ['save(', self.ID, ', \" \")\n'] + if execute: + matrix.script.output(self.ID) + def _visit(self, execute=True): """ This function is called for two scenarios: @@ -296,26 +465,18 @@ class matrix(object): """ if self.visited: return self - self.visited = True - # for cleanup - matrix.visited = matrix.visited + [ self ] + self._markAsVisited() if self.data is not None: - matrix.dml = matrix.dml + [ self.ID, ' = load(\" 
\", format=\"csv\")\n'] - if isinstance(self.data, DataFrame) and execute: - matrix.script.input(self.ID, self.data) - elif execute: - matrix.script.input(self.ID, convertToMatrixBlock(matrix.sc, self.data)) - return self + self._registerAsInput(execute) elif self.op is not None: + # Traverse the AST for m in self.op.inputs: m._visit(execute=execute) self.op._visit(execute=execute) else: raise Exception('Expected either op or data to be set') if self.data is None and self.output: - matrix.dml = matrix.dml + ['save(', self.ID, ', \" \")\n'] - if execute: - matrix.script.output(self.ID) + self._registerAsOutput(execute) return self def __repr__(self): @@ -330,6 +491,8 @@ class matrix(object): print('# This matrix (' + self.ID + ') is backed by NumPy array. To fetch the NumPy array, invoke toNumPyArray() method.') return '<SystemML.defmatrix.matrix object>' + ######################### Arithmetic operators ###################################### + def __add__(self, other): return binaryOp(self, other, ' + ') @@ -343,8 +506,17 @@ class matrix(object): return binaryOp(self, other, ' // ') def __div__(self, other): + """ + Performs division (Python 2 way). + """ return binaryOp(self, other, ' / ') + def __truediv__(self, other): + """ + Performs division (Python 3 way). + """ + return binaryOp(self, other, ' / ') + def __mod__(self, other): return binaryOp(self, other, ' % ') @@ -371,7 +543,49 @@ class matrix(object): def __rpow__(self, other): return binaryOp(other, self, ' ** ') - + + def dot(self, other): + """ + Numpy way of performing matrix multiplication + """ + return binaryMatrixFunction(self, other, 'dot') + + def __matmul__(self, other): + """ + Performs matrix multiplication (infix operator: @). 
See PEP 465) + """ + return binaryMatrixFunction(self, other, 'dot') + + + ######################### Relational/Boolean operators ###################################### + + def __lt__(self, other): + return binaryOp(other, self, ' < ') + + def __le__(self, other): + return binaryOp(other, self, ' <= ') + + def __gt__(self, other): + return binaryOp(other, self, ' > ') + + def __ge__(self, other): + return binaryOp(other, self, ' >= ') + + def __eq__(self, other): + return binaryOp(other, self, ' == ') + + def __ne__(self, other): + return binaryOp(other, self, ' != ') + + # TODO: Cast the output back into scalar and return boolean results + def __and__(self, other): + return binaryOp(other, self, ' & ') + + def __or__(self, other): + return binaryOp(other, self, ' | ') + + ######################### Aggregation functions ###################################### + def sum(self, axis=None): return self._aggFn('sum', axis) @@ -410,6 +624,33 @@ class matrix(object): else: dmlOp.dml = [out.ID, ' = ', fnName, '(', self.ID, ', axis=', str(axis) ,')\n'] return out - - def dot(self, other): - return binaryMatrixFunction(self, other, 'dot') + + ######################### Indexing operators ###################################### + + def __getitem__(self, index): + """ + Implements evaluation of right indexing operations such as m[1,1], m[0:1,], m[:, 0:1] + """ + dmlOp = DMLOp([self]) + out = matrix(None, op=dmlOp) + dmlOp.dml = [out.ID, ' = ', self.ID ] + getIndexingDML(index) + [ '\n' ] + return out + + # Performs deep copy if the matrix is backed by data + def _prepareForInPlaceUpdate(self): + temp = matrix(self.data, op=self.op) + self.ID, temp.ID = temp.ID, self.ID # Copy even the IDs as the IDs might be used to create DML + for op in self.referenced: + op.inputs.remove(self) #while self in op.inputs: + op.inputs = op.inputs + [ temp ] + self.op = DMLOp([temp], dml=[self.ID, " = ", temp.ID]) + self.data = None + + def __setitem__(self, index, value): + """ + Implements 
evaluation of left indexing operations such as m[1,1]=2 + """ + self._prepareForInPlaceUpdate() + if isinstance(value, matrix): + self.op.inputs = self.op.inputs + [ value ] + self.op.dml = self.op.dml + [ '\n', self.ID ] + getIndexingDML(index) + [ ' = ', getValue(value), '\n'] \ No newline at end of file