spark git commit: [SPARK-8730] Fixes - Deser objects containing a primitive class attribute

2015-08-31 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master f0f563a3c -> 72f6dbf7b


[SPARK-8730] Fixes - Deser objects containing a primitive class attribute

Author: EugenCepoi 

Closes #7122 from EugenCepoi/master.
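
For context, a minimal standalone sketch of the idea behind the patch (assumed names, not the Spark code itself): java.io.ObjectInputStream reports a serialized primitive Class value such as classOf[Int] under the name "int", which Class.forName cannot resolve, so resolveClass needs a small primitive-name lookup before rethrowing:

// Hedged sketch only; the actual fix lives in JavaDeserializationStream below.
import java.io.ObjectStreamClass

object PrimitiveClassResolver {
  private val primitiveMappings = Map[String, Class[_]](
    "boolean" -> classOf[Boolean], "byte" -> classOf[Byte], "char" -> classOf[Char],
    "short" -> classOf[Short], "int" -> classOf[Int], "long" -> classOf[Long],
    "float" -> classOf[Float], "double" -> classOf[Double], "void" -> classOf[Void])

  def resolve(desc: ObjectStreamClass, loader: ClassLoader): Class[_] =
    try {
      // Regular classes resolve through the provided class loader.
      Class.forName(desc.getName, false, loader)
    } catch {
      case e: ClassNotFoundException =>
        // Primitive names ("int", "long", ...) are mapped explicitly;
        // anything else really is missing, so rethrow.
        primitiveMappings.getOrElse(desc.getName, throw e)
    }
}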


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72f6dbf7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72f6dbf7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72f6dbf7

Branch: refs/heads/master
Commit: 72f6dbf7b0c8b271f5f9c762374422c69c8ab43d
Parents: f0f563a
Author: EugenCepoi 
Authored: Mon Aug 31 13:24:35 2015 -0500
Committer: Imran Rashid 
Committed: Mon Aug 31 13:24:35 2015 -0500

--
 .../spark/serializer/JavaSerializer.scala   | 27 
 .../spark/serializer/JavaSerializerSuite.scala  | 18 +
 2 files changed, 40 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/72f6dbf7/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 4a5274b..b463a71 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -62,17 +62,34 @@ private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoa
   extends DeserializationStream {
 
   private val objIn = new ObjectInputStream(in) {
-    override def resolveClass(desc: ObjectStreamClass): Class[_] = {
-      // scalastyle:off classforname
-      Class.forName(desc.getName, false, loader)
-      // scalastyle:on classforname
-    }
+    override def resolveClass(desc: ObjectStreamClass): Class[_] =
+      try {
+        // scalastyle:off classforname
+        Class.forName(desc.getName, false, loader)
+        // scalastyle:on classforname
+      } catch {
+        case e: ClassNotFoundException =>
+          JavaDeserializationStream.primitiveMappings.get(desc.getName).getOrElse(throw e)
+      }
   }
 
   def readObject[T: ClassTag](): T = objIn.readObject().asInstanceOf[T]
   def close() { objIn.close() }
 }
 
+private object JavaDeserializationStream {
+  val primitiveMappings = Map[String, Class[_]](
+    "boolean" -> classOf[Boolean],
+    "byte" -> classOf[Byte],
+    "char" -> classOf[Char],
+    "short" -> classOf[Short],
+    "int" -> classOf[Int],
+    "long" -> classOf[Long],
+    "float" -> classOf[Float],
+    "double" -> classOf[Double],
+    "void" -> classOf[Void]
+  )
+}
 
 private[spark] class JavaSerializerInstance(
     counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader)

http://git-wip-us.apache.org/repos/asf/spark/blob/72f6dbf7/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala 
b/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
index 329a2b6..20f4567 100644
--- a/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
@@ -25,4 +25,22 @@ class JavaSerializerSuite extends SparkFunSuite {
     val instance = serializer.newInstance()
     instance.deserialize[JavaSerializer](instance.serialize(serializer))
   }
+
+  test("Deserialize object containing a primitive Class as attribute") {
+    val serializer = new JavaSerializer(new SparkConf())
+    val instance = serializer.newInstance()
+    instance.deserialize[JavaSerializer](instance.serialize(new ContainsPrimitiveClass()))
+  }
+}
+
+private class ContainsPrimitiveClass extends Serializable {
+  val intClass = classOf[Int]
+  val longClass = classOf[Long]
+  val shortClass = classOf[Short]
+  val charClass = classOf[Char]
+  val doubleClass = classOf[Double]
+  val floatClass = classOf[Float]
+  val booleanClass = classOf[Boolean]
+  val byteClass = classOf[Byte]
+  val voidClass = classOf[Void]
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver since we may reuse it later

2015-08-31 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 72f6dbf7b -> 4a5fe0916


[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver since we may reuse it later

`deregisterReceiver` should not remove the `ReceiverTrackingInfo`. Otherwise, restarting the receiver throws `java.util.NoSuchElementException: key not found`.

Author: zsxwing 

Closes #8538 from zsxwing/SPARK-10369.
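
A rough sketch of the behavior change with simplified types (hypothetical names; the real code is in org.apache.spark.streaming.scheduler.ReceiverTracker): deregistering marks the tracking info INACTIVE instead of deleting it, and the list of live receivers filters INACTIVE entries out:

// Hedged, simplified illustration only.
import scala.collection.mutable

object ReceiverState extends Enumeration {
  val ACTIVE, INACTIVE = Value
}

case class TrackingInfo(streamId: Int, state: ReceiverState.Value)

class TrackerSketch {
  private val infos = new mutable.HashMap[Int, TrackingInfo]

  def register(streamId: Int): Unit =
    infos(streamId) = TrackingInfo(streamId, ReceiverState.ACTIVE)

  // Keep the entry so a later restart can update it instead of
  // failing with "key not found".
  def deregister(streamId: Int): Unit =
    infos(streamId) = TrackingInfo(streamId, ReceiverState.INACTIVE)

  // Only non-INACTIVE entries count as live receivers.
  def allReceiverIds: Seq[Int] =
    infos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq
}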


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a5fe091
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a5fe091
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a5fe091

Branch: refs/heads/master
Commit: 4a5fe091658b1d06f427e404a11a84fc84f953c5
Parents: 72f6dbf
Author: zsxwing 
Authored: Mon Aug 31 12:19:11 2015 -0700
Committer: Tathagata Das 
Committed: Mon Aug 31 12:19:11 2015 -0700

--
 .../streaming/scheduler/ReceiverTracker.scala   |  4 +-
 .../scheduler/ReceiverTrackerSuite.scala| 51 
 2 files changed, 53 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4a5fe091/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3d532a6..f86fd44 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -291,7 +291,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 ReceiverTrackingInfo(
  streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
 }
-receiverTrackingInfos -= streamId
+receiverTrackingInfos(streamId) = newReceiverTrackingInfo
 
listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo))
 val messageWithError = if (error != null && !error.isEmpty) {
   s"$message - $error"
@@ -483,7 +483,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 context.reply(true)
   // Local messages
   case AllReceiverIds =>
-context.reply(receiverTrackingInfos.keys.toSeq)
+context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
   case StopAllReceivers =>
 assert(isTrackerStopping || isTrackerStopped)
 stopReceivers()

http://git-wip-us.apache.org/repos/asf/spark/blob/4a5fe091/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index dd292ba..45138b7 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase {
   }
 }
   }
+
+  test("should restart receiver after stopping it") {
+withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+  @volatile var startTimes = 0
+  ssc.addStreamingListener(new StreamingListener {
+override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
+  startTimes += 1
+}
+  })
+  val input = ssc.receiverStream(new StoppableReceiver)
+  val output = new TestOutputStream(input)
+  output.register()
+  ssc.start()
+  StoppableReceiver.shouldStop = true
+  eventually(timeout(10 seconds), interval(10 millis)) {
+// The receiver is stopped once, so if it's restarted, it should be started twice.
+assert(startTimes === 2)
+  }
+}
+  }
 }
 
 /** An input DStream with for testing rate controlling */
@@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver {
 
   def getActive(): Option[RateTestReceiver] = Option(activeReceiver)
 }
+
+/**
+ * A custom receiver that could be stopped via StoppableReceiver.shouldStop
+ */
+class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+  var receivingThreadOption: Option[Thread] = None
+
+  def onStart() {
+val thread = new Thread() {
+  override def run() {
+while (!StoppableReceiver.shouldStop) {
+  Thread.sleep(10)
+}
+StoppableReceiver.this.stop("stop")
+  }
+}
+thread.sta

spark git commit: [SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver since we may reuse it later

2015-08-31 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 bf5b2f26b -> 33ce274cd


[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver since we may reuse it later

`deregisterReceiver` should not remove the `ReceiverTrackingInfo`. Otherwise, restarting the receiver throws `java.util.NoSuchElementException: key not found`.

Author: zsxwing 

Closes #8538 from zsxwing/SPARK-10369.

(cherry picked from commit 4a5fe091658b1d06f427e404a11a84fc84f953c5)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33ce274c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33ce274c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33ce274c

Branch: refs/heads/branch-1.5
Commit: 33ce274cdf7538b5816f1a400b2fad394ec2a056
Parents: bf5b2f2
Author: zsxwing 
Authored: Mon Aug 31 12:19:11 2015 -0700
Committer: Tathagata Das 
Committed: Mon Aug 31 12:19:48 2015 -0700

--
 .../streaming/scheduler/ReceiverTracker.scala   |  4 +-
 .../scheduler/ReceiverTrackerSuite.scala| 51 
 2 files changed, 53 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/33ce274c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3d532a6..f86fd44 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -291,7 +291,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 ReceiverTrackingInfo(
  streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
 }
-receiverTrackingInfos -= streamId
+receiverTrackingInfos(streamId) = newReceiverTrackingInfo
 
listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo))
 val messageWithError = if (error != null && !error.isEmpty) {
   s"$message - $error"
@@ -483,7 +483,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 context.reply(true)
   // Local messages
   case AllReceiverIds =>
-context.reply(receiverTrackingInfos.keys.toSeq)
+context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
   case StopAllReceivers =>
 assert(isTrackerStopping || isTrackerStopped)
 stopReceivers()

http://git-wip-us.apache.org/repos/asf/spark/blob/33ce274c/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index dd292ba..45138b7 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase {
   }
 }
   }
+
+  test("should restart receiver after stopping it") {
+withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+  @volatile var startTimes = 0
+  ssc.addStreamingListener(new StreamingListener {
+override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
+  startTimes += 1
+}
+  })
+  val input = ssc.receiverStream(new StoppableReceiver)
+  val output = new TestOutputStream(input)
+  output.register()
+  ssc.start()
+  StoppableReceiver.shouldStop = true
+  eventually(timeout(10 seconds), interval(10 millis)) {
+// The receiver is stopped once, so if it's restarted, it should be started twice.
+assert(startTimes === 2)
+  }
+}
+  }
 }
 
 /** An input DStream with for testing rate controlling */
@@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver {
 
   def getActive(): Option[RateTestReceiver] = Option(activeReceiver)
 }
+
+/**
+ * A custom receiver that could be stopped via StoppableReceiver.shouldStop
+ */
+class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+  var receivingThreadOption: Option[Thread] = None
+
+  def onStart() {
+val thread = new Thread() {
+  override def run() {
+while (!StoppableReceiver.shouldStop) {
+  

spark git commit: [SPARK-10170] [SQL] Add DB2 JDBC dialect support.

2015-08-31 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 4a5fe0916 -> a2d5c7209


[SPARK-10170] [SQL] Add DB2 JDBC dialect support.

Writing a DataFrame to a DB2 database fails because, by default, the JDBC data source generates a table schema with data types DB2 does not support: TEXT for String and BIT(1) for Boolean.

This patch registers a DB2 JDBC dialect that maps String and Boolean to valid DB2 data types.

Author: sureshthalamati 

Closes #8393 from sureshthalamati/db2_dialect_spark-10170.
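
As a usage sketch (mirroring the DB2Dialect added below; the dialect name and the jdbc:mydb prefix are made up), any database with unsupported default mappings can be handled the same way:

// Hedged example of registering a custom JdbcDialect; run e.g. in spark-shell.
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{BooleanType, DataType, StringType}

case object MyDbDialect extends JdbcDialect {
  // Claim URLs for the (hypothetical) target database.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  // Override only the types the database cannot store in Spark's default form.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
    case BooleanType => Some(JdbcType("CHAR(1)", java.sql.Types.CHAR))
    case _ => None
  }
}

// Register once before writing DataFrames through JDBC.
JdbcDialects.registerDialect(MyDbDialect)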


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2d5c720
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2d5c720
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2d5c720

Branch: refs/heads/master
Commit: a2d5c72091b1c602694dbca823a7b26f86b02864
Parents: 4a5fe09
Author: sureshthalamati 
Authored: Mon Aug 31 12:39:58 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 31 12:39:58 2015 -0700

--
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala  | 18 ++
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala |  7 +++
 2 files changed, 25 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a2d5c720/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 8849fc2..c6d05c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -125,6 +125,7 @@ object JdbcDialects {
 
   registerDialect(MySQLDialect)
   registerDialect(PostgresDialect)
+  registerDialect(DB2Dialect)
 
   /**
* Fetch the JdbcDialect class corresponding to a given database url.
@@ -222,3 +223,20 @@ case object MySQLDialect extends JdbcDialect {
 s"`$colName`"
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * Default DB2 dialect, mapping string/boolean on write to valid DB2 types.
+ * By default string, and boolean gets mapped to db2 invalid types TEXT, and BIT(1).
+ */
+@DeveloperApi
+case object DB2Dialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
+case BooleanType => Some(JdbcType("CHAR(1)", java.sql.Types.CHAR))
+case _ => None
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a2d5c720/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 0edac08..d8c9a08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -407,6 +407,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
   test("Default jdbc dialect registration") {
 assert(JdbcDialects.get("jdbc:mysql://127.0.0.1/db") == MySQLDialect)
 assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect)
+assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect)
 assert(JdbcDialects.get("test.invalid") == NoopDialect)
   }
 
@@ -443,4 +444,10 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
 assert(agg.getCatalystType(0, "", 1, null) === Some(LongType))
 assert(agg.getCatalystType(1, "", 1, null) === Some(StringType))
   }
+
+  test("DB2Dialect type mapping") {
+val db2Dialect = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
+assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB")
+assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)")
+  }
 }





spark git commit: [SPARK-9954] [MLLIB] use first 128 nonzeros to compute Vector.hashCode

2015-08-31 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master a2d5c7209 -> 23e39cc7b


[SPARK-9954] [MLLIB] use first 128 nonzeros to compute Vector.hashCode

This could help reduce hash collisions, e.g., in `RDD[Vector].repartition`. jkbradley

Author: Xiangrui Meng 

Closes #8182 from mengxr/SPARK-9954.
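
A simplified sketch of the hashing scheme over a plain Array[Double] (the real code lives in org.apache.spark.mllib.linalg and also covers sparse vectors): hash the size plus the index and bit pattern of the first 128 nonzero entries, in the spirit of java.util.Arrays.hashCode:

// Hedged, simplified illustration only.
val MAX_HASH_NNZ = 128

def vectorHash(values: Array[Double]): Int = {
  var result = 31 + values.length
  var nnz = 0
  var i = 0
  while (i < values.length && nnz < MAX_HASH_NNZ) {
    val v = values(i)
    if (v != 0.0) {                 // skip explicit zeros so sparse and dense agree
      result = 31 * result + i
      val bits = java.lang.Double.doubleToLongBits(v)
      result = 31 * result + (bits ^ (bits >>> 32)).toInt
      nnz += 1
    }
    i += 1
  }
  result
}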


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23e39cc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23e39cc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23e39cc7

Branch: refs/heads/master
Commit: 23e39cc7b1bb7f1087c4706234c9b5165a571357
Parents: a2d5c72
Author: Xiangrui Meng 
Authored: Mon Aug 31 15:49:25 2015 -0700
Committer: Xiangrui Meng 
Committed: Mon Aug 31 15:49:25 2015 -0700

--
 .../org/apache/spark/mllib/linalg/Vectors.scala | 38 +++-
 1 file changed, 21 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/23e39cc7/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 06ebb15..3642e92 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -71,20 +71,22 @@ sealed trait Vector extends Serializable {
   }
 
   /**
-   * Returns a hash code value for the vector. The hash code is based on its size and its nonzeros
-   * in the first 16 entries, using a hash algorithm similar to [[java.util.Arrays.hashCode]].
+   * Returns a hash code value for the vector. The hash code is based on its size and its first 128
+   * nonzero entries, using a hash algorithm similar to [[java.util.Arrays.hashCode]].
*/
   override def hashCode(): Int = {
// This is a reference implementation. It calls return in foreachActive, which is slow.
 // Subclasses should override it with optimized implementation.
 var result: Int = 31 + size
+var nnz = 0
 this.foreachActive { (index, value) =>
-  if (index < 16) {
+  if (nnz < Vectors.MAX_HASH_NNZ) {
 // ignore explicit 0 for comparison between sparse and dense
 if (value != 0) {
   result = 31 * result + index
   val bits = java.lang.Double.doubleToLongBits(value)
   result = 31 * result + (bits ^ (bits >>> 32)).toInt
+  nnz += 1
 }
   } else {
 return result
@@ -536,6 +538,9 @@ object Vectors {
 }
 allEqual
   }
+
+  /** Max number of nonzero entries used in computing hash code. */
+  private[linalg] val MAX_HASH_NNZ = 128
 }
 
 /**
@@ -578,13 +583,15 @@ class DenseVector @Since("1.0.0") (
   override def hashCode(): Int = {
 var result: Int = 31 + size
 var i = 0
-val end = math.min(values.length, 16)
-while (i < end) {
+val end = values.length
+var nnz = 0
+while (i < end && nnz < Vectors.MAX_HASH_NNZ) {
   val v = values(i)
   if (v != 0.0) {
 result = 31 * result + i
 val bits = java.lang.Double.doubleToLongBits(values(i))
 result = 31 * result + (bits ^ (bits >>> 32)).toInt
+nnz += 1
   }
   i += 1
 }
@@ -707,19 +714,16 @@ class SparseVector @Since("1.0.0") (
   override def hashCode(): Int = {
 var result: Int = 31 + size
 val end = values.length
-var continue = true
 var k = 0
-while ((k < end) & continue) {
-  val i = indices(k)
-  if (i < 16) {
-val v = values(k)
-if (v != 0.0) {
-  result = 31 * result + i
-  val bits = java.lang.Double.doubleToLongBits(v)
-  result = 31 * result + (bits ^ (bits >>> 32)).toInt
-}
-  } else {
-continue = false
+var nnz = 0
+while (k < end && nnz < Vectors.MAX_HASH_NNZ) {
+  val v = values(k)
+  if (v != 0.0) {
+val i = indices(k)
+result = 31 * result + i
+val bits = java.lang.Double.doubleToLongBits(v)
+result = 31 * result + (bits ^ (bits >>> 32)).toInt
+nnz += 1
   }
   k += 1
 }





spark git commit: [SPARK-8472] [ML] [PySpark] Python API for DCT

2015-08-31 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 23e39cc7b -> 5b3245d6d


[SPARK-8472] [ML] [PySpark] Python API for DCT

Add Python API for ml.feature.DCT.

Author: Yanbo Liang 

Closes #8485 from yanboliang/spark-8472.
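
For reference, a usage sketch of the underlying Scala transformer that the new Python wrapper delegates to (assumes a sqlContext in scope, e.g. in spark-shell; column names are arbitrary):

// Hedged usage example of org.apache.spark.ml.feature.DCT.
import org.apache.spark.ml.feature.DCT
import org.apache.spark.mllib.linalg.Vectors

val df = sqlContext.createDataFrame(Seq(Tuple1(Vectors.dense(5.0, 8.0, 6.0)))).toDF("vec")

val dct = new DCT()
  .setInputCol("vec")
  .setOutputCol("resultVec")
  .setInverse(false)   // set to true for the inverse transform

dct.transform(df).select("resultVec").show()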


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b3245d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b3245d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b3245d6

Branch: refs/heads/master
Commit: 5b3245d6dff65972fc39c73f90d5cbdf84d19129
Parents: 23e39cc
Author: Yanbo Liang 
Authored: Mon Aug 31 15:50:41 2015 -0700
Committer: Xiangrui Meng 
Committed: Mon Aug 31 15:50:41 2015 -0700

--
 python/pyspark/ml/feature.py | 65 ++-
 1 file changed, 64 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5b3245d6/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 04b2b2c..59300a6 100644
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -26,7 +26,7 @@ from pyspark.ml.wrapper import JavaEstimator, JavaModel, 
JavaTransformer
 from pyspark.mllib.common import inherit_doc
 from pyspark.mllib.linalg import _convert_to_vector
 
-__all__ = ['Binarizer', 'Bucketizer', 'ElementwiseProduct', 'HashingTF', 
'IDF', 'IDFModel',
+__all__ = ['Binarizer', 'Bucketizer', 'DCT', 'ElementwiseProduct', 
'HashingTF', 'IDF', 'IDFModel',
'NGram', 'Normalizer', 'OneHotEncoder', 'PolynomialExpansion', 
'RegexTokenizer',
'StandardScaler', 'StandardScalerModel', 'StringIndexer', 
'StringIndexerModel',
'Tokenizer', 'VectorAssembler', 'VectorIndexer', 'Word2Vec', 
'Word2VecModel',
@@ -167,6 +167,69 @@ class Bucketizer(JavaTransformer, HasInputCol, 
HasOutputCol):
 
 
 @inherit_doc
+class DCT(JavaTransformer, HasInputCol, HasOutputCol):
+"""
+A feature transformer that takes the 1D discrete cosine transform
+of a real vector. No zero padding is performed on the input vector.
+It returns a real vector of the same length representing the DCT.
+The return vector is scaled such that the transform matrix is
+unitary (aka scaled DCT-II).
+
+More information on
+`https://en.wikipedia.org/wiki/Discrete_cosine_transform#DCT-II Wikipedia`.
+
+>>> from pyspark.mllib.linalg import Vectors
+>>> df1 = sqlContext.createDataFrame([(Vectors.dense([5.0, 8.0, 6.0]),)], 
["vec"])
+>>> dct = DCT(inverse=False, inputCol="vec", outputCol="resultVec")
+>>> df2 = dct.transform(df1)
+>>> df2.head().resultVec
+DenseVector([10.969..., -0.707..., -2.041...])
+>>> df3 = DCT(inverse=True, inputCol="resultVec", 
outputCol="origVec").transform(df2)
+>>> df3.head().origVec
+DenseVector([5.0, 8.0, 6.0])
+"""
+
+# a placeholder to make it appear in the generated doc
+inverse = Param(Params._dummy(), "inverse", "Set transformer to perform 
inverse DCT, " +
+"default False.")
+
+@keyword_only
+def __init__(self, inverse=False, inputCol=None, outputCol=None):
+"""
+__init__(self, inverse=False, inputCol=None, outputCol=None)
+"""
+super(DCT, self).__init__()
+self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.DCT", 
self.uid)
+self.inverse = Param(self, "inverse", "Set transformer to perform 
inverse DCT, " +
+ "default False.")
+self._setDefault(inverse=False)
+kwargs = self.__init__._input_kwargs
+self.setParams(**kwargs)
+
+@keyword_only
+def setParams(self, inverse=False, inputCol=None, outputCol=None):
+"""
+setParams(self, inverse=False, inputCol=None, outputCol=None)
+Sets params for this DCT.
+"""
+kwargs = self.setParams._input_kwargs
+return self._set(**kwargs)
+
+def setInverse(self, value):
+"""
+Sets the value of :py:attr:`inverse`.
+"""
+self._paramMap[self.inverse] = value
+return self
+
+def getInverse(self):
+"""
+Gets the value of inverse or its default value.
+"""
+return self.getOrDefault(self.inverse)
+
+
+@inherit_doc
 class ElementwiseProduct(JavaTransformer, HasInputCol, HasOutputCol):
 """
 Outputs the Hadamard product (i.e., the element-wise product) of each 
input vector





spark git commit: [SPARK-10341] [SQL] fix memory starving in unsafe SMJ

2015-08-31 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 5b3245d6d -> 540bdee93


[SPARK-10341] [SQL] fix memory starving in unsafe SMJ

In SMJ, the first ExternalSorter can consume all the available memory before spilling, so the second cannot even acquire its first page.

Until we have a better memory allocator, SMJ should call prepare() on its children before calling compute() on any of them.

cc rxin JoshRosen

Author: Davies Liu 

Closes #8511 from davies/smj_memory.
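
A stripped-down sketch of the "prepare before compute" pattern (hypothetical class, not the actual RDD code, which is in MapPartitionsWithPreparationRDD and ZippedPartitionsRDD): preparation that reserves memory is split out of compute() so a zip-like parent can prepare all children before any of them starts computing:

// Hedged illustration only.
import scala.collection.mutable.ArrayBuffer

class PreparedComputation[M, U](preparePartition: () => M, execute: M => U) {
  // prepare() may run several times for the same partition (e.g. both sides
  // of a join), so each prepared value is queued for a matching compute().
  private val preparedArguments = new ArrayBuffer[M]

  def prepare(): Unit = preparedArguments += preparePartition()

  def compute(): U = {
    val prepared =
      if (preparedArguments.isEmpty) preparePartition()
      else preparedArguments.remove(0)
    execute(prepared)
  }
}

// The parent reserves resources for both children up front, then computes.
def zipCompute[U](left: PreparedComputation[_, U], right: PreparedComputation[_, U]): (U, U) = {
  left.prepare()
  right.prepare()
  (left.compute(), right.compute())
}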


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/540bdee9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/540bdee9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/540bdee9

Branch: refs/heads/master
Commit: 540bdee93103a73736d282b95db6a8cda8f6a2b1
Parents: 5b3245d
Author: Davies Liu 
Authored: Mon Aug 31 15:55:22 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 31 15:55:22 2015 -0700

--
 .../rdd/MapPartitionsWithPreparationRDD.scala   | 21 ++--
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  | 13 
 .../MapPartitionsWithPreparationRDDSuite.scala  | 14 +
 3 files changed, 42 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/540bdee9/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
 
b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
index b475bd8..1f2213d 100644
--- 
a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
+++ 
b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.rdd
 
+import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, Partitioner, TaskContext}
@@ -38,12 +39,28 @@ private[spark] class MapPartitionsWithPreparationRDD[U: 
ClassTag, T: ClassTag, M
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
+  // In certain join operations, prepare can be called on the same partition multiple times.
+  // In this case, we need to ensure that each call to compute gets a separate prepare argument.
+  private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
+
+  /**
+   * Prepare a partition for a single call to compute.
+   */
+  def prepare(): Unit = {
+preparedArguments += preparePartition()
+  }
+
   /**
* Prepare a partition before computing it from its parent.
*/
   override def compute(partition: Partition, context: TaskContext): 
Iterator[U] = {
-val preparedArgument = preparePartition()
+val prepared =
+  if (preparedArguments.isEmpty) {
+preparePartition()
+  } else {
+preparedArguments.remove(0)
+  }
 val parentIterator = firstParent[T].iterator(partition, context)
-executePartition(context, partition.index, preparedArgument, parentIterator)
+executePartition(context, partition.index, prepared, parentIterator)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/540bdee9/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index 81f40ad..b3c6439 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -73,6 +73,16 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: 
ClassTag](
 super.clearDependencies()
 rdds = null
   }
+
+  /**
+   * Call the prepare method of every parent that has one.
+   * This is needed for reserving execution memory in advance.
+   */
+  protected def tryPrepareParents(): Unit = {
+rdds.collect {
+  case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare()
+}
+  }
 }
 
 private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: 
ClassTag](
@@ -84,6 +94,7 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: 
ClassTag, V: ClassTag]
   extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), 
preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+tryPrepareParents()
 val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
 f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), 
context))
   }
@@ -107,6 +118,7 @@ private[spark] class ZippedPartitionsRDD3
   extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), 
preservesPartitioning) {
 
   override de

spark git commit: [SPARK-10341] [SQL] fix memory starving in unsafe SMJ

2015-08-31 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 33ce274cd -> 1c752b8b5


[SPARK-10341] [SQL] fix memory starving in unsafe SMJ

In SMJ, the first ExternalSorter can consume all the available memory before spilling, so the second cannot even acquire its first page.

Until we have a better memory allocator, SMJ should call prepare() on its children before calling compute() on any of them.

cc rxin JoshRosen

Author: Davies Liu 

Closes #8511 from davies/smj_memory.

(cherry picked from commit 540bdee93103a73736d282b95db6a8cda8f6a2b1)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c752b8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c752b8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c752b8b

Branch: refs/heads/branch-1.5
Commit: 1c752b8b5c7090936b5c2ca94e8fb47c4f570d69
Parents: 33ce274
Author: Davies Liu 
Authored: Mon Aug 31 15:55:22 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 31 15:55:29 2015 -0700

--
 .../rdd/MapPartitionsWithPreparationRDD.scala   | 21 ++--
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  | 13 
 .../MapPartitionsWithPreparationRDDSuite.scala  | 14 +
 3 files changed, 42 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1c752b8b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
 
b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
index b475bd8..1f2213d 100644
--- 
a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
+++ 
b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.rdd
 
+import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, Partitioner, TaskContext}
@@ -38,12 +39,28 @@ private[spark] class MapPartitionsWithPreparationRDD[U: 
ClassTag, T: ClassTag, M
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
+  // In certain join operations, prepare can be called on the same partition 
multiple times.
+  // In this case, we need to ensure that each call to compute gets a separate 
prepare argument.
+  private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
+
+  /**
+   * Prepare a partition for a single call to compute.
+   */
+  def prepare(): Unit = {
+preparedArguments += preparePartition()
+  }
+
   /**
* Prepare a partition before computing it from its parent.
*/
   override def compute(partition: Partition, context: TaskContext): 
Iterator[U] = {
-val preparedArgument = preparePartition()
+val prepared =
+  if (preparedArguments.isEmpty) {
+preparePartition()
+  } else {
+preparedArguments.remove(0)
+  }
 val parentIterator = firstParent[T].iterator(partition, context)
-executePartition(context, partition.index, preparedArgument, 
parentIterator)
+executePartition(context, partition.index, prepared, parentIterator)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1c752b8b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index 81f40ad..b3c6439 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -73,6 +73,16 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: 
ClassTag](
 super.clearDependencies()
 rdds = null
   }
+
+  /**
+   * Call the prepare method of every parent that has one.
+   * This is needed for reserving execution memory in advance.
+   */
+  protected def tryPrepareParents(): Unit = {
+rdds.collect {
+  case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare()
+}
+  }
 }
 
 private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: 
ClassTag](
@@ -84,6 +94,7 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: 
ClassTag, V: ClassTag]
   extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), 
preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+tryPrepareParents()
 val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
 f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), 
context))
   }
@@ -107,6 +118,7 @@ private[spark] class ZippedPartitionsRDD3
  

[1/2] spark git commit: Preparing Spark release v1.5.0-rc3

2015-08-31 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 1c752b8b5 -> 2b270a166


Preparing Spark release v1.5.0-rc3


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/908e37bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/908e37bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/908e37bc

Branch: refs/heads/branch-1.5
Commit: 908e37bcc10132bb2aa7f80ae694a9df6e40f31a
Parents: 1c752b8
Author: Patrick Wendell 
Authored: Mon Aug 31 15:57:42 2015 -0700
Committer: Patrick Wendell 
Committed: Mon Aug 31 15:57:42 2015 -0700

--
 assembly/pom.xml| 2 +-
 bagel/pom.xml   | 2 +-
 core/pom.xml| 2 +-
 examples/pom.xml| 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml  | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml  | 2 +-
 external/mqtt-assembly/pom.xml  | 2 +-
 external/mqtt/pom.xml   | 2 +-
 external/twitter/pom.xml| 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml  | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml  | 2 +-
 extras/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml  | 2 +-
 launcher/pom.xml| 2 +-
 mllib/pom.xml   | 2 +-
 network/common/pom.xml  | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml| 2 +-
 pom.xml | 2 +-
 repl/pom.xml| 2 +-
 sql/catalyst/pom.xml| 2 +-
 sql/core/pom.xml| 2 +-
 sql/hive-thriftserver/pom.xml   | 2 +-
 sql/hive/pom.xml| 2 +-
 streaming/pom.xml   | 2 +-
 tools/pom.xml   | 2 +-
 unsafe/pom.xml  | 2 +-
 yarn/pom.xml| 2 +-
 33 files changed, 33 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 7b41ebb..3ef7d6f 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.1-SNAPSHOT
+1.5.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 16bf17c..684e07b 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.1-SNAPSHOT
+1.5.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index beb547f..6b082ad 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.1-SNAPSHOT
+1.5.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 3926b79..9ef1eda 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.1-SNAPSHOT
+1.5.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index 5eda12d..aa7021d 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.1-SNAPSHOT
+1.5.0
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 33f2cd7..7d72f78 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.1-SNAPSHOT
+1.5.0
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 670c783..38683e3 100644
--- a/external/flume/pom.xml
+++ 

Git Push Summary

2015-08-31 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.5.0-rc3 [created] 908e37bcc




[2/2] spark git commit: Preparing development version 1.5.1-SNAPSHOT

2015-08-31 Thread pwendell
Preparing development version 1.5.1-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b270a16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b270a16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b270a16

Branch: refs/heads/branch-1.5
Commit: 2b270a166d6bd5b42399400924c576c9996bfc10
Parents: 908e37b
Author: Patrick Wendell 
Authored: Mon Aug 31 15:57:49 2015 -0700
Committer: Patrick Wendell 
Committed: Mon Aug 31 15:57:49 2015 -0700

--
 assembly/pom.xml| 2 +-
 bagel/pom.xml   | 2 +-
 core/pom.xml| 2 +-
 examples/pom.xml| 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml  | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml  | 2 +-
 external/mqtt-assembly/pom.xml  | 2 +-
 external/mqtt/pom.xml   | 2 +-
 external/twitter/pom.xml| 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml  | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml  | 2 +-
 extras/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml  | 2 +-
 launcher/pom.xml| 2 +-
 mllib/pom.xml   | 2 +-
 network/common/pom.xml  | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml| 2 +-
 pom.xml | 2 +-
 repl/pom.xml| 2 +-
 sql/catalyst/pom.xml| 2 +-
 sql/core/pom.xml| 2 +-
 sql/hive-thriftserver/pom.xml   | 2 +-
 sql/hive/pom.xml| 2 +-
 streaming/pom.xml   | 2 +-
 tools/pom.xml   | 2 +-
 unsafe/pom.xml  | 2 +-
 yarn/pom.xml| 2 +-
 33 files changed, 33 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 3ef7d6f..7b41ebb 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.0
+1.5.1-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 684e07b..16bf17c 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.0
+1.5.1-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 6b082ad..beb547f 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.0
+1.5.1-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 9ef1eda..3926b79 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.0
+1.5.1-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index aa7021d..5eda12d 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.0
+1.5.1-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 7d72f78..33f2cd7 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.0
+1.5.1-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 38683e3..670c783 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 s

spark git commit: [SPARK-10349] [ML] OneVsRest use 'when ... otherwise' not UDF to generate new label at binary reduction

2015-08-31 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 540bdee93 -> fe16fd0b8


[SPARK-10349] [ML] OneVsRest use 'when ... otherwise' not UDF to generate new 
label at binary reduction

Currently OneVsRest uses a UDF to generate the new binary label during training. Now that [SPARK-7321](https://issues.apache.org/jira/browse/SPARK-7321) has been merged, we can use ```when ... otherwise```, which is more efficient.

Author: Yanbo Liang 

Closes #8519 from yanboliang/spark-10349.
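
A before/after sketch of the two forms (df and the "label" column are assumed; index is the class treated as positive):

// Hedged illustration; not the OneVsRest source itself.
import org.apache.spark.sql.functions.{col, udf, when}

val index = 2

// Before: a Scala UDF, which is opaque to the optimizer and adds
// serialization/boxing overhead per row.
val labelUDF = udf { (label: Double) => if (label.toInt == index) 1.0 else 0.0 }
val viaUdf = df.withColumn("binaryLabel", labelUDF(col("label")))

// After: the built-in when/otherwise expression from SPARK-7321, which
// Catalyst can evaluate directly without calling back into Scala code.
val viaWhen = df.withColumn(
  "binaryLabel", when(col("label") === index.toDouble, 1.0).otherwise(0.0))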


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe16fd0b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe16fd0b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe16fd0b

Branch: refs/heads/master
Commit: fe16fd0b8b717f01151bc659ec3299dab091c97a
Parents: 540bdee
Author: Yanbo Liang 
Authored: Mon Aug 31 16:06:38 2015 -0700
Committer: Xiangrui Meng 
Committed: Mon Aug 31 16:06:38 2015 -0700

--
 .../org/apache/spark/ml/classification/OneVsRest.scala| 10 ++
 1 file changed, 2 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fe16fd0b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index c62e132..debc164 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -91,7 +91,6 @@ final class OneVsRestModel private[ml] (
 // add an accumulator column to store predictions of all the models
 val accColName = "mbc$acc" + UUID.randomUUID().toString
 val initUDF = udf { () => Map[Int, Double]() }
-val mapType = MapType(IntegerType, DoubleType, valueContainsNull = false)
 val newDataset = dataset.withColumn(accColName, initUDF())
 
 // persist if underlying dataset is not persistent.
@@ -195,16 +194,11 @@ final class OneVsRest(override val uid: String)
 
 // create k columns, one for each binary classifier.
 val models = Range(0, numClasses).par.map { index =>
-  val labelUDF = udf { (label: Double) =>
-if (label.toInt == index) 1.0 else 0.0
-  }
-
   // generate new label metadata for the binary problem.
-  // TODO: use when ... otherwise after SPARK-7321 is merged
   val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
   val labelColName = "mc2b$" + index
-  val trainingDataset =
-multiclassLabeled.withColumn(labelColName, labelUDF(col($(labelCol))), newLabelMeta)
+  val trainingDataset = multiclassLabeled.withColumn(
+labelColName, when(col($(labelCol)) === index.toDouble, 1.0).otherwise(0.0), newLabelMeta)
   val classifier = getClassifier
   val paramMap = new ParamMap()
   paramMap.put(classifier.labelCol -> labelColName)





spark git commit: [SPARK-10355] [ML] [PySpark] Add Python API for SQLTransformer

2015-08-31 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master fe16fd0b8 -> 52ea399e6


[SPARK-10355] [ML] [PySpark] Add Python API for SQLTransformer

Add Python API for SQLTransformer

Author: Yanbo Liang 

Closes #8527 from yanboliang/spark-10355.
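
For reference, a usage sketch of the underlying Scala transformer this wrapper exposes (assumes a sqlContext in scope; the statement and column names are arbitrary):

// Hedged usage example of org.apache.spark.ml.feature.SQLTransformer.
import org.apache.spark.ml.feature.SQLTransformer

val df = sqlContext.createDataFrame(Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer()
  .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()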


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52ea399e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52ea399e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52ea399e

Branch: refs/heads/master
Commit: 52ea399e6ee37b7c44aae7709863e006fca88906
Parents: fe16fd0
Author: Yanbo Liang 
Authored: Mon Aug 31 16:11:27 2015 -0700
Committer: Xiangrui Meng 
Committed: Mon Aug 31 16:11:27 2015 -0700

--
 python/pyspark/ml/feature.py | 57 ---
 1 file changed, 54 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/52ea399e/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 59300a6..0626281 100644
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -28,9 +28,9 @@ from pyspark.mllib.linalg import _convert_to_vector
 
 __all__ = ['Binarizer', 'Bucketizer', 'DCT', 'ElementwiseProduct', 
'HashingTF', 'IDF', 'IDFModel',
'NGram', 'Normalizer', 'OneHotEncoder', 'PolynomialExpansion', 
'RegexTokenizer',
-   'StandardScaler', 'StandardScalerModel', 'StringIndexer', 
'StringIndexerModel',
-   'Tokenizer', 'VectorAssembler', 'VectorIndexer', 'Word2Vec', 
'Word2VecModel',
-   'PCA', 'PCAModel', 'RFormula', 'RFormulaModel']
+   'SQLTransformer', 'StandardScaler', 'StandardScalerModel', 
'StringIndexer',
+   'StringIndexerModel', 'Tokenizer', 'VectorAssembler', 
'VectorIndexer', 'Word2Vec',
+   'Word2VecModel', 'PCA', 'PCAModel', 'RFormula', 'RFormulaModel']
 
 
 @inherit_doc
@@ -744,6 +744,57 @@ class RegexTokenizer(JavaTransformer, HasInputCol, 
HasOutputCol):
 
 
 @inherit_doc
+class SQLTransformer(JavaTransformer):
+"""
+Implements the transforms which are defined by SQL statement.
+Currently we only support SQL syntax like 'SELECT ... FROM __THIS__'
+where '__THIS__' represents the underlying table of the input dataset.
+
+>>> df = sqlContext.createDataFrame([(0, 1.0, 3.0), (2, 2.0, 5.0)], ["id", 
"v1", "v2"])
+>>> sqlTrans = SQLTransformer(
+... statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM 
__THIS__")
+>>> sqlTrans.transform(df).head()
+Row(id=0, v1=1.0, v2=3.0, v3=4.0, v4=3.0)
+"""
+
+# a placeholder to make it appear in the generated doc
+statement = Param(Params._dummy(), "statement", "SQL statement")
+
+@keyword_only
+def __init__(self, statement=None):
+"""
+__init__(self, statement=None)
+"""
+super(SQLTransformer, self).__init__()
+self._java_obj = 
self._new_java_obj("org.apache.spark.ml.feature.SQLTransformer", self.uid)
+self.statement = Param(self, "statement", "SQL statement")
+kwargs = self.__init__._input_kwargs
+self.setParams(**kwargs)
+
+@keyword_only
+def setParams(self, statement=None):
+"""
+setParams(self, statement=None)
+Sets params for this SQLTransformer.
+"""
+kwargs = self.setParams._input_kwargs
+return self._set(**kwargs)
+
+def setStatement(self, value):
+"""
+Sets the value of :py:attr:`statement`.
+"""
+self._paramMap[self.statement] = value
+return self
+
+def getStatement(self):
+"""
+Gets the value of statement or its default value.
+"""
+return self.getOrDefault(self.statement)
+
+
+@inherit_doc
 class StandardScaler(JavaEstimator, HasInputCol, HasOutputCol):
 """
 Standardizes features by removing the mean and scaling to unit variance 
using column summary





spark git commit: [SPARK-10378][SQL][Test] Remove HashJoinCompatibilitySuite.

2015-08-31 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 52ea399e6 -> d65656c45


[SPARK-10378][SQL][Test] Remove HashJoinCompatibilitySuite.

They don't bring much value since we now have better unit test coverage for 
hash joins. This will also help reduce the test time.

Author: Reynold Xin 

Closes #8542 from rxin/SPARK-10378.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d65656c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d65656c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d65656c4

Branch: refs/heads/master
Commit: d65656c455d19b83c6412571873586b458aa355e
Parents: 52ea399
Author: Reynold Xin 
Authored: Mon Aug 31 18:09:24 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 31 18:09:24 2015 -0700

--
 .../execution/HashJoinCompatibilitySuite.scala  | 169 ---
 1 file changed, 169 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d65656c4/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
--
diff --git 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
deleted file mode 100644
index 1a5ba20..000
--- 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import java.io.File
-
-import org.apache.spark.sql.SQLConf
-import org.apache.spark.sql.hive.test.TestHive
-
-/**
- * Runs the test cases that are included in the hive distribution with hash 
joins.
- */
-class HashJoinCompatibilitySuite extends HiveCompatibilitySuite {
-  override def beforeAll() {
-super.beforeAll()
-TestHive.setConf(SQLConf.SORTMERGE_JOIN, false)
-  }
-
-  override def afterAll() {
-TestHive.setConf(SQLConf.SORTMERGE_JOIN, true)
-super.afterAll()
-  }
-
-  override def whiteList = Seq(
-"auto_join0",
-"auto_join1",
-"auto_join10",
-"auto_join11",
-"auto_join12",
-"auto_join13",
-"auto_join14",
-"auto_join14_hadoop20",
-"auto_join15",
-"auto_join17",
-"auto_join18",
-"auto_join19",
-"auto_join2",
-"auto_join20",
-"auto_join21",
-"auto_join22",
-"auto_join23",
-"auto_join24",
-"auto_join25",
-"auto_join26",
-"auto_join27",
-"auto_join28",
-"auto_join3",
-"auto_join30",
-"auto_join31",
-"auto_join32",
-"auto_join4",
-"auto_join5",
-"auto_join6",
-"auto_join7",
-"auto_join8",
-"auto_join9",
-"auto_join_filters",
-"auto_join_nulls",
-"auto_join_reordering_values",
-"auto_smb_mapjoin_14",
-"auto_sortmerge_join_1",
-"auto_sortmerge_join_10",
-"auto_sortmerge_join_11",
-"auto_sortmerge_join_12",
-"auto_sortmerge_join_13",
-"auto_sortmerge_join_14",
-"auto_sortmerge_join_15",
-"auto_sortmerge_join_16",
-"auto_sortmerge_join_2",
-"auto_sortmerge_join_3",
-"auto_sortmerge_join_4",
-"auto_sortmerge_join_5",
-"auto_sortmerge_join_6",
-"auto_sortmerge_join_7",
-"auto_sortmerge_join_8",
-"auto_sortmerge_join_9",
-"correlationoptimizer1",
-"correlationoptimizer10",
-"correlationoptimizer11",
-"correlationoptimizer13",
-"correlationoptimizer14",
-"correlationoptimizer15",
-"correlationoptimizer2",
-"correlationoptimizer3",
-"correlationoptimizer4",
-"correlationoptimizer6",
-"correlationoptimizer7",
-"correlationoptimizer8",
-"correlationoptimizer9",
-"join0",
-"join1",
-"join10",
-"join11",
-"join12",
-"join13",
-"join14",
-"join14_hadoop20",
-"join15",
-"join16",
-"join17",
-"join18",
-"join19",
-"join2",
-"join20",
-"joi