Repository: incubator-systemml Updated Branches: refs/heads/master 9279e8b17 -> 48a7267f8
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/python/SystemML/mllearn/estimators.py ---------------------------------------------------------------------- diff --git a/src/main/python/SystemML/mllearn/estimators.py b/src/main/python/SystemML/mllearn/estimators.py new file mode 100644 index 0000000..5d33d64 --- /dev/null +++ b/src/main/python/SystemML/mllearn/estimators.py @@ -0,0 +1,302 @@ +#!/usr/bin/python +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +from pyspark.context import SparkContext +from pyspark.sql import DataFrame, SQLContext +from pyspark.rdd import RDD +import numpy as np +import pandas as pd +import sklearn as sk +from pyspark.ml.feature import VectorAssembler +from pyspark.mllib.linalg import Vectors +from pyspark.ml import Estimator, Model + +from ..converters import * + +def assemble(sqlCtx, pdf, inputCols, outputCol): + tmpDF = sqlCtx.createDataFrame(pdf, list(pdf.columns)) + assembler = VectorAssembler(inputCols=list(inputCols), outputCol=outputCol) + return assembler.transform(tmpDF) + +class BaseSystemMLEstimator(Estimator): + featuresCol = 'features' + labelCol = 'label' + + def setFeaturesCol(self, colName): + """ + Sets the default column name for features of PySpark DataFrame. + + Parameters + ---------- + colName: column name for features (default: 'features') + """ + self.featuresCol = colName + + def setLabelCol(self, colName): + """ + Sets the default column name for the label of PySpark DataFrame. 
+ + Parameters + ---------- + colName: column name for the label (default: 'label') + """ + self.labelCol = colName + + # Returns a model after calling fit(df) on Estimator object on JVM + def _fit(self, X): + """ + Invokes the fit method on Estimator object on JVM if X is PySpark DataFrame + + Parameters + ---------- + X: PySpark DataFrame that contains the columns featuresCol (default: 'features') and labelCol (default: 'label') + """ + if hasattr(X, '_jdf') and self.featuresCol in X.columns and self.labelCol in X.columns: + self.model = self.estimator.fit(X._jdf) + return self + else: + raise Exception('Incorrect usage: Expected dataframe as input with features/label as columns') + + def fit(self, X, y=None, params=None): + """ + Invokes the fit method on Estimator object on JVM if X and y are one of the supported data types + + Parameters + ---------- + X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix + y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix + """ + if y is None: + return self._fit(X) + elif y is not None and isinstance(X, SUPPORTED_TYPES) and isinstance(y, SUPPORTED_TYPES): + if self.transferUsingDF: + pdfX = convertToPandasDF(X) + pdfY = convertToPandasDF(y) + if getNumCols(pdfY) != 1: + raise Exception('y should be a column vector') + if pdfX.shape[0] != pdfY.shape[0]: + raise Exception('Number of rows of X and y should match') + colNames = pdfX.columns + pdfX[self.labelCol] = pdfY[pdfY.columns[0]] + df = assemble(self.sqlCtx, pdfX, colNames, self.featuresCol).select(self.featuresCol, self.labelCol) + self.model = self.estimator.fit(df._jdf) + else: + numColsy = getNumCols(y) + if numColsy != 1: + raise Exception('Expected y to be a column vector') + self.model = self.estimator.fit(convertToMatrixBlock(self.sc, X), convertToMatrixBlock(self.sc, y)) + if self.setOutputRawPredictionsToFalse: + self.model.setOutputRawPredictions(False) + return self + else: + raise Exception('Unsupported input type') + + def transform(self, X): + return 
self.predict(X) + + # Returns either a DataFrame or MatrixBlock after calling transform(X:MatrixBlock, y:MatrixBlock) on Model object on JVM + def predict(self, X): + """ + Invokes the transform method on Model object on JVM if X is one of the supported data types + + Parameters + ---------- + X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame + """ + if isinstance(X, SUPPORTED_TYPES): + if self.transferUsingDF: + pdfX = convertToPandasDF(X) + df = assemble(self.sqlCtx, pdfX, pdfX.columns, self.featuresCol).select(self.featuresCol) + retjDF = self.model.transform(df._jdf) + retDF = DataFrame(retjDF, self.sqlCtx) + retPDF = retDF.sort('ID').select('prediction').toPandas() + if isinstance(X, np.ndarray): + return retPDF.as_matrix().flatten() + else: + return retPDF + else: + retNumPy = convertToNumpyArr(self.sc, self.model.transform(convertToMatrixBlock(self.sc, X))) + if isinstance(X, np.ndarray): + return retNumPy + else: + return retNumPy # TODO: Convert to Pandas + elif hasattr(X, '_jdf'): + if self.featuresCol in X.columns: + # No need to assemble as input DF is likely coming via MLPipeline + df = X + else: + assembler = VectorAssembler(inputCols=X.columns, outputCol=self.featuresCol) + df = assembler.transform(X) + retjDF = self.model.transform(df._jdf) + retDF = DataFrame(retjDF, self.sqlCtx) + # Return DF + return retDF.sort('ID') + else: + raise Exception('Unsupported input type') + +class BaseSystemMLClassifier(BaseSystemMLEstimator): + + def score(self, X, y): + """ + Scores the predicted value with ground truth 'y' + + Parameters + ---------- + X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix + y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix + """ + return sk.metrics.accuracy_score(y, self.predict(X)) + +class BaseSystemMLRegressor(BaseSystemMLEstimator): + + def score(self, X, y): + """ + Scores the predicted value with ground truth 'y' + + Parameters + ---------- + X: NumPy ndarray, Pandas DataFrame, 
scipy sparse matrix + y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix + """ + return sk.metrics.r2_score(y, self.predict(X), multioutput='variance_weighted') + + +class LogisticRegression(BaseSystemMLClassifier): + def __init__(self, sqlCtx, penalty='l2', fit_intercept=True, max_iter=100, max_inner_iter=0, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False): + """ + Performs both binomial and multinomial logistic regression. + + Parameters + ---------- + sqlCtx: PySpark SQLContext + penalty: Only 'l2' supported + fit_intercept: Specifies whether to add intercept or not (default: True) + max_iter: Maximum number of outer (Fisher scoring) iterations (default: 100) + max_inner_iter: Maximum number of inner (conjugate gradient) iterations, or 0 if no maximum limit provided (default: 0) + tol: Tolerance used in the convergence criterion (default: 0.000001) + C: 1/regularization parameter (default: 1.0) + solver: Only 'newton-cg' solver supported + """ + self.sqlCtx = sqlCtx + self.sc = sqlCtx._sc + self.uid = "logReg" + self.estimator = self.sc._jvm.org.apache.sysml.api.ml.LogisticRegression(self.uid, self.sc._jsc.sc()) + self.estimator.setMaxOuterIter(max_iter) + self.estimator.setMaxInnerIter(max_inner_iter) + if C <= 0: + raise Exception('C has to be positive') + reg = 1.0 / C + self.estimator.setRegParam(reg) + self.estimator.setTol(tol) + self.estimator.setIcpt(int(fit_intercept)) + self.transferUsingDF = transferUsingDF + self.setOutputRawPredictionsToFalse = True + if penalty != 'l2': + raise Exception('Only l2 penalty is supported') + if solver != 'newton-cg': + raise Exception('Only newton-cg solver supported') + +class LinearRegression(BaseSystemMLRegressor): + + def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False): + """ + Performs linear regression to model the relationship between one numerical response variable and one or more explanatory (feature) variables.. 
+ + Parameters + ---------- + sqlCtx: PySpark SQLContext + fit_intercept: Specifies whether to add intercept or not (default: True) + max_iter: Maximum number of conjugate gradient iterations, or 0 if no maximum limit provided (default: 100) + tol: Tolerance used in the convergence criterion (default: 0.000001) + C: 1/regularization parameter (default: 1.0) + solver: Supports either 'newton-cg' or 'direct-solve' (default: 'newton-cg'). + Depending on the size and the sparsity of the feature matrix, one or the other solver may be more efficient. + 'direct-solve' solver is more efficient when the number of features is relatively small (m < 1000) and + input matrix X is either tall or fairly dense; otherwise 'newton-cg' solver is more efficient. + """ + self.sqlCtx = sqlCtx + self.sc = sqlCtx._sc + self.uid = "lr" + if solver == 'newton-cg' or solver == 'direct-solve': + self.estimator = self.sc._jvm.org.apache.sysml.api.ml.LinearRegression(self.uid, self.sc._jsc.sc(), solver) + else: + raise Exception('Only newton-cg solver supported') + self.estimator.setMaxIter(max_iter) + if C <= 0: + raise Exception('C has to be positive') + reg = 1.0 / C + self.estimator.setRegParam(reg) + self.estimator.setTol(tol) + self.estimator.setIcpt(int(fit_intercept)) + self.transferUsingDF = transferUsingDF + self.setOutputRawPredictionsToFalse = False + + +class SVM(BaseSystemMLClassifier): + + def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, is_multi_class=False, transferUsingDF=False): + """ + Performs both binary-class and multiclass SVM (Support Vector Machines). 
+ + Parameters + ---------- + sqlCtx: PySpark SQLContext + fit_intercept: Specifies whether to add intercept or not (default: True) + max_iter: Maximum number of iterations (default: 100) + tol: Tolerance used in the convergence criterion (default: 0.000001) + C: 1/regularization parameter (default: 1.0) + is_multi_class: Specifies whether to use binary-class SVM or multi-class SVM algorithm (default: False) + """ + self.sqlCtx = sqlCtx + self.sc = sqlCtx._sc + self.uid = "svm" + self.estimator = self.sc._jvm.org.apache.sysml.api.ml.SVM(self.uid, self.sc._jsc.sc(), is_multi_class) + self.estimator.setMaxIter(max_iter) + if C <= 0: + raise Exception('C has to be positive') + reg = 1.0 / C + self.estimator.setRegParam(reg) + self.estimator.setTol(tol) + self.estimator.setIcpt(int(fit_intercept)) + self.transferUsingDF = transferUsingDF + self.setOutputRawPredictionsToFalse = False + +class NaiveBayes(BaseSystemMLClassifier): + + def __init__(self, sqlCtx, laplace=1.0, transferUsingDF=False): + """ + Performs Naive Bayes classification. 
+ + Parameters + ---------- + sqlCtx: PySpark SQLContext + laplace: Laplace smoothing specified by the user to avoid creation of 0 probabilities (default: 1.0) + """ + self.sqlCtx = sqlCtx + self.sc = sqlCtx._sc + self.uid = "nb" + self.estimator = self.sc._jvm.org.apache.sysml.api.ml.NaiveBayes(self.uid, self.sc._jsc.sc()) + self.estimator.setLaplace(laplace) + self.transferUsingDF = transferUsingDF + self.setOutputRawPredictionsToFalse = False + +__all__ = ['LogisticRegression', 'LinearRegression', 'SVM', 'NaiveBayes'] http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/python/SystemMLtests.py ---------------------------------------------------------------------- diff --git a/src/main/python/SystemMLtests.py b/src/main/python/SystemMLtests.py deleted file mode 100644 index e11a694..0000000 --- a/src/main/python/SystemMLtests.py +++ /dev/null @@ -1,104 +0,0 @@ -#!/usr/bin/python -#------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# -#------------------------------------------------------------- -import unittest - -from pyspark.context import SparkContext - -from SystemML import MLContext, dml, pydml - -sc = SparkContext() -ml = MLContext(sc) - -class TestAPI(unittest.TestCase): - - def test_output_string(self): - script = dml("x1 = 'Hello World'").out("x1") - self.assertEqual(ml.execute(script).get("x1"), "Hello World") - - def test_output_list(self): - script = """ - x1 = 0.2 - x2 = x1 + 1 - x3 = x1 + 2 - """ - script = dml(script).out("x1", "x2", "x3") - self.assertEqual(ml.execute(script).get("x1", "x2"), [0.2, 1.2]) - self.assertEqual(ml.execute(script).get("x1", "x3"), [0.2, 2.2]) - - def test_output_matrix(self): - sums = """ - s1 = sum(m1) - m2 = m1 * 2 - """ - rdd1 = sc.parallelize(["1.0,2.0", "3.0,4.0"]) - script = dml(sums).input(m1=rdd1).out("s1", "m2") - s1, m2 = ml.execute(script).get("s1", "m2") - self.assertEqual((s1, repr(m2)), (10.0, "Matrix")) - - def test_matrix_toDF(self): - sums = """ - s1 = sum(m1) - m2 = m1 * 2 - """ - rdd1 = sc.parallelize(["1.0,2.0", "3.0,4.0"]) - script = dml(sums).input(m1=rdd1).out("m2") - m2 = ml.execute(script).get("m2") - self.assertEqual(repr(m2.toDF()), "DataFrame[ID: double, C1: double, C2: double]") - - def test_input_single(self): - script = """ - x2 = x1 + 1 - x3 = x1 + 2 - """ - script = dml(script).input("x1", 5).out("x2", "x3") - self.assertEqual(ml.execute(script).get("x2", "x3"), [6, 7]) - - def test_input(self): - script = """ - x3 = x1 + x2 - """ - script = dml(script).input(x1=5, x2=3).out("x3") - self.assertEqual(ml.execute(script).get("x3"), 8) - - def test_rdd(self): - sums = """ - s1 = sum(m1) - s2 = sum(m2) - s3 = 'whatever' - """ - rdd1 = sc.parallelize(["1.0,2.0", "3.0,4.0"]) - rdd2 = sc.parallelize(["5.0,6.0", "7.0,8.0"]) - script = dml(sums).input(m1=rdd1).input(m2=rdd2).out("s1", "s2", "s3") - self.assertEqual( - ml.execute(script).get("s1", "s2", "s3"), [10.0, 26.0, "whatever"]) - - def test_pydml(self): - script = "A 
= full('1 2 3 4 5 6 7 8 9', rows=3, cols=3)\nx = toString(A)" - script = pydml(script).out("x") - self.assertEqual( - ml.execute(script).get("x"), - '1.000 2.000 3.000\n4.000 5.000 6.000\n7.000 8.000 9.000\n' - ) - - -if __name__ == "__main__": - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/python/setup.py ---------------------------------------------------------------------- diff --git a/src/main/python/setup.py b/src/main/python/setup.py new file mode 100644 index 0000000..0bcebab --- /dev/null +++ b/src/main/python/setup.py @@ -0,0 +1,77 @@ +#!/usr/bin/python +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +from setuptools import setup, find_packages +import os +import time + +VERSION = '0.11.0.dev1' +RELEASED_DATE = str(time.strftime("%m/%d/%Y")) +numpy_version = '1.8.2' +scipy_version = '0.15.1' +REQUIRED_PACKAGES = [ + 'numpy >= %s' % numpy_version, + 'scipy >= %s' % scipy_version +] + +PACKAGE_DATA = [] +for path, subdirs, files in os.walk('SystemML/SystemML-java'): + for name in files: + PACKAGE_DATA = PACKAGE_DATA + [ os.path.join(path, name).replace('./', '') ] + +setup( + name='SystemML', + version=VERSION, + description='Apache SystemML is a distributed and declarative machine learning platform.', + long_description=''' + + Apache SystemML is an effort undergoing incubation at the Apache Software Foundation (ASF), sponsored by the Apache Incubator PMC. + While incubation status is not necessarily a reflection of the completeness + or stability of the code, it does indicate that the project has yet to be + fully endorsed by the ASF. + + Apache SystemML provides declarative large-scale machine learning (ML) that aims at + flexible specification of ML algorithms and automatic generation of hybrid runtime + plans ranging from single-node, in-memory computations, to distributed computations on Apache Hadoop and Apache Spark. 
+ + Note: This is not a released version and was built with SNAPSHOT available on the date''' + RELEASED_DATE, + url='http://systemml.apache.org/', + author='Apache SystemML', + author_email='d...@systemml.incubator.apache.org', + packages=find_packages(), + install_requires=REQUIRED_PACKAGES, + include_package_data=True, + package_data={ + 'SystemML-java': PACKAGE_DATA + }, + classifiers=[ + 'Intended Audience :: Developers', + 'Intended Audience :: Education', + 'Intended Audience :: Science/Research', + 'License :: OSI Approved :: Apache Software License', + 'Programming Language :: Python :: 2.7', + 'Topic :: Scientific/Engineering :: Mathematics', + 'Topic :: Software Development :: Libraries :: Python Modules', + 'Topic :: Software Development :: Libraries', + ], + license='Apache 2.0', + ) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/python/tests/test_mlcontext.py ---------------------------------------------------------------------- diff --git a/src/main/python/tests/test_mlcontext.py b/src/main/python/tests/test_mlcontext.py new file mode 100644 index 0000000..ec5a196 --- /dev/null +++ b/src/main/python/tests/test_mlcontext.py @@ -0,0 +1,104 @@ +#!/usr/bin/python +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- +import unittest + +from pyspark.context import SparkContext + +from SystemML import MLContext, dml, pydml + +sc = SparkContext() +ml = MLContext(sc) + +class TestAPI(unittest.TestCase): + + def test_output_string(self): + script = dml("x1 = 'Hello World'").out("x1") + self.assertEqual(ml.execute(script).get("x1"), "Hello World") + + def test_output_list(self): + script = """ + x1 = 0.2 + x2 = x1 + 1 + x3 = x1 + 2 + """ + script = dml(script).out("x1", "x2", "x3") + self.assertEqual(ml.execute(script).get("x1", "x2"), [0.2, 1.2]) + self.assertEqual(ml.execute(script).get("x1", "x3"), [0.2, 2.2]) + + def test_output_matrix(self): + sums = """ + s1 = sum(m1) + m2 = m1 * 2 + """ + rdd1 = sc.parallelize(["1.0,2.0", "3.0,4.0"]) + script = dml(sums).input(m1=rdd1).out("s1", "m2") + s1, m2 = ml.execute(script).get("s1", "m2") + self.assertEqual((s1, repr(m2)), (10.0, "Matrix")) + + def test_matrix_toDF(self): + sums = """ + s1 = sum(m1) + m2 = m1 * 2 + """ + rdd1 = sc.parallelize(["1.0,2.0", "3.0,4.0"]) + script = dml(sums).input(m1=rdd1).out("m2") + m2 = ml.execute(script).get("m2") + self.assertEqual(repr(m2.toDF()), "DataFrame[ID: double, C1: double, C2: double]") + + def test_input_single(self): + script = """ + x2 = x1 + 1 + x3 = x1 + 2 + """ + script = dml(script).input("x1", 5).out("x2", "x3") + self.assertEqual(ml.execute(script).get("x2", "x3"), [6, 7]) + + def test_input(self): + script = """ + x3 = x1 + x2 + """ + script = dml(script).input(x1=5, x2=3).out("x3") + 
self.assertEqual(ml.execute(script).get("x3"), 8) + + def test_rdd(self): + sums = """ + s1 = sum(m1) + s2 = sum(m2) + s3 = 'whatever' + """ + rdd1 = sc.parallelize(["1.0,2.0", "3.0,4.0"]) + rdd2 = sc.parallelize(["5.0,6.0", "7.0,8.0"]) + script = dml(sums).input(m1=rdd1).input(m2=rdd2).out("s1", "s2", "s3") + self.assertEqual( + ml.execute(script).get("s1", "s2", "s3"), [10.0, 26.0, "whatever"]) + + def test_pydml(self): + script = "A = full('1 2 3 4 5 6 7 8 9', rows=3, cols=3)\nx = toString(A)" + script = pydml(script).out("x") + self.assertEqual( + ml.execute(script).get("x"), + '1.000 2.000 3.000\n4.000 5.000 6.000\n7.000 8.000 9.000\n' + ) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/python/tests/test_mllearn.py ---------------------------------------------------------------------- diff --git a/src/main/python/tests/test_mllearn.py b/src/main/python/tests/test_mllearn.py new file mode 100644 index 0000000..22f798f --- /dev/null +++ b/src/main/python/tests/test_mllearn.py @@ -0,0 +1,178 @@ +#!/usr/bin/python +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- +from sklearn import datasets, neighbors +from SystemML.mllearn import LogisticRegression, LinearRegression, SVM, NaiveBayes +from pyspark.sql import SQLContext +from pyspark.context import SparkContext +import unittest +from pyspark.ml.evaluation import MulticlassClassificationEvaluator +from pyspark.ml import Pipeline +from pyspark.ml.feature import HashingTF, Tokenizer +import numpy as np +from sklearn.datasets import fetch_20newsgroups +from sklearn.feature_extraction.text import TfidfVectorizer +from sklearn import metrics + +sc = SparkContext() +sqlCtx = SQLContext(sc) + +# Currently not integrated with JUnit test +# ~/spark-1.6.1-scala-2.11/bin/spark-submit --master local[*] --driver-class-path SystemML.jar test.py +class TestMLLearn(unittest.TestCase): + def testLogisticSK1(self): + digits = datasets.load_digits() + X_digits = digits.data + y_digits = digits.target + n_samples = len(X_digits) + X_train = X_digits[:.9 * n_samples] + y_train = y_digits[:.9 * n_samples] + X_test = X_digits[.9 * n_samples:] + y_test = y_digits[.9 * n_samples:] + logistic = LogisticRegression(sqlCtx) + score = logistic.fit(X_train, y_train).score(X_test, y_test) + self.failUnless(score > 0.9) + + def testLogisticSK2(self): + digits = datasets.load_digits() + X_digits = digits.data + y_digits = digits.target + n_samples = len(X_digits) + X_train = X_digits[:.9 * n_samples] + y_train = y_digits[:.9 * n_samples] + X_test = X_digits[.9 * n_samples:] + y_test = y_digits[.9 * n_samples:] + # Convert to DataFrame for i/o: current way to transfer data + logistic = LogisticRegression(sqlCtx, transferUsingDF=True) + score = logistic.fit(X_train, y_train).score(X_test, y_test) + self.failUnless(score > 0.9) + + def testLogisticMLPipeline1(self): + training = sqlCtx.createDataFrame([ + (0L, "a b c d e spark", 1.0), + (1L, 
"b d", 2.0), + (2L, "spark f g h", 1.0), + (3L, "hadoop mapreduce", 2.0), + (4L, "b spark who", 1.0), + (5L, "g d a y", 2.0), + (6L, "spark fly", 1.0), + (7L, "was mapreduce", 2.0), + (8L, "e spark program", 1.0), + (9L, "a e c l", 2.0), + (10L, "spark compile", 1.0), + (11L, "hadoop software", 2.0) + ], ["id", "text", "label"]) + tokenizer = Tokenizer(inputCol="text", outputCol="words") + hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20) + lr = LogisticRegression(sqlCtx) + pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) + model = pipeline.fit(training) + test = sqlCtx.createDataFrame([ + (12L, "spark i j k", 1.0), + (13L, "l m n", 2.0), + (14L, "mapreduce spark", 1.0), + (15L, "apache hadoop", 2.0)], ["id", "text", "label"]) + result = model.transform(test) + predictionAndLabels = result.select("prediction", "label") + evaluator = MulticlassClassificationEvaluator() + score = evaluator.evaluate(predictionAndLabels) + self.failUnless(score == 1.0) + + def testLinearRegressionSK1(self): + diabetes = datasets.load_diabetes() + diabetes_X = diabetes.data[:, np.newaxis, 2] + diabetes_X_train = diabetes_X[:-20] + diabetes_X_test = diabetes_X[-20:] + diabetes_y_train = diabetes.target[:-20] + diabetes_y_test = diabetes.target[-20:] + regr = LinearRegression(sqlCtx) + regr.fit(diabetes_X_train, diabetes_y_train) + score = regr.score(diabetes_X_test, diabetes_y_test) + self.failUnless(score > 0.4) # TODO: Improve r2-score (may be I am using it incorrectly) + + def testLinearRegressionSK2(self): + diabetes = datasets.load_diabetes() + diabetes_X = diabetes.data[:, np.newaxis, 2] + diabetes_X_train = diabetes_X[:-20] + diabetes_X_test = diabetes_X[-20:] + diabetes_y_train = diabetes.target[:-20] + diabetes_y_test = diabetes.target[-20:] + regr = LinearRegression(sqlCtx, transferUsingDF=True) + regr.fit(diabetes_X_train, diabetes_y_train) + score = regr.score(diabetes_X_test, diabetes_y_test) + self.failUnless(score > 0.4) # TODO: Improve 
r2-score (may be I am using it incorrectly) + + def testSVMSK1(self): + digits = datasets.load_digits() + X_digits = digits.data + y_digits = digits.target + n_samples = len(X_digits) + X_train = X_digits[:.9 * n_samples] + y_train = y_digits[:.9 * n_samples] + X_test = X_digits[.9 * n_samples:] + y_test = y_digits[.9 * n_samples:] + svm = SVM(sqlCtx, is_multi_class=True) + score = svm.fit(X_train, y_train).score(X_test, y_test) + self.failUnless(score > 0.9) + + def testSVMSK2(self): + digits = datasets.load_digits() + X_digits = digits.data + y_digits = digits.target + n_samples = len(X_digits) + X_train = X_digits[:.9 * n_samples] + y_train = y_digits[:.9 * n_samples] + X_test = X_digits[.9 * n_samples:] + y_test = y_digits[.9 * n_samples:] + svm = SVM(sqlCtx, is_multi_class=True, transferUsingDF=True) + score = svm.fit(X_train, y_train).score(X_test, y_test) + self.failUnless(score > 0.9) + + def testNaiveBayesSK1(self): + digits = datasets.load_digits() + X_digits = digits.data + y_digits = digits.target + n_samples = len(X_digits) + X_train = X_digits[:.9 * n_samples] + y_train = y_digits[:.9 * n_samples] + X_test = X_digits[.9 * n_samples:] + y_test = y_digits[.9 * n_samples:] + nb = NaiveBayes(sqlCtx) + score = nb.fit(X_train, y_train).score(X_test, y_test) + self.failUnless(score > 0.85) + + def testNaiveBayesSK2(self): + categories = ['alt.atheism', 'talk.religion.misc', 'comp.graphics', 'sci.space'] + newsgroups_train = fetch_20newsgroups(subset='train', categories=categories) + newsgroups_test = fetch_20newsgroups(subset='test', categories=categories) + vectorizer = TfidfVectorizer() + # Both vectors and vectors_test are SciPy CSR matrix + vectors = vectorizer.fit_transform(newsgroups_train.data) + vectors_test = vectorizer.transform(newsgroups_test.data) + nb = NaiveBayes(sqlCtx) + nb.fit(vectors, newsgroups_train.target) + pred = nb.predict(vectors_test) + score = metrics.f1_score(newsgroups_test.target, pred, average='weighted') + 
self.failUnless(score > 0.8) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/python/uploadToPyPI.sh ---------------------------------------------------------------------- diff --git a/src/main/python/uploadToPyPI.sh b/src/main/python/uploadToPyPI.sh new file mode 100644 index 0000000..c892f3d --- /dev/null +++ b/src/main/python/uploadToPyPI.sh @@ -0,0 +1,34 @@ +#!/bin/bash +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +cd ../../.. +mvn clean package -P distribution +tar -xzf target/systemml-*-SNAPSHOT.tar.gz -C src/main/python/SystemML + +cd src/main/python/SystemML +mv systemml-*-incubating-SNAPSHOT SystemML-java + +cd .. +echo "Preparing to upload to PyPI ...." +python setup.py register sdist upload + +rm -r SystemML/SystemML-java \ No newline at end of file