Repository: mahout Updated Branches: refs/heads/master 1f5e36f24 -> 292b718a6
MAHOUT-1888: Performance Bug with Mahout Vector Serialization, this closes apache/mahout#260 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/292b718a Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/292b718a Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/292b718a Branch: refs/heads/master Commit: 292b718a633efae5ff1b47772b9dd7bf9f1ca6da Parents: 1f5e36f Author: smarthi <smar...@apache.org> Authored: Thu Oct 13 22:37:34 2016 -0400 Committer: smarthi <smar...@apache.org> Committed: Thu Oct 13 22:38:09 2016 -0400 ---------------------------------------------------------------------- .travis.yml | 2 +- .../mahout/math/scalabindings/MathSuite.scala | 2 +- .../scalabindings/RLikeVectorOpsSuite.scala | 10 +-- .../mahout/math/TransposedMatrixView.java | 2 +- pom.xml | 4 +- .../apache/mahout/common/HDFSPathSearch.scala | 6 +- .../io/MahoutKryoRegistrator.scala | 29 +++++++ .../mahout/sparkbindings/blas/BlasSuite.scala | 82 ++++++++++++++++---- 8 files changed, 109 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/292b718a/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index a0289e4..426d57e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,7 +16,7 @@ branches: matrix: include: - jdk: "oraclejdk7" - env: PROFILE="-Dhadoop.version=2.4.1 -Dflink.version=1.0.1 -Dspark.version=1.5.2 -Ptravis" + env: PROFILE="-Dhadoop.version=2.4.1 -Dflink.version=1.1.3 -Dspark.version=1.6.2 -Ptravis" git: depth: 10 http://git-wip-us.apache.org/repos/asf/mahout/blob/292b718a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala index ee80841..9e93e63 100644 --- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala +++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala @@ -102,7 +102,7 @@ class MathSuite extends FunSuite with MahoutSuite { printf("cholArg=\n%s\n", cholArg) - printf("V'DV=\n%s\n", (vblock.t %*% d %*% vblock)) + printf("V'DV=\n%s\n", vblock.t %*% d %*% vblock) printf("V'V+V'DV=\n%s\n", vtv + (vblock.t %*% d %*% vblock)) http://git-wip-us.apache.org/repos/asf/mahout/blob/292b718a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/RLikeVectorOpsSuite.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/RLikeVectorOpsSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/RLikeVectorOpsSuite.scala index 72754f8..f17f08a 100644 --- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/RLikeVectorOpsSuite.scala +++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/RLikeVectorOpsSuite.scala @@ -17,14 +17,12 @@ package org.apache.mahout.math.scalabindings -import org.apache.log4j.{Level, BasicConfigurator} -import org.scalatest.FunSuite +import org.apache.log4j.{BasicConfigurator, Level} +import org.apache.mahout.logging._ import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ +import org.apache.mahout.math.scalabindings.RLikeOps._ import org.apache.mahout.test.MahoutSuite - -import org.apache.mahout.logging._ +import org.scalatest.FunSuite class RLikeVectorOpsSuite extends FunSuite with MahoutSuite { http://git-wip-us.apache.org/repos/asf/mahout/blob/292b718a/math/src/main/java/org/apache/mahout/math/TransposedMatrixView.java ---------------------------------------------------------------------- diff --git a/math/src/main/java/org/apache/mahout/math/TransposedMatrixView.java b/math/src/main/java/org/apache/mahout/math/TransposedMatrixView.java index c67cb47..ede6f35 100644 --- a/math/src/main/java/org/apache/mahout/math/TransposedMatrixView.java +++ b/math/src/main/java/org/apache/mahout/math/TransposedMatrixView.java @@ -26,7 +26,7 @@ import org.apache.mahout.math.function.DoubleFunction; /** * Matrix View backed by an {@link org.apache.mahout.math.function.IntIntFunction} */ -class TransposedMatrixView extends AbstractMatrix { +public class TransposedMatrixView extends AbstractMatrix { private Matrix m; http://git-wip-us.apache.org/repos/asf/mahout/blob/292b718a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9af14ac..44852f4 100644 --- a/pom.xml +++ b/pom.xml @@ -121,8 +121,8 @@ <slf4j.version>1.7.21</slf4j.version> <scala.compat.version>2.10</scala.compat.version> <scala.version>2.10.4</scala.version> - <spark.version>1.5.2</spark.version> - <flink.version>1.1.2</flink.version> + <spark.version>1.6.2</spark.version> + <flink.version>1.1.3</flink.version> <h2o.version>0.1.25</h2o.version> <jackson.version>2.7.4</jackson.version> </properties> http://git-wip-us.apache.org/repos/asf/mahout/blob/292b718a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala index 0b4130d..e7a5809 100644 --- a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala +++ b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala @@ -60,17 +60,17 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: val seed = fs.getFileStatus(new Path(dir)) var f: String = files - if (seed.isDir) { + if (seed.isDirectory) { val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir)) for (fileStatus <- fileStatuses) { if (fileStatus.getPath().getName().matches(filePattern) - && !fileStatus.isDir) { + && !fileStatus.isDirectory) { // found a file if (fileStatus.getLen() != 0) { // file is not empty f = f + fileStatus.getPath.toUri.toString + "," } - } else if (fileStatus.isDir && recursive) { + } else if (fileStatus.isDirectory && recursive) { f = findFiles(fileStatus.getPath.toString, filePattern, f) } } http://git-wip-us.apache.org/repos/asf/mahout/blob/292b718a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala index 4e0e061..2afea8a 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala @@ -35,6 +35,35 @@ object MahoutKryoRegistrator { kryo.addDefaultSerializer(classOf[Vector], new VectorKryoSerializer()) kryo.addDefaultSerializer(classOf[Matrix], new GenericMatrixKryoSerializer) + Seq( + classOf[Vector], + classOf[Matrix], + classOf[DiagonalMatrix], + classOf[DenseMatrix], + classOf[SparseRowMatrix], + classOf[SparseMatrix], + classOf[MatrixView], + classOf[MatrixSlice], + classOf[TransposedMatrixView], + classOf[DenseVector], + classOf[RandomAccessSparseVector], + classOf[SequentialAccessSparseVector], + classOf[MatrixVectorView], + classOf[VectorView], + classOf[PermutedVectorView], + classOf[Array[Vector]], + classOf[Array[Matrix]], + Class.forName(classOf[DiagonalMatrix].getName + "$SingleElementVector"), + Class.forName(classOf[DenseVector].getName + "$DenseVectorView"), + // This is supported by twitter.chill, but kryo still is offended by lack of registration: + classOf[Range], + classOf[Unit], + classOf[scala.collection.mutable.WrappedArray.ofRef[_]], + classOf[Array[Int]], + classOf[Array[String]] + + ) foreach kryo.register + } } http://git-wip-us.apache.org/repos/asf/mahout/blob/292b718a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala index 8c8ac3f..165e145 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala @@ -17,29 +17,36 @@ package org.apache.mahout.sparkbindings.blas -import collection._ -import JavaConversions._ -import org.scalatest.FunSuite -import org.apache.mahout.test.DistributedMahoutSuite +import java.io.ByteArrayOutputStream + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.io.Output +import com.twitter.chill.AllScalaRegistrar +import org.apache.log4j.Level +import org.apache.mahout.logging._ import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ -import drm._ +import org.apache.mahout.math.drm._ +import org.apache.mahout.math.drm.logical.{OpABt, OpAewB, OpAt, OpAtA} +import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.mahout.math.scalabindings._ import org.apache.mahout.sparkbindings._ import org.apache.mahout.sparkbindings.drm._ -import org.apache.mahout.math.drm.logical.{OpAt, OpAtA, OpAewB, OpABt} +import org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator import org.apache.mahout.sparkbindings.test.DistributedSparkSuite +import org.scalatest.FunSuite /** Collection of physical blas operator tests. */ class BlasSuite extends FunSuite with DistributedSparkSuite { + private final implicit val mahoutLog = getLog(classOf[RLikeDrmOpsSuite]) + test("ABt") { val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) val inCoreB = dense((3, 4, 5), (5, 6, 7)) val drmA = drmParallelize(m = inCoreA, numPartitions = 3) val drmB = drmParallelize(m = inCoreB, numPartitions = 2) - val op = new OpABt(drmA, drmB) + val op = OpABt(drmA, drmB) val drm = new CheckpointedDrmSpark(ABt.abt(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) @@ -59,7 +66,7 @@ class BlasSuite extends FunSuite with DistributedSparkSuite { val drmA = drmParallelize(m = inCoreA, numPartitions = 2) val drmB = drmParallelize(m = inCoreB) - val op = new OpAewB(drmA, drmB, "*") + val op = OpAewB(drmA, drmB, "*") val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) @@ -76,7 +83,7 @@ class BlasSuite extends FunSuite with DistributedSparkSuite { val drmA = drmParallelize(m = inCoreA, numPartitions = 2) val drmB = drmParallelize(m = inCoreB) - val op = new OpAewB(drmA, drmB, "+") + val op = OpAewB(drmA, drmB, "+") val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) @@ -93,7 +100,7 @@ class BlasSuite extends FunSuite with DistributedSparkSuite { val drmA = drmParallelize(m = inCoreA, numPartitions = 2) val drmB = drmParallelize(m = inCoreB) - val op = new OpAewB(drmA, drmB, "-") + val op = OpAewB(drmA, drmB, "-") val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) @@ -110,7 +117,7 @@ class BlasSuite extends FunSuite with DistributedSparkSuite { val drmA = drmParallelize(m = inCoreA, numPartitions = 2) val drmB = drmParallelize(m = inCoreB) - val op = new OpAewB(drmA, drmB, "/") + val op = OpAewB(drmA, drmB, "/") val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol) @@ -141,7 +148,7 @@ class BlasSuite extends FunSuite with DistributedSparkSuite { val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) val drmA = drmParallelize(m = inCoreA, numPartitions = 2) - val op = new OpAt(drmA) + val op = OpAt(drmA) val drmAt = new CheckpointedDrmSpark(rddInput = At.at(op, srcA = drmA), _nrow = op.nrow, _ncol = op.ncol) val inCoreAt = drmAt.collect val inCoreControlAt = inCoreA.t @@ -151,4 +158,51 @@ class BlasSuite extends FunSuite with DistributedSparkSuite { } + test("verbosity") { + def testreg(o: Any*): Unit = { + val s = new String(kryoSet(o: _*)) + s.contains("org.apache.mahout") shouldBe false + } + + def kryoSet[T](obj: T*) = { + + val kryo = new Kryo() + new AllScalaRegistrar()(kryo) + + MahoutKryoRegistrator.registerClasses(kryo) + + val baos = new ByteArrayOutputStream() + val output = new Output(baos) + obj.foreach(kryo.writeClassAndObject(output, _)) + output.close + + baos.toByteArray + } + + mahoutLog.setLevel(Level.TRACE) + + val mxA = dense((1, 2), (3, 4)) + val mxB = new SparseRowMatrix(4,5) + val mxC = new SparseMatrix(4,5) + val mxD = diagv(dvec(1, 2, 3, 5)) + val mxE = mxA (0 to 0, 0 to 0) + val mxF = mxA.t + + + testreg( + mxD, mxD(0, ::), mxD(::, 0), mxD.diagv, + mxA, mxA(0, ::), mxA(::, 0), mxA.diagv, + mxB, mxB(0, ::), mxB(::, 0), mxB.diagv, + mxC, mxC(0, ::), mxC(::, 0), mxC.diagv, + mxE, mxE(0, ::), mxE(::, 0), mxE.diagv, + mxF, mxF(0, ::), mxF(::, 0), mxF.diagv, + mxA(0,::)(0 to 0), mxE(0,::)(0 to 0), + new DenseVector(6), new DenseVector(6) (0 to 0), + new RandomAccessSparseVector(6), new RandomAccessSparseVector(6)(0 to 0), + new SequentialAccessSparseVector(6), new SequentialAccessSparseVector(6)(0 to 0) + + ) + + } + }