[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r193929635
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -236,6 +236,76 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class MapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
--- End diff --

nit: indent
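For reference, a minimal sketch of the intended style (a simplified, standalone class, not the patch itself): the wrapped `extends` clause gets a two-space indent.

```scala
// Illustrative only (simplified signature, not the real MapFromArrays class):
// the continuation `extends` clause is indented two spaces.
case class MapFromArraysSketch(left: String, right: String)
  extends Serializable {
  override def toString: String = s"$left -> $right"
}
```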


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r193929951
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -236,6 +236,76 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class MapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType)
+
+  override def dataType: DataType = {
+    MapType(
+      keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+      valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+      valueContainsNull = right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
+    val keyArrayData = keyArray.asInstanceOf[ArrayData]
+    val valueArrayData = valueArray.asInstanceOf[ArrayData]
+    if (keyArrayData.numElements != valueArrayData.numElements) {
+      throw new RuntimeException("The given two arrays should have the same length")
+    }
+    val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+    if (leftArrayType.containsNull) {
+      var i = 0
+      while (i < keyArrayData.numElements) {
+        if (keyArrayData.isNullAt(i)) {
+          throw new RuntimeException("Cannot use null as map key!")
+        }
+        i += 1
+      }
+    }
+    new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy())
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => {
+      val arrayBasedMapData = classOf[ArrayBasedMapData].getName
+      val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+      val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else {
+        val i = ctx.freshName("i")
+        s"""
+           |for (int $i = 0; $i < $keyArrayData.numElements(); $i++) {
+           |  if ($keyArrayData.isNullAt($i)) {
+           |    throw new RuntimeException("Cannot use null as map key!");
+           |  }
+           |}
+         """.stripMargin
+      }
+      s"""
+         |if ($keyArrayData.numElements() != $valueArrayData.numElements()) {
+         |  throw new RuntimeException("The given two arrays should have the same length");
+         |}
+         |$keyArrayElemNullCheck
+         |${ev.value} = new $arrayBasedMapData($keyArrayData.copy(), $valueArrayData.copy());
+       """.stripMargin
+    })
+  }
+
+  override def prettyName: String = "create_map_from_arrays"
--- End diff --

`map_from_arrays`?
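For context, a minimal usage sketch assuming the function is ultimately registered under the name `map_from_arrays` (sketch only, not part of the patch):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: exercising the SQL-facing name this comment suggests.
// Expected result: {1.0 -> "2", 3.0 -> "4"}
object MapFromArraysUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("map_from_arrays demo").getOrCreate()
    spark.sql("SELECT map_from_arrays(array(1.0, 3.0), array('2', '4')) AS m").show(truncate = false)
    spark.stop()
  }
}
```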


---




[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21258
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21258
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3841/
Test PASSed.


---




[GitHub] spark pull request #21155: [SPARK-23927][SQL] Add "sequence" expression

2018-06-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21155#discussion_r193928034
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
 ---
@@ -308,6 +313,292 @@ class CollectionExpressionsSuite extends 
SparkFunSuite with ExpressionEvalHelper
   ArrayMax(Literal.create(Seq(1.123, 0.1234, 1.121), 
ArrayType(DoubleType))), 1.123)
   }
 
+  test("Sequence") {
--- End diff --

Maybe we need additional words in the test title?
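For example, something along these lines (illustrative sketch only; the extra wording is an assumption, not a prescribed title):

```scala
import org.scalatest.FunSuite

// Illustrative only: a more descriptive title than the bare "Sequence".
class SequenceTitleExample extends FunSuite {
  test("Sequence expression: null handling, boundary checks and integral types") {
    assert((1 to 3) == Seq(1, 2, 3))
  }
}
```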


---




[GitHub] spark pull request #21155: [SPARK-23927][SQL] Add "sequence" expression

2018-06-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21155#discussion_r193928619
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
 ---
@@ -308,6 +313,292 @@ class CollectionExpressionsSuite extends 
SparkFunSuite with ExpressionEvalHelper
   ArrayMax(Literal.create(Seq(1.123, 0.1234, 1.121), 
ArrayType(DoubleType))), 1.123)
   }
 
+  test("Sequence") {
+// test null handling
+
+checkEvaluation(new Sequence(Literal(null, LongType), Literal(1L)), 
null)
+checkEvaluation(new Sequence(Literal(1L), Literal(null, LongType)), 
null)
+checkEvaluation(new Sequence(Literal(null, LongType), Literal(1L), 
Literal(1L)), null)
+checkEvaluation(new Sequence(Literal(1L), Literal(null, LongType), 
Literal(1L)), null)
+checkEvaluation(new Sequence(Literal(1L), Literal(1L), Literal(null, 
LongType)), null)
+
+// test sequence boundaries checking
+
+checkExceptionInExpression[IllegalArgumentException](
+  new Sequence(Literal(Int.MinValue), Literal(Int.MaxValue), 
Literal(1)),
+  EmptyRow, s"Too long sequence: 4294967296. Should be <= 
${Int.MaxValue}")
+
+checkExceptionInExpression[IllegalArgumentException](
+  new Sequence(Literal(1), Literal(2), Literal(0)), EmptyRow, 
"boundaries: 1 to 2 by 0")
+checkExceptionInExpression[IllegalArgumentException](
+  new Sequence(Literal(2), Literal(1), Literal(0)), EmptyRow, 
"boundaries: 2 to 1 by 0")
+checkExceptionInExpression[IllegalArgumentException](
+  new Sequence(Literal(2), Literal(1), Literal(1)), EmptyRow, 
"boundaries: 2 to 1 by 1")
+checkExceptionInExpression[IllegalArgumentException](
+  new Sequence(Literal(1), Literal(2), Literal(-1)), EmptyRow, 
"boundaries: 1 to 2 by -1")
+
+checkExceptionInExpression[IllegalArgumentException](
+  new Sequence(
+Literal(Date.valueOf("1970-01-02")),
+Literal(Date.valueOf("1970-01-01")),
+Literal(CalendarInterval.fromString("interval 1 day"))),
+  EmptyRow, "sequence boundaries: 1 to 0 by 1")
+
+checkExceptionInExpression[IllegalArgumentException](
+  new Sequence(
+Literal(Date.valueOf("1970-01-01")),
+Literal(Date.valueOf("1970-02-01")),
+Literal(CalendarInterval.fromString("interval 1 month").negate())),
+  EmptyRow,
+  s"sequence boundaries: 0 to 26784 by -${28 * 
CalendarInterval.MICROS_PER_DAY}")
+
+// test sequence with one element (zero step or equal start and stop)
+
+checkEvaluation(new Sequence(Literal(1), Literal(1), Literal(-1)), 
Seq(1))
+checkEvaluation(new Sequence(Literal(1), Literal(1), Literal(0)), 
Seq(1))
+checkEvaluation(new Sequence(Literal(1), Literal(1), Literal(1)), 
Seq(1))
+checkEvaluation(new Sequence(Literal(1), Literal(2), Literal(2)), 
Seq(1))
+checkEvaluation(new Sequence(Literal(1), Literal(0), Literal(-2)), 
Seq(1))
+
+// test sequence of different integral types (ascending and descending)
+
+checkEvaluation(new Sequence(Literal(1L), Literal(3L), Literal(1L)), 
Seq(1L, 2L, 3L))
+checkEvaluation(new Sequence(Literal(-3), Literal(3), Literal(3)), 
Seq(-3, 0, 3))
+checkEvaluation(
+  new Sequence(Literal(3.toShort), Literal(-3.toShort), 
Literal(-3.toShort)),
+  Seq(3.toShort, 0.toShort, -3.toShort))
+checkEvaluation(
+  new Sequence(Literal(-1.toByte), Literal(-3.toByte), 
Literal(-1.toByte)),
+  Seq(-1.toByte, -2.toByte, -3.toByte))
+  }
+
+  test("Sequence of timestamps") {
+checkEvaluation(new Sequence(
+  Literal(Timestamp.valueOf("2018-01-01 00:00:00")),
+  Literal(Timestamp.valueOf("2018-01-02 00:00:00")),
+  Literal(CalendarInterval.fromString("interval 12 hours"))),
+  Seq(
+Timestamp.valueOf("2018-01-01 00:00:00"),
+Timestamp.valueOf("2018-01-01 12:00:00"),
+Timestamp.valueOf("2018-01-02 00:00:00")))
+
+checkEvaluation(new Sequence(
+  Literal(Timestamp.valueOf("2018-01-01 00:00:00")),
+  Literal(Timestamp.valueOf("2018-01-02 00:00:01")),
+  Literal(CalendarInterval.fromString("interval 12 hours"))),
+  Seq(
+Timestamp.valueOf("2018-01-01 00:00:00"),
+Timestamp.valueOf("2018-01-01 12:00:00"),
+Timestamp.valueOf("2018-01-02 00:00:00")))
+
+checkEvaluation(new Sequence(
+  Literal(Timestamp.valueOf("2018-01-02 00:00:00")),
+  Literal(Timestamp.valueOf("2018-01-01 00:00:00")),
+  Literal(CalendarInterval.fromString("interval 12 hours").negate())),
+  Seq(
+Timestamp.valueOf("2018-01-02 00:00:00"),
+  

[GitHub] spark pull request #21155: [SPARK-23927][SQL] Add "sequence" expression

2018-06-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21155#discussion_r193926575
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1887,6 +1889,402 @@ case class Flatten(child: Expression) extends 
UnaryExpression {
   override def prettyName: String = "flatten"
 }
 
+@ExpressionDescription(
+  usage = """
+_FUNC_(start, stop, step) - Generates an array of elements from start 
to stop (inclusive),
+  incrementing by step. The type of the returned elements is the same 
as the type of argument
+  expressions.
+
+  Supported types are: byte, short, integer, long, date, timestamp.
+
+  The start and stop expressions must resolve to the same type.
+  If start and stop expressions resolve to the 'date' or 'timestamp' 
type
+  then the step expression must resolve to the 'interval' type, 
otherwise to the same type
+  as the start and stop expressions.
+  """,
+  arguments = """
+Arguments:
+  * start - an expression. The start of the range.
+  * stop - an expression. The end the range (inclusive).
+  * step - an optional expression. The step of the range.
+  By default step is 1 if start is less than or equal to stop, 
otherwise -1.
+  For the temporal sequences it's 1 day and -1 day respectively.
+  If start is greater than stop then the step must be negative, 
and vice versa.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(1, 5);
+   [1, 2, 3, 4, 5]
+  > SELECT _FUNC_(5, 1);
+   [5, 4, 3, 2, 1]
+  > SELECT _FUNC_(to_date('2018-01-01'), to_date('2018-03-01'), 
interval 1 month);
+   [2018-01-01, 2018-02-01, 2018-03-01]
+  """,
+  since = "2.4.0"
+)
+case class Sequence(
+start: Expression,
+stop: Expression,
+stepOpt: Option[Expression],
+timeZoneId: Option[String] = None)
+  extends Expression
+  with TimeZoneAwareExpression {
+
+  import Sequence._
+
+  def this(start: Expression, stop: Expression) =
+this(start, stop, None, None)
+
+  def this(start: Expression, stop: Expression, step: Expression) =
+this(start, stop, Some(step), None)
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+copy(timeZoneId = Some(timeZoneId))
+
+  override def children: Seq[Expression] = Seq(start, stop) ++ stepOpt
+
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override lazy val dataType: ArrayType = ArrayType(start.dataType, 
containsNull = false)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val startType = start.dataType
+def stepType = stepOpt.get.dataType
+val typesCorrect =
+  startType.sameType(stop.dataType) &&
+(startType match {
+  case TimestampType | DateType =>
+stepOpt.isEmpty || CalendarIntervalType.acceptsType(stepType)
+  case _: IntegralType =>
+stepOpt.isEmpty || stepType.sameType(startType)
+  case _ => false
+})
+
+if (typesCorrect) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(
+s"$prettyName only supports integral, timestamp or date types")
+}
+  }
+
+  def coercibleChildren: Seq[Expression] = children.filter(_.dataType != 
CalendarIntervalType)
+
+  def castChildrenTo(widerType: DataType): Expression = Sequence(
+Cast(start, widerType),
+Cast(stop, widerType),
+stepOpt.map(step => if (step.dataType != CalendarIntervalType) 
Cast(step, widerType) else step),
+timeZoneId)
+
+  private lazy val impl: SequenceImpl = dataType.elementType match {
+case iType: IntegralType =>
+  type T = iType.InternalType
+  val ct = ClassTag[T](iType.tag.mirror.runtimeClass(iType.tag.tpe))
+  new IntegralSequenceImpl(iType)(ct, iType.integral)
+
+case TimestampType =>
+  new TemporalSequenceImpl[Long](LongType, 1, identity, timeZone)
+
+case DateType =>
+  new TemporalSequenceImpl[Int](IntegerType, MICROS_PER_DAY, _.toInt, 
timeZone)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val startVal = start.eval(input)
+if (startVal == null) return null
+val stopVal = stop.eval(input)
+if (stopVal == null) return null
+val stepVal = 
stepOpt.map(_.eval(input)).getOrElse(impl.defaultStep(startVal, stopVal))
+if (stepVal == null) return null
+
+

[GitHub] spark pull request #21155: [SPARK-23927][SQL] Add "sequence" expression

2018-06-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21155#discussion_r193927174
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1887,6 +1889,402 @@ case class Flatten(child: Expression) extends 
UnaryExpression {
   override def prettyName: String = "flatten"
 }
 
+@ExpressionDescription(
+  usage = """
+_FUNC_(start, stop, step) - Generates an array of elements from start 
to stop (inclusive),
+  incrementing by step. The type of the returned elements is the same 
as the type of argument
+  expressions.
+
+  Supported types are: byte, short, integer, long, date, timestamp.
+
+  The start and stop expressions must resolve to the same type.
+  If start and stop expressions resolve to the 'date' or 'timestamp' 
type
+  then the step expression must resolve to the 'interval' type, 
otherwise to the same type
+  as the start and stop expressions.
+  """,
+  arguments = """
+Arguments:
+  * start - an expression. The start of the range.
+  * stop - an expression. The end the range (inclusive).
+  * step - an optional expression. The step of the range.
+  By default step is 1 if start is less than or equal to stop, 
otherwise -1.
+  For the temporal sequences it's 1 day and -1 day respectively.
+  If start is greater than stop then the step must be negative, 
and vice versa.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(1, 5);
+   [1, 2, 3, 4, 5]
+  > SELECT _FUNC_(5, 1);
+   [5, 4, 3, 2, 1]
+  > SELECT _FUNC_(to_date('2018-01-01'), to_date('2018-03-01'), 
interval 1 month);
+   [2018-01-01, 2018-02-01, 2018-03-01]
+  """,
+  since = "2.4.0"
+)
+case class Sequence(
+start: Expression,
+stop: Expression,
+stepOpt: Option[Expression],
+timeZoneId: Option[String] = None)
+  extends Expression
+  with TimeZoneAwareExpression {
+
+  import Sequence._
+
+  def this(start: Expression, stop: Expression) =
+this(start, stop, None, None)
+
+  def this(start: Expression, stop: Expression, step: Expression) =
+this(start, stop, Some(step), None)
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+copy(timeZoneId = Some(timeZoneId))
+
+  override def children: Seq[Expression] = Seq(start, stop) ++ stepOpt
+
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override lazy val dataType: ArrayType = ArrayType(start.dataType, 
containsNull = false)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val startType = start.dataType
+def stepType = stepOpt.get.dataType
+val typesCorrect =
+  startType.sameType(stop.dataType) &&
+(startType match {
+  case TimestampType | DateType =>
+stepOpt.isEmpty || CalendarIntervalType.acceptsType(stepType)
+  case _: IntegralType =>
+stepOpt.isEmpty || stepType.sameType(startType)
+  case _ => false
+})
+
+if (typesCorrect) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(
+s"$prettyName only supports integral, timestamp or date types")
+}
+  }
+
+  def coercibleChildren: Seq[Expression] = children.filter(_.dataType != 
CalendarIntervalType)
+
+  def castChildrenTo(widerType: DataType): Expression = Sequence(
+Cast(start, widerType),
+Cast(stop, widerType),
+stepOpt.map(step => if (step.dataType != CalendarIntervalType) 
Cast(step, widerType) else step),
+timeZoneId)
+
+  private lazy val impl: SequenceImpl = dataType.elementType match {
+case iType: IntegralType =>
+  type T = iType.InternalType
+  val ct = ClassTag[T](iType.tag.mirror.runtimeClass(iType.tag.tpe))
+  new IntegralSequenceImpl(iType)(ct, iType.integral)
+
+case TimestampType =>
+  new TemporalSequenceImpl[Long](LongType, 1, identity, timeZone)
+
+case DateType =>
+  new TemporalSequenceImpl[Int](IntegerType, MICROS_PER_DAY, _.toInt, 
timeZone)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val startVal = start.eval(input)
+if (startVal == null) return null
+val stopVal = stop.eval(input)
+if (stopVal == null) return null
+val stepVal = 
stepOpt.map(_.eval(input)).getOrElse(impl.defaultStep(startVal, stopVal))
+if (stepVal == null) return null
+
+

[GitHub] spark pull request #21155: [SPARK-23927][SQL] Add "sequence" expression

2018-06-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21155#discussion_r193926459
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1887,6 +1889,402 @@ case class Flatten(child: Expression) extends 
UnaryExpression {
   override def prettyName: String = "flatten"
 }
 
+@ExpressionDescription(
+  usage = """
+_FUNC_(start, stop, step) - Generates an array of elements from start 
to stop (inclusive),
+  incrementing by step. The type of the returned elements is the same 
as the type of argument
+  expressions.
+
+  Supported types are: byte, short, integer, long, date, timestamp.
+
+  The start and stop expressions must resolve to the same type.
+  If start and stop expressions resolve to the 'date' or 'timestamp' 
type
+  then the step expression must resolve to the 'interval' type, 
otherwise to the same type
+  as the start and stop expressions.
+  """,
+  arguments = """
+Arguments:
+  * start - an expression. The start of the range.
+  * stop - an expression. The end the range (inclusive).
+  * step - an optional expression. The step of the range.
+  By default step is 1 if start is less than or equal to stop, 
otherwise -1.
+  For the temporal sequences it's 1 day and -1 day respectively.
+  If start is greater than stop then the step must be negative, 
and vice versa.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(1, 5);
+   [1, 2, 3, 4, 5]
+  > SELECT _FUNC_(5, 1);
+   [5, 4, 3, 2, 1]
+  > SELECT _FUNC_(to_date('2018-01-01'), to_date('2018-03-01'), 
interval 1 month);
+   [2018-01-01, 2018-02-01, 2018-03-01]
+  """,
+  since = "2.4.0"
+)
+case class Sequence(
+start: Expression,
+stop: Expression,
+stepOpt: Option[Expression],
+timeZoneId: Option[String] = None)
+  extends Expression
+  with TimeZoneAwareExpression {
+
+  import Sequence._
+
+  def this(start: Expression, stop: Expression) =
+this(start, stop, None, None)
+
+  def this(start: Expression, stop: Expression, step: Expression) =
+this(start, stop, Some(step), None)
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+copy(timeZoneId = Some(timeZoneId))
+
+  override def children: Seq[Expression] = Seq(start, stop) ++ stepOpt
+
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override lazy val dataType: ArrayType = ArrayType(start.dataType, 
containsNull = false)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val startType = start.dataType
+def stepType = stepOpt.get.dataType
+val typesCorrect =
+  startType.sameType(stop.dataType) &&
+(startType match {
+  case TimestampType | DateType =>
+stepOpt.isEmpty || CalendarIntervalType.acceptsType(stepType)
+  case _: IntegralType =>
+stepOpt.isEmpty || stepType.sameType(startType)
+  case _ => false
+})
+
+if (typesCorrect) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(
+s"$prettyName only supports integral, timestamp or date types")
+}
+  }
+
+  def coercibleChildren: Seq[Expression] = children.filter(_.dataType != 
CalendarIntervalType)
+
+  def castChildrenTo(widerType: DataType): Expression = Sequence(
+Cast(start, widerType),
+Cast(stop, widerType),
+stepOpt.map(step => if (step.dataType != CalendarIntervalType) 
Cast(step, widerType) else step),
+timeZoneId)
+
+  private lazy val impl: SequenceImpl = dataType.elementType match {
+case iType: IntegralType =>
+  type T = iType.InternalType
+  val ct = ClassTag[T](iType.tag.mirror.runtimeClass(iType.tag.tpe))
+  new IntegralSequenceImpl(iType)(ct, iType.integral)
+
+case TimestampType =>
+  new TemporalSequenceImpl[Long](LongType, 1, identity, timeZone)
+
+case DateType =>
+  new TemporalSequenceImpl[Int](IntegerType, MICROS_PER_DAY, _.toInt, 
timeZone)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val startVal = start.eval(input)
+if (startVal == null) return null
+val stopVal = stop.eval(input)
+if (stopVal == null) return null
+val stepVal = 
stepOpt.map(_.eval(input)).getOrElse(impl.defaultStep(startVal, stopVal))
+if (stepVal == null) return null
+
+

[GitHub] spark pull request #21155: [SPARK-23927][SQL] Add "sequence" expression

2018-06-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21155#discussion_r193926035
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1887,6 +1889,402 @@ case class Flatten(child: Expression) extends 
UnaryExpression {
   override def prettyName: String = "flatten"
 }
 
+@ExpressionDescription(
+  usage = """
+_FUNC_(start, stop, step) - Generates an array of elements from start 
to stop (inclusive),
+  incrementing by step. The type of the returned elements is the same 
as the type of argument
+  expressions.
+
+  Supported types are: byte, short, integer, long, date, timestamp.
+
+  The start and stop expressions must resolve to the same type.
+  If start and stop expressions resolve to the 'date' or 'timestamp' 
type
+  then the step expression must resolve to the 'interval' type, 
otherwise to the same type
+  as the start and stop expressions.
+  """,
+  arguments = """
+Arguments:
+  * start - an expression. The start of the range.
+  * stop - an expression. The end the range (inclusive).
+  * step - an optional expression. The step of the range.
+  By default step is 1 if start is less than or equal to stop, 
otherwise -1.
+  For the temporal sequences it's 1 day and -1 day respectively.
+  If start is greater than stop then the step must be negative, 
and vice versa.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(1, 5);
+   [1, 2, 3, 4, 5]
+  > SELECT _FUNC_(5, 1);
+   [5, 4, 3, 2, 1]
+  > SELECT _FUNC_(to_date('2018-01-01'), to_date('2018-03-01'), 
interval 1 month);
+   [2018-01-01, 2018-02-01, 2018-03-01]
+  """,
+  since = "2.4.0"
+)
+case class Sequence(
+start: Expression,
+stop: Expression,
+stepOpt: Option[Expression],
+timeZoneId: Option[String] = None)
+  extends Expression
+  with TimeZoneAwareExpression {
+
+  import Sequence._
+
+  def this(start: Expression, stop: Expression) =
+this(start, stop, None, None)
+
+  def this(start: Expression, stop: Expression, step: Expression) =
+this(start, stop, Some(step), None)
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+copy(timeZoneId = Some(timeZoneId))
+
+  override def children: Seq[Expression] = Seq(start, stop) ++ stepOpt
+
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override lazy val dataType: ArrayType = ArrayType(start.dataType, 
containsNull = false)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val startType = start.dataType
+def stepType = stepOpt.get.dataType
+val typesCorrect =
+  startType.sameType(stop.dataType) &&
+(startType match {
+  case TimestampType | DateType =>
+stepOpt.isEmpty || CalendarIntervalType.acceptsType(stepType)
+  case _: IntegralType =>
+stepOpt.isEmpty || stepType.sameType(startType)
+  case _ => false
+})
+
+if (typesCorrect) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(
+s"$prettyName only supports integral, timestamp or date types")
+}
+  }
+
+  def coercibleChildren: Seq[Expression] = children.filter(_.dataType != 
CalendarIntervalType)
+
+  def castChildrenTo(widerType: DataType): Expression = Sequence(
+Cast(start, widerType),
+Cast(stop, widerType),
+stepOpt.map(step => if (step.dataType != CalendarIntervalType) 
Cast(step, widerType) else step),
+timeZoneId)
+
+  private lazy val impl: SequenceImpl = dataType.elementType match {
+case iType: IntegralType =>
+  type T = iType.InternalType
+  val ct = ClassTag[T](iType.tag.mirror.runtimeClass(iType.tag.tpe))
+  new IntegralSequenceImpl(iType)(ct, iType.integral)
+
+case TimestampType =>
+  new TemporalSequenceImpl[Long](LongType, 1, identity, timeZone)
+
+case DateType =>
+  new TemporalSequenceImpl[Int](IntegerType, MICROS_PER_DAY, _.toInt, 
timeZone)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val startVal = start.eval(input)
+if (startVal == null) return null
+val stopVal = stop.eval(input)
+if (stopVal == null) return null
+val stepVal = 
stepOpt.map(_.eval(input)).getOrElse(impl.defaultStep(startVal, stopVal))
+if (stepVal == null) return null
+
+

[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21495
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #21155: [SPARK-23927][SQL] Add "sequence" expression

2018-06-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21155#discussion_r193927263
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1887,6 +1889,402 @@ case class Flatten(child: Expression) extends 
UnaryExpression {
   override def prettyName: String = "flatten"
 }
 
+@ExpressionDescription(
+  usage = """
+_FUNC_(start, stop, step) - Generates an array of elements from start 
to stop (inclusive),
+  incrementing by step. The type of the returned elements is the same 
as the type of argument
+  expressions.
+
+  Supported types are: byte, short, integer, long, date, timestamp.
+
+  The start and stop expressions must resolve to the same type.
+  If start and stop expressions resolve to the 'date' or 'timestamp' 
type
+  then the step expression must resolve to the 'interval' type, 
otherwise to the same type
+  as the start and stop expressions.
+  """,
+  arguments = """
+Arguments:
+  * start - an expression. The start of the range.
+  * stop - an expression. The end the range (inclusive).
+  * step - an optional expression. The step of the range.
+  By default step is 1 if start is less than or equal to stop, 
otherwise -1.
+  For the temporal sequences it's 1 day and -1 day respectively.
+  If start is greater than stop then the step must be negative, 
and vice versa.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(1, 5);
+   [1, 2, 3, 4, 5]
+  > SELECT _FUNC_(5, 1);
+   [5, 4, 3, 2, 1]
+  > SELECT _FUNC_(to_date('2018-01-01'), to_date('2018-03-01'), 
interval 1 month);
+   [2018-01-01, 2018-02-01, 2018-03-01]
+  """,
+  since = "2.4.0"
+)
+case class Sequence(
+start: Expression,
+stop: Expression,
+stepOpt: Option[Expression],
+timeZoneId: Option[String] = None)
+  extends Expression
+  with TimeZoneAwareExpression {
+
+  import Sequence._
+
+  def this(start: Expression, stop: Expression) =
+this(start, stop, None, None)
+
+  def this(start: Expression, stop: Expression, step: Expression) =
+this(start, stop, Some(step), None)
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+copy(timeZoneId = Some(timeZoneId))
+
+  override def children: Seq[Expression] = Seq(start, stop) ++ stepOpt
+
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override lazy val dataType: ArrayType = ArrayType(start.dataType, 
containsNull = false)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val startType = start.dataType
+def stepType = stepOpt.get.dataType
+val typesCorrect =
+  startType.sameType(stop.dataType) &&
+(startType match {
+  case TimestampType | DateType =>
+stepOpt.isEmpty || CalendarIntervalType.acceptsType(stepType)
+  case _: IntegralType =>
+stepOpt.isEmpty || stepType.sameType(startType)
+  case _ => false
+})
+
+if (typesCorrect) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(
+s"$prettyName only supports integral, timestamp or date types")
+}
+  }
+
+  def coercibleChildren: Seq[Expression] = children.filter(_.dataType != 
CalendarIntervalType)
+
+  def castChildrenTo(widerType: DataType): Expression = Sequence(
+Cast(start, widerType),
+Cast(stop, widerType),
+stepOpt.map(step => if (step.dataType != CalendarIntervalType) 
Cast(step, widerType) else step),
+timeZoneId)
+
+  private lazy val impl: SequenceImpl = dataType.elementType match {
+case iType: IntegralType =>
+  type T = iType.InternalType
+  val ct = ClassTag[T](iType.tag.mirror.runtimeClass(iType.tag.tpe))
+  new IntegralSequenceImpl(iType)(ct, iType.integral)
+
+case TimestampType =>
+  new TemporalSequenceImpl[Long](LongType, 1, identity, timeZone)
+
+case DateType =>
+  new TemporalSequenceImpl[Int](IntegerType, MICROS_PER_DAY, _.toInt, 
timeZone)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val startVal = start.eval(input)
+if (startVal == null) return null
+val stopVal = stop.eval(input)
+if (stopVal == null) return null
+val stepVal = 
stepOpt.map(_.eval(input)).getOrElse(impl.defaultStep(startVal, stopVal))
+if (stepVal == null) return null
+
+

[GitHub] spark pull request #21155: [SPARK-23927][SQL] Add "sequence" expression

2018-06-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21155#discussion_r193899743
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1887,6 +1889,402 @@ case class Flatten(child: Expression) extends 
UnaryExpression {
   override def prettyName: String = "flatten"
 }
 
+@ExpressionDescription(
+  usage = """
+_FUNC_(start, stop, step) - Generates an array of elements from start 
to stop (inclusive),
+  incrementing by step. The type of the returned elements is the same 
as the type of argument
+  expressions.
+
+  Supported types are: byte, short, integer, long, date, timestamp.
+
+  The start and stop expressions must resolve to the same type.
+  If start and stop expressions resolve to the 'date' or 'timestamp' 
type
+  then the step expression must resolve to the 'interval' type, 
otherwise to the same type
+  as the start and stop expressions.
+  """,
+  arguments = """
+Arguments:
+  * start - an expression. The start of the range.
+  * stop - an expression. The end the range (inclusive).
+  * step - an optional expression. The step of the range.
+  By default step is 1 if start is less than or equal to stop, 
otherwise -1.
+  For the temporal sequences it's 1 day and -1 day respectively.
+  If start is greater than stop then the step must be negative, 
and vice versa.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(1, 5);
+   [1, 2, 3, 4, 5]
+  > SELECT _FUNC_(5, 1);
+   [5, 4, 3, 2, 1]
+  > SELECT _FUNC_(to_date('2018-01-01'), to_date('2018-03-01'), 
interval 1 month);
+   [2018-01-01, 2018-02-01, 2018-03-01]
+  """,
+  since = "2.4.0"
+)
+case class Sequence(
+start: Expression,
+stop: Expression,
+stepOpt: Option[Expression],
+timeZoneId: Option[String] = None)
+  extends Expression
+  with TimeZoneAwareExpression {
+
+  import Sequence._
+
+  def this(start: Expression, stop: Expression) =
+this(start, stop, None, None)
+
+  def this(start: Expression, stop: Expression, step: Expression) =
+this(start, stop, Some(step), None)
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+copy(timeZoneId = Some(timeZoneId))
+
+  override def children: Seq[Expression] = Seq(start, stop) ++ stepOpt
+
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override lazy val dataType: ArrayType = ArrayType(start.dataType, 
containsNull = false)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val startType = start.dataType
+def stepType = stepOpt.get.dataType
+val typesCorrect =
+  startType.sameType(stop.dataType) &&
+(startType match {
+  case TimestampType | DateType =>
+stepOpt.isEmpty || CalendarIntervalType.acceptsType(stepType)
+  case _: IntegralType =>
+stepOpt.isEmpty || stepType.sameType(startType)
+  case _ => false
+})
+
+if (typesCorrect) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(
+s"$prettyName only supports integral, timestamp or date types")
+}
+  }
+
+  def coercibleChildren: Seq[Expression] = children.filter(_.dataType != 
CalendarIntervalType)
+
+  def castChildrenTo(widerType: DataType): Expression = Sequence(
--- End diff --

We can use `withNewChildren` instead?
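For illustration, roughly what that could look like (a sketch only, assuming the child order stays `(start, stop, stepOpt)`; not the patch):

```scala
// Sketch: express the cast through TreeNode.withNewChildren, leaving an
// interval step uncast, instead of rebuilding the Sequence by hand.
def castChildrenTo(widerType: DataType): Expression = withNewChildren(
  Seq(Cast(start, widerType), Cast(stop, widerType)) ++
    stepOpt.map(step =>
      if (step.dataType != CalendarIntervalType) Cast(step, widerType) else step))
```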


---




[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21495
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3840/
Test PASSed.


---




[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21258
  
**[Test build #91542 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91542/testReport)** for PR 21258 at commit [`228fcc6`](https://github.com/apache/spark/commit/228fcc66e2b85b957833da739a20229867d51cbc).


---




[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r193928013
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
--- End diff --

sure, done


---




[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r193927995
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    (left.dataType, right.dataType) match {
+      case (ArrayType(_, _), ArrayType(_, _)) =>
+        TypeCheckResult.TypeCheckSuccess
+      case _ =>
+        TypeCheckResult.TypeCheckFailure("The given two arguments should be an array")
+    }
+  }
+
+  override def dataType: DataType = {
+    MapType(
+      keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+      valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+      valueContainsNull = right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullable: Boolean = left.nullable || right.nullable
--- End diff --

good catch, thanks


---




[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21495
  
**[Test build #91541 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91541/testReport)** for PR 21495 at commit [`f91d75a`](https://github.com/apache/spark/commit/f91d75ae6b77bef6bf7eb8db98a345e6eb822393).


---




[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-07 Thread dbtsai
Github user dbtsai commented on the issue:

https://github.com/apache/spark/pull/21495
  
@som-snytt Initializing it in `printWelcome` will not work since, in older versions of Scala, `printWelcome` is the last one to be executed.


---




[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r193927508
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(_, _), ArrayType(_, _)) =>
+TypeCheckResult.TypeCheckSuccess
+  case _ =>
+TypeCheckResult.TypeCheckFailure("The given two arguments should 
be an array")
+}
+  }
+
+  override def dataType: DataType = {
+MapType(
+  keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+  valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+  valueContainsNull = 
right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
+    val keyArrayData = keyArray.asInstanceOf[ArrayData]
+    val valueArrayData = valueArray.asInstanceOf[ArrayData]
+    if (keyArrayData.numElements != valueArrayData.numElements) {
+      throw new RuntimeException("The given two arrays should have the same length")
+    }
+    val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+    if (leftArrayType.containsNull) {
+      if (keyArrayData.toArray(leftArrayType.elementType).contains(null)) {
+        throw new RuntimeException("Cannot use null as map key!")
+      }
+    }
+    new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy())
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => {
+      val arrayBasedMapData = classOf[ArrayBasedMapData].getName
+      val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+      val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else {
+        val leftArrayTypeTerm = ctx.addReferenceObj("leftArrayType", leftArrayType.elementType)
+        val array = ctx.freshName("array")
+        val i = ctx.freshName("i")
+        s"""
+           |Object[] $array = $keyArrayData.toObjectArray($leftArrayTypeTerm);
+           |for (int $i = 0; $i < $array.length; $i++) {
+           |  if ($array[$i] == null) {
+           |    throw new RuntimeException("Cannot use null as map key!");
+           |  }
+           |}
--- End diff --

Got it. The array has already been evaluated.


---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21469
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91535/
Test PASSed.


---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21469
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21469
  
**[Test build #91535 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91535/testReport)** for PR 21469 at commit [`3c80cad`](https://github.com/apache/spark/commit/3c80cad32c056a24a7f5ffd7ab0ae3f7e096a62d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and...

2018-06-07 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21495#discussion_r193927042
  
--- Diff: 
repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala
 ---
@@ -21,8 +21,22 @@ import scala.collection.mutable
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._
 
-class SparkILoopInterpreter(settings: Settings, out: JPrintWriter) extends 
IMain(settings, out) {
-  self =>
+class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, 
initializeSpark: () => Unit)
+extends IMain(settings, out) { self =>
+
+  /**
+   * We override `initializeSynchronous` to initialize Spark *after* 
`intp` is properly initialized
+   * and *before* the REPL sees any files in the private `loadInitFiles` 
functions, so that
+   * the Spark context is visible in those files.
+   *
+   * This is a bit of a hack, but there isn't another hook available to us 
at this point.
+   *
+   * See the discussion in Scala community 
https://github.com/scala/bug/issues/10913 for detail.
+   */
+  override def initializeSynchronous(): Unit = {
+super.initializeSynchronous()
+initializeSpark()
--- End diff --

It is working for me. I got `scala-compiler-2.11.12.jar` in my classpath. 
Can you do a clean build?


---




[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21504
  
**[Test build #91540 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91540/testReport)** for PR 21504 at commit [`02b2973`](https://github.com/apache/spark/commit/02b29731161a3e6ad4841c09a3bf02004e0a87e1).


---




[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-07 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193923588
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
@@ -55,6 +56,11 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { 
classNames =>
+Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+  sparkSession.sparkContext.conf).foreach(addListener)
+  }
+
--- End diff --

Good point. Addressed, please check.
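For readers following along, a listener registered through this config just needs a constructor that `Utils.loadExtensions` can call reflectively (taking a `SparkConf`, or no-arg). A minimal sketch (class name and logging are illustrative only, not part of the patch):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Minimal StreamingQueryListener sketch with a no-arg constructor.
class LoggingQueryListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query started: ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"batch ${event.progress.batchId} progressed for ${event.progress.id}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"query terminated: ${event.id}")
}
```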


---




[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21477
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21477
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3839/
Test PASSed.


---




[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21477
  
**[Test build #91539 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91539/testReport)** for PR 21477 at commit [`ecf3d88`](https://github.com/apache/spark/commit/ecf3d88954afdab16c492fd479c1051aff8a3b95).


---




[GitHub] spark issue #21402: SPARK-24355 Spark external shuffle server improvement to...

2018-06-07 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/21402
  
> StreamRequest will not block the server netty handler thread.

Hmm, I'm not so sure that's accurate. I think the main difference is that I 
don't think there is currently any code path that sends a `StreamRequest` to 
the shuffle service. But for example if many executors request files from the 
driver simultaneously, you could potentially end up in the same situation. It's 
a less serious issue since I think it's a lot less common for large files to be 
transferred that way, at least after startup.

I took a look at the code and it seems the actual change that avoids the 
disk thrashing is the synchronization done in the chunk fetch handler; it only 
allows a certain number of threads to actually do disk reads simultaneously. 
That's an improvement already, but a couple of questions popped in my head when 
I read your comment:

- how does that relate to maxChunksBeingTransferred()? Aren't both settings 
effectively a limit on the number of requests being serviced, making the 
existing one a little redundant?

- would there be benefits by trying to add some sort of disk affinity to 
these threads? e.g. send fetch requests hitting different disks to different 
queues.
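A minimal sketch of the bounded-concurrency idea described above (illustrative only, not the PR's code):

```scala
import java.util.concurrent.Semaphore

// Illustrative sketch: cap the number of threads reading from disk at once,
// which is roughly what the synchronization in the chunk fetch handler achieves.
class BoundedDiskReads(maxConcurrent: Int) {
  private val permits = new Semaphore(maxConcurrent)

  def withPermit[T](readFromDisk: => T): T = {
    permits.acquire()
    try readFromDisk finally permits.release()
  }
}
```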



---




[GitHub] spark issue #21511: [SPARK-24491][Kubernetes] Configuration support for requ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21511
  
Can one of the admins verify this patch?


---




[GitHub] spark pull request #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and...

2018-06-07 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21495#discussion_r193920472
  
--- Diff: 
repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala
 ---
@@ -21,8 +21,22 @@ import scala.collection.mutable
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._
 
-class SparkILoopInterpreter(settings: Settings, out: JPrintWriter) extends 
IMain(settings, out) {
-  self =>
+class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, 
initializeSpark: () => Unit)
+extends IMain(settings, out) { self =>
+
+  /**
+   * We override `initializeSynchronous` to initialize Spark *after* 
`intp` is properly initialized
+   * and *before* the REPL sees any files in the private `loadInitFiles` 
functions, so that
+   * the Spark context is visible in those files.
+   *
+   * This is a bit of a hack, but there isn't another hook available to us 
at this point.
+   *
+   * See the discussion in Scala community 
https://github.com/scala/bug/issues/10913 for detail.
+   */
+  override def initializeSynchronous(): Unit = {
+super.initializeSynchronous()
+initializeSpark()
--- End diff --

It looks like a `jline` version mismatch.
- Spark 2.4.0-SNAPSHOT uses [2.12.1](https://github.com/apache/spark/blob/master/pom.xml#L749)
- Scala 2.11.12 uses [2.14.3](https://github.com/scala/scala/blob/2.11.x/versions.properties#L35)


---




[GitHub] spark issue #21511: [SPARK-24491][Kubernetes] Configuration support for requ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21511
  
Can one of the admins verify this patch?


---




[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...

2018-06-07 Thread alexmilowski
GitHub user alexmilowski opened a pull request:

https://github.com/apache/spark/pull/21511

[SPARK-24491][Kubernetes] Configuration support for requesting GPUs on k8s

## What changes were proposed in this pull request?

Configuration support for generating the GPU requests in the limits section 
for the executor pods.
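For context, the generated executor pod would carry a GPU entry in its container resource limits; a rough sketch with the fabric8 client Spark's k8s backend uses (the resource name and count below are assumptions for illustration, not taken from this patch):

```scala
import io.fabric8.kubernetes.api.model.{Quantity, ResourceRequirementsBuilder}

// Illustrative only: a GPU entry in a container's resource limits.
val gpuLimits = new ResourceRequirementsBuilder()
  .addToLimits("nvidia.com/gpu", new Quantity("1"))
  .build()
```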

## How was this patch tested?

The patch has been tested on a local on-premise cluster with mixed nodes 
(some with GPUs and some without). There are currently no contributed tests for 
the patch. :(

Legal: I (Alex Miłowski) developed and tested this patch. It is my original work, and I license it to the project under the project’s open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alexmilowski/spark k8s-gpu

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21511.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21511


commit 583928ed2f280ca90c77fc12bf49817f6792db66
Author: alex.milowski 
Date:   2018-06-07T23:05:14Z

Configuration support for requesting GPUs on k8s




---




[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21495
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21495
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3838/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and...

2018-06-07 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21495#discussion_r193919141
  
--- Diff: 
repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala
 ---
@@ -21,8 +21,22 @@ import scala.collection.mutable
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._
 
-class SparkILoopInterpreter(settings: Settings, out: JPrintWriter) extends 
IMain(settings, out) {
-  self =>
+class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, 
initializeSpark: () => Unit)
+extends IMain(settings, out) { self =>
+
+  /**
+   * We override `initializeSynchronous` to initialize Spark *after* 
`intp` is properly initialized
+   * and *before* the REPL sees any files in the private `loadInitFiles` 
functions, so that
+   * the Spark context is visible in those files.
+   *
+   * This is a bit of a hack, but there isn't another hook available to us 
at this point.
+   *
+   * See the discussion in Scala community 
https://github.com/scala/bug/issues/10913 for detail.
+   */
+  override def initializeSynchronous(): Unit = {
+super.initializeSynchronous()
+initializeSpark()
--- End diff --

In my environment, I'm hitting a `NoSuchMethodError` like the following. Did 
you see something like this?

```scala
~/PR-21495:PR-21495$ bin/spark-shell
18/06/07 23:39:00 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: 
org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_171)
Type in expressions to have them evaluated.
Type :help for more information.

scala> Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = 
local-1528414746558).
Spark session available as 'spark'.
Exception in thread "main" java.lang.NoSuchMethodError: 
jline.console.completer.CandidateListCompletionHandler.setPrintSpaceAfterFullCompletion(Z)V
at 
scala.tools.nsc.interpreter.jline.JLineConsoleReader.initCompletion(JLineReader.scala:139)
at 
scala.tools.nsc.interpreter.jline.InteractiveReader.postInit(JLineReader.scala:54)
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21495
  
**[Test build #91538 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91538/testReport)**
 for PR 21495 at commit 
[`de790fd`](https://github.com/apache/spark/commit/de790fd251ba3727bba23ceb1ca07559d25b7e87).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-07 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/21495
  
Retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21482
  
I think consistency in Spark's naming convention (and therefore increased
discoverability by users) outweighs the advantage of naming it exactly after
the Impala equivalent. I do agree that multiple aliases probably aren't
worth the trouble at this point.

FWIW, I would have used this function if it had been available recently, so
it's not just hypothetical. And to me this provides some symmetry in support
for 'special' float values, since we already have isnan().

On 7 June 2018 at 15:36, Reynold Xin  wrote:

> Thanks, Henry. In general I'm not a huge fan of adding something because
> hypothetically somebody might want it. Also if you want this to be
> compatible with Impala, wouldn't you want to name this the same way as
> Impala?



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...

2018-06-07 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/21092#discussion_r193913624
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -154,6 +176,24 @@ private[spark] object Config extends Logging {
   .checkValue(interval => interval > 0, s"Logging interval must be a 
positive time value.")
   .createWithDefaultString("1s")
 
+  val MEMORY_OVERHEAD_FACTOR =
+ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
+  .doc("This sets the Memory Overhead Factor that will allocate memory 
to non-JVM jobs " +
+"which in the case of JVM tasks will default to 0.10 and 0.40 for 
non-JVM jobs")
+  .doubleConf
+  .checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
+"Ensure that memory overhead is a double between 0 --> 1.0")
+  .createWithDefault(0.1)
+
+  val PYSPARK_MAJOR_PYTHON_VERSION =
+ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
+  .doc("This sets the python version. Either 2 or 3. (Python2 or 
Python3)")
+  .stringConf
+  .checkValue(pv => List("2", "3").contains(pv),
+"Ensure that Python Version is either Python2 or Python3")
+  .createWithDefault("2")
--- End diff --

I am willing to do that: thoughts @holdenk ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21092: [SPARK-23984][K8S] Initial Python Bindings for PySpark o...

2018-06-07 Thread ifilonenko
Github user ifilonenko commented on the issue:

https://github.com/apache/spark/pull/21092
  
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- Run SparkPi with a test secret mounted into the driver and executor pods
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Run PySpark on simple pi.py example
- Run PySpark with Python2 to test a pyfiles example
- Run PySpark with Python3 to test a pyfiles example
Run completed in 4 minutes, 28 seconds.
Total number of tests run: 11
Suites: completed 2, aborted 0
Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
[INFO] 

[INFO] BUILD SUCCESS
[INFO] 

[INFO] Total time: 05:24 min
[INFO] Finished at: 2018-06-07T18:54:42-04:00
[INFO] Final Memory: 21M/509M
[INFO] 


For new addition to: 
https://github.com/apache-spark-on-k8s/spark-integration/pull/46 



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21109
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91534/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21109
  
Build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21109
  
**[Test build #91534 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91534/testReport)**
 for PR 21109 at commit 
[`3fa690f`](https://github.com/apache/spark/commit/3fa690faf4e9a0b7d8eb2a5854ebb6a854a44d2a).
 * This patch **fails Spark unit tests**.
 * This patch **does not merge cleanly**.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-07 Thread merlintang
Github user merlintang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193911087
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
@@ -55,6 +56,11 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { 
classNames =>
+Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+  sparkSession.sparkContext.conf).foreach(addListener)
+  }
+
--- End diff --

two comments here: 
1. we need to log the registration here 
2. we need to use try/catch here; it is possible that registration fails, and 
that would break the job. A rough sketch of both is below.
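
A rough sketch of what I have in mind (assuming the surrounding `StreamingQueryManager` context and its imports; not meant as the exact code):

```scala
// Log each successfully registered listener, and wrap failures in a clear
// SparkException instead of letting an opaque reflection error escape.
sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
  try {
    Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
        sparkSession.sparkContext.conf).foreach { listener =>
      addListener(listener)
      logInfo(s"Registered StreamingQueryListener ${listener.getClass.getName}")
    }
  } catch {
    case e: Exception =>
      throw new SparkException("Exception when registering StreamingQueryListener", e)
  }
}
```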


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/21482
  
Thanks, Henry. In general I'm not a huge fan of adding something because 
hypothetically somebody might want it. Also if you want this to be compatible 
with Impala, wouldn't you want to name this the same way as Impala?



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21092: [SPARK-23984][K8S] Initial Python Bindings for PySpark o...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21092
  
Kubernetes integration test status success
URL: 
https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/3694/



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21092: [SPARK-23984][K8S] Initial Python Bindings for PySpark o...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21092
  
Kubernetes integration test starting
URL: 
https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/3694/



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21092: [SPARK-23984][K8S] Initial Python Bindings for PySpark o...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21092
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21092: [SPARK-23984][K8S] Initial Python Bindings for PySpark o...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21092
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3837/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21510
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3836/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21510
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread NihalHarish
Github user NihalHarish commented on a diff in the pull request:

https://github.com/apache/spark/pull/21482#discussion_r193902999
  
--- Diff: R/pkg/NAMESPACE ---
@@ -281,6 +281,8 @@ exportMethods("%<=>%",
   "initcap",
   "input_file_name",
   "instr",
+  "isInf",
+  "isinf",
--- End diff --

Added tests in my latest commit


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21510
  
**[Test build #91536 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91536/testReport)**
 for PR 21510 at commit 
[`58a9ec4`](https://github.com/apache/spark/commit/58a9ec42402cc92675e3e057309a803d08fd0cd7).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21092: [SPARK-23984][K8S] Initial Python Bindings for PySpark o...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21092
  
**[Test build #91537 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91537/testReport)**
 for PR 21092 at commit 
[`ab92913`](https://github.com/apache/spark/commit/ab92913f1d0c303a5ff6b2937019d5e47c25611d).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler i...

2018-06-07 Thread jaceklaskowski
GitHub user jaceklaskowski opened a pull request:

https://github.com/apache/spark/pull/21510

[SPARK-24490][WebUI] Use WebUI.addStaticHandler in web UIs

`WebUI` defines `addStaticHandler`, but the web UIs don't use it and simply 
duplicate its logic. Let's clean them up and remove the duplication.

Local build and waiting for Jenkins

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jaceklaskowski/spark 
SPARK-24490-Use-WebUI.addStaticHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21510.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21510


commit 58a9ec42402cc92675e3e057309a803d08fd0cd7
Author: Jacek Laskowski 
Date:   2018-06-07T21:56:38Z

[SPARK-24490][WebUI] Use WebUI.addStaticHandler in web UIs

Closes https://issues.apache.org/jira/browse/SPARK-24490




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21482#discussion_r193898812
  
--- Diff: R/pkg/NAMESPACE ---
@@ -281,6 +281,8 @@ exportMethods("%<=>%",
   "initcap",
   "input_file_name",
   "instr",
+  "isInf",
+  "isinf",
--- End diff --

Do you know if there's any consistency behind the different capitalization 
schemes? There's `format_number`, `isnan` and `isNotNull` here, for example. 

If not, how about we just go with `isInf` for now and if other aliases are 
needed in the future they can be added and discussed then?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21504
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91532/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21504
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21504
  
**[Test build #91532 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91532/testReport)**
 for PR 21504 at commit 
[`f721ebe`](https://github.com/apache/spark/commit/f721ebe170f66a9f398693b007764034a6307dce).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21483: [SPARK-24477][SPARK-24454][ML][PYTHON] Imports submodule...

2018-06-07 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/21483
  
Looks good to me, pending the above comments


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21482
  
@rxin, that in itself is a bit weird, but there are ways to express inf 
values in Scala and thus inf values can show up flowing through Spark plans. 
I'm not sure MySQL has any such facility.
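
For example (a minimal sketch, assuming a `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Infinity is a legal Double in Scala, so it can enter a DataFrame directly;
// the existing isnan() does not flag it, hence the symmetry argument for isinf.
val df = Seq(Double.PositiveInfinity, Double.NaN, 1.0).toDF("x")
df.select($"x", isnan($"x").as("is_nan")).show()
```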


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21507: Branch 1.6

2018-06-07 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21507
  
@deepaksonu  Would it be possible to close this PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20823: [SPARK-23674] Add Spark ML Listener for Tracking ML Pipe...

2018-06-07 Thread merlintang
Github user merlintang commented on the issue:

https://github.com/apache/spark/pull/20823
  
@jmwdpk can you update this PR, since there is a conflict? I have updated it 
here: https://github.com/merlintang/spark/commits/SPARK-23674


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21155: [SPARK-23927][SQL] Add "sequence" expression

2018-06-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21155#discussion_r193895158
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1468,3 +1472,388 @@ case class Flatten(child: Expression) extends 
UnaryExpression {
 
   override def prettyName: String = "flatten"
 }
+
+@ExpressionDescription(
+  usage = """
+_FUNC_(start, stop, step) - Generates an array of elements from start 
to stop (inclusive),
+  incrementing by step. The type of the returned elements is the same 
as the type of argument
+  expressions.
+
+  Supported types are: byte, short, integer, long, date, timestamp.
+
+  The start and stop expressions must resolve to the same type.
+  If start and stop expressions resolve to the 'date' or 'timestamp' 
type
+  then the step expression must resolve to the 'interval' type, 
otherwise to the same type
+  as the start and stop expressions.
+  """,
+  arguments = """
+Arguments:
+  * start - an expression. The start of the range.
+  * stop - an expression. The end the range (inclusive).
+  * step - an optional expression. The step of the range.
+  By default step is 1 if start is less than or equal to stop, 
otherwise -1.
+  For the temporal sequences it's 1 day and -1 day respectively.
+  If start is greater than stop then the step must be negative, 
and vice versa.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(1, 5);
+   [1, 2, 3, 4, 5]
+  > SELECT _FUNC_(5, 1);
+   [5, 4, 3, 2, 1]
+  > SELECT _FUNC_(to_date('2018-01-01'), to_date('2018-03-01'), 
interval 1 month);
+   [2018-01-01, 2018-02-01, 2018-03-01]
+  """,
+  since = "2.4.0"
+)
+case class Sequence(
+start: Expression,
+stop: Expression,
+stepOpt: Option[Expression],
+timeZoneId: Option[String] = None)
+  extends Expression
+  with ExpectsInputTypes
+  with TimeZoneAwareExpression {
+
+  import Sequence._
+
+  def this(start: Expression, stop: Expression) =
+this(start, stop, None, None)
+
+  def this(start: Expression, stop: Expression, step: Expression) =
+this(start, stop, Some(step), None)
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+copy(timeZoneId = Some(timeZoneId))
+
+  override def children: Seq[Expression] = Seq(start, stop) ++ stepOpt
+
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override lazy val dataType: ArrayType = ArrayType(start.dataType, 
containsNull = false)
+
+  override def inputTypes: Seq[AbstractDataType] = {
+val elemType = dataType.elementType
+Seq(elemType, elemType) ++
+  stepOpt.map(_ => elemType match {
+case DateType | TimestampType => CalendarIntervalType
+case _: IntegralType => elemType
--- End diff --

I'm wondering what was wrong with `checkInputDataTypes`, but I can't see 
the difference because the commit was just overwritten.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
@aalobaidi 
One thing to be aware of is that, from the executor's point of view, it must 
keep at least one version of state in memory regardless of how many versions are 
cached. I guess you may get a better result if you unload the entire cache but 
keep the last version you just committed. A cache miss would then only occur in 
one of the three cases (`2. committed but batch failed afterwards`), which should 
happen rarely and is still better than a cache miss in two of the three cases (2 and 3).
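
Something like this is what I mean, as a sketch only (`cache` and `lastCommittedVersion` are assumed names, not the provider's actual fields):

```scala
import scala.collection.mutable

// Evict every cached version except the one that was just committed, so the
// next batch still gets a cache hit for the latest state.
def evictAllButLastCommitted(cache: mutable.Map[Long, Any],
                             lastCommittedVersion: Long): Unit = {
  cache.retain((version, _) => version == lastCommittedVersion)
}
```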


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193892652
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
 finally:
 q.stop()
 shutil.rmtree(tmpPath)
+'''
 
+class ForeachWriterTester:
+
+def __init__(self, spark):
+self.spark = spark
+self.input_dir = tempfile.mkdtemp()
+self.open_events_dir = tempfile.mkdtemp()
+self.process_events_dir = tempfile.mkdtemp()
+self.close_events_dir = tempfile.mkdtemp()
+
+def write_open_event(self, partitionId, epochId):
+self._write_event(
+self.open_events_dir,
+{'partition': partitionId, 'epoch': epochId})
+
+def write_process_event(self, row):
+self._write_event(self.process_events_dir, {'value': 'text'})
+
+def write_close_event(self, error):
+self._write_event(self.close_events_dir, {'error': str(error)})
+
+def write_input_file(self):
+self._write_event(self.input_dir, "text")
+
+def open_events(self):
+return self._read_events(self.open_events_dir, 'partition INT, 
epoch INT')
+
+def process_events(self):
+return self._read_events(self.process_events_dir, 'value 
STRING')
+
+def close_events(self):
+return self._read_events(self.close_events_dir, 'error STRING')
+
+def run_streaming_query_on_writer(self, writer, num_files):
+try:
+sdf = 
self.spark.readStream.format('text').load(self.input_dir)
+sq = sdf.writeStream.foreach(writer).start()
+for i in range(num_files):
+self.write_input_file()
+sq.processAllAvailable()
+sq.stop()
+finally:
+self.stop_all()
+
+def _read_events(self, dir, json):
+rows = self.spark.read.schema(json).json(dir).collect()
+dicts = [row.asDict() for row in rows]
+return dicts
+
+def _write_event(self, dir, event):
+import random
+file = open(os.path.join(dir, str(random.randint(0, 10))), 
'w')
+file.write("%s\n" % str(event))
+file.close()
+
+def stop_all(self):
+for q in self.spark._wrapped.streams.active:
+q.stop()
+
+def __getstate__(self):
+return (self.open_events_dir, self.process_events_dir, 
self.close_events_dir)
+
+def __setstate__(self, state):
+self.open_events_dir, self.process_events_dir, 
self.close_events_dir = state
+
+def test_streaming_foreach_with_simple_function(self):
+tester = self.ForeachWriterTester(self.spark)
+
+def foreach_func(row):
+tester.write_process_event(row)
+
+tester.run_streaming_query_on_writer(foreach_func, 2)
+self.assertEqual(len(tester.process_events()), 2)
+
+def test_streaming_foreach_with_basic_open_process_close(self):
+tester = self.ForeachWriterTester(self.spark)
+
+class ForeachWriter:
+def open(self, partitionId, epochId):
+tester.write_open_event(partitionId, epochId)
+return True
+
+def process(self, row):
+tester.write_process_event(row)
+
+def close(self, error):
+tester.write_close_event(error)
+
+tester.run_streaming_query_on_writer(ForeachWriter(), 2)
+
+open_events = tester.open_events()
+self.assertEqual(len(open_events), 2)
+self.assertSetEqual(set([e['epoch'] for e in open_events]), {0, 1})
+
+self.assertEqual(len(tester.process_events()), 2)
+
+close_events = tester.close_events()
+self.assertEqual(len(close_events), 2)
+self.assertSetEqual(set([e['error'] for e in close_events]), 
{'None'})
+
+def test_streaming_foreach_with_open_returning_false(self):
+tester = self.ForeachWriterTester(self.spark)
+
+class ForeachWriter:
+def open(self, partitionId, epochId):
+tester.write_open_event(partitionId, epochId)
+return False
+
+def process(self, row):
+tester.write_process_event(row)
+
+def close(self, error):
+

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193892565
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
--- End diff --

done


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193892571
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
--- End diff --

done


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193892514
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python._
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{NextIterator, Utils}
+
+class PythonForeachWriter(func: PythonFunction, schema: StructType)
+  extends ForeachWriter[UnsafeRow] {
+
+  private lazy val context = TaskContext.get()
+  private lazy val buffer = new PythonForeachWriter.UnsafeRowBuffer(
+context.taskMemoryManager, new 
File(Utils.getLocalDir(SparkEnv.get.conf)), schema.fields.length)
+  private lazy val inputRowIterator = buffer.iterator
+
+  private lazy val inputByteIterator = {
+EvaluatePython.registerPicklers()
+val objIterator = inputRowIterator.map { row => 
EvaluatePython.toJava(row, schema) }
+new SerDeUtil.AutoBatchedPickler(objIterator)
+  }
+
+  private lazy val pythonRunner = {
+val conf = SparkEnv.get.conf
+val bufferSize = conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+PythonRunner(func, bufferSize, reuseWorker)
+  }
+
+  private lazy val outputIterator =
+pythonRunner.compute(inputByteIterator, context.partitionId(), context)
+
+  override def open(partitionId: Long, version: Long): Boolean = {
+outputIterator  // initialize everything
+TaskContext.get.addTaskCompletionListener { _ => buffer.close() }
+true
+  }
+
+  override def process(value: UnsafeRow): Unit = {
+buffer.add(value)
+  }
+
+  override def close(errorOrNull: Throwable): Unit = {
+buffer.allRowsAdded()
+if (outputIterator.hasNext) outputIterator.next() // to throw python 
exception if there was one
+  }
+}
+
+object PythonForeachWriter {
+
+  /**
+   * A buffer that is designed for the sole purpose of buffering 
UnsafeRows in PythonForeahWriter.
--- End diff --

done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193892217
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -71,23 +110,17 @@ abstract class ForeachWriter[T] extends Serializable {
   // TODO: Move this to org.apache.spark.sql.util or consolidate this with 
batch API.
 
   /**
-   * Called when starting to process one partition of new data in the 
executor. The `version` is
-   * for data deduplication when there are failures. When recovering from 
a failure, some data may
-   * be generated multiple times but they will always have the same 
version.
-   *
-   * If this method finds using the `partitionId` and `version` that this 
partition has already been
-   * processed, it can return `false` to skip the further data processing. 
However, `close` still
-   * will be called for cleaning up resources.
+   * Called when starting to process one partition of new data in the 
executor.
*
* @param partitionId the partition id.
-   * @param version a unique id for data deduplication.
+   * @param epochId a unique id for data deduplication.
* @return `true` if the corresponding partition and version id should 
be processed. `false`
* indicates the partition should be skipped.
*/
-  def open(partitionId: Long, version: Long): Boolean
+  def open(partitionId: Long, epochId: Long): Boolean
--- End diff --

okay. I checked, but there is no compatibility issue in this case. A 
source compatibility issue arises when code calls a method using the parameter name 
(e.g. `func(paramName = value)`) and the parameter name changes. In this case, users 
are overriding the method (overriding does not care about the exact parameter name, only 
the types) and Spark is internally calling the method (not by parameter name). So 
this is fine.
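
A tiny standalone illustration of the distinction (not Spark code):

```scala
abstract class Writer {
  def open(partitionId: Long, epochId: Long): Boolean  // parameter was previously named `version`
}

class MyWriter extends Writer {
  // Overriding with a different parameter name compiles fine; only the types matter.
  override def open(partitionId: Long, whatever: Long): Boolean = true
}

// Only a call site that uses the old name as a named argument would stop compiling:
// new MyWriter().open(partitionId = 0L, version = 1L)
```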


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21509: [SPARK-24489]Check for invalid input type of weight data...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21509
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21509: [24489]Check for invalid input type of weight data in ml...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21509
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21509: Check for invalid input type of weight data in ml...

2018-06-07 Thread shahidki31
GitHub user shahidki31 opened a pull request:

https://github.com/apache/spark/pull/21509

Check for invalid input type of weight data in ml.PowerIterationClustering

## What changes were proposed in this pull request?
The test case below results in the following failure. Currently, in ml.PIC there 
is no check for the data type of the weight column. 
 ```
 test("invalid input types for weight") {
val invalidWeightData = spark.createDataFrame(Seq(
  (0L, 1L, "a"),
  (2L, 3L, "b")
)).toDF("src", "dst", "weight")

val pic = new PowerIterationClustering()
  .setWeightCol("weight")

val result = pic.assignClusters(invalidWeightData)
  }
```
```
Job aborted due to stage failure: Task 0 in stage 8077.0 failed 1 times, 
most recent failure: Lost task 0.0 in stage 8077.0 (TID 882, localhost, 
executor driver): scala.MatchError: [0,1,null] (of class 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
at 
org.apache.spark.ml.clustering.PowerIterationClustering$$anonfun$3.apply(PowerIterationClustering.scala:178)
at 
org.apache.spark.ml.clustering.PowerIterationClustering$$anonfun$3.apply(PowerIterationClustering.scala:178)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
```
```
In this PR, a type check is added for the weight column.
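
A sketch of the kind of check added (not necessarily the exact code; it relies on the existing `SchemaUtils.checkNumericType` helper, which is package-private to Spark, so this lives inside the ml code):

```scala
import org.apache.spark.ml.util.SchemaUtils
import org.apache.spark.sql.types.StructType

// Validate the weight column up front and fail with a clear error message,
// instead of hitting a MatchError deep inside the similarity computation.
def validateWeightCol(schema: StructType, weightCol: String): Unit = {
  if (weightCol.nonEmpty) {
    SchemaUtils.checkNumericType(schema, weightCol)
  }
}
```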
## How was this patch tested?
UT added

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shahidki31/spark testCasePic

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21509.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21509


commit 0d6d7be494b6b331a09d91b15a29bac98eac4c74
Author: Shahid 
Date:   2018-06-07T20:58:24Z

Example code for Power Iteration Clustering




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193887258
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
--- End diff --

done


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193887147
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -71,23 +110,17 @@ abstract class ForeachWriter[T] extends Serializable {
   // TODO: Move this to org.apache.spark.sql.util or consolidate this with 
batch API.
 
   /**
-   * Called when starting to process one partition of new data in the 
executor. The `version` is
-   * for data deduplication when there are failures. When recovering from 
a failure, some data may
-   * be generated multiple times but they will always have the same 
version.
-   *
-   * If this method finds using the `partitionId` and `version` that this 
partition has already been
-   * processed, it can return `false` to skip the further data processing. 
However, `close` still
-   * will be called for cleaning up resources.
+   * Called when starting to process one partition of new data in the 
executor.
*
* @param partitionId the partition id.
-   * @param version a unique id for data deduplication.
+   * @param epochId a unique id for data deduplication.
* @return `true` if the corresponding partition and version id should 
be processed. `false`
* indicates the partition should be skipped.
*/
-  def open(partitionId: Long, version: Long): Boolean
+  def open(partitionId: Long, epochId: Long): Boolean
--- End diff --

oh right... it does break source compatibility. I don't think that is fine 
:( especially in the 2.x line, even though it is marked as experimental (it's 
been out there for over 2 years now, this is hardly experimental and we should 
not remove the experimental tag, but that's a different discussion).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193886200
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
--- End diff --

Python APIs anyway have slight divergences from Scala/Java APIs in order 
to provide a better experience for Python users. For example, 
`StreamingQuery.lastProgress` returns an object of type 
`StreamingQueryProgress` in Java/Scala but returns a dict in Python, because 
Python users are more used to dealing with dicts, while Java/Scala users (typed 
languages) are more comfortable with structures. Similarly, in 
`DataFrame.select`, you can refer to columns in Scala using `$"columnName"`, but 
in Python you can refer to them as `df.columnName`. If we strictly adhere to 
pure consistency, then we cannot make it convenient for users in different 
languages, and ultimately convenience is what matters for the user experience. 
So it's okay to have a superset of supported types in Python compared to 
Java/Scala. 

Personally, I think we should also add the lambda variant to Scala as well. 
But that decision for Scala is independent of this PR, as there is enough 
justification for adding the lambda variant for Python.

---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
@aalobaidi 
You can also merge #21506 (maybe changing the log level, or modifying the 
patch to log the message at INFO level) and see the latencies for loading state, 
snapshotting, and cleaning up.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21469
  
**[Test build #91535 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91535/testReport)**
 for PR 21469 at commit 
[`3c80cad`](https://github.com/apache/spark/commit/3c80cad32c056a24a7f5ffd7ab0ae3f7e096a62d).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193878552
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (partition_id, epoch_id))
+... return True
+... def 

[GitHub] spark issue #21283: [SPARK-24224][ML-Examples]Java example code for Power It...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21283
  
**[Test build #4195 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4195/testReport)**
 for PR 21283 at commit 
[`90450e0`](https://github.com/apache/spark/commit/90450e0950b334af4373c247fe3b0ed0cc6fe6c0).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21248: [SPARK-24191][ML]Scala Example code for Power Iteration ...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21248
  
**[Test build #4196 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4196/testReport)**
 for PR 21248 at commit 
[`d005493`](https://github.com/apache/spark/commit/d0054939562ff822c2a0581af2a616f61a82e131).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21508: [SPARK-24488] [SQL] Fix issue when generator is aliased ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21508
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21508: [SPARK-24488] [SQL] Fix issue when generator is a...

2018-06-07 Thread bkrieger
GitHub user bkrieger opened a pull request:

https://github.com/apache/spark/pull/21508

[SPARK-24488] [SQL] Fix issue when generator is aliased multiple times

## What changes were proposed in this pull request?

Currently, the Analyzer throws an exception if you try to nest a 
generator. However, it special cases generators "nested" in an alias, and 
allows that. If you try to alias a generator twice, it is not caught by the 
special case, so an exception is thrown.

This PR trims the unnecessary, non-top-level aliases, so that the generator 
is allowed.
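
A hypothetical PySpark illustration of the failure mode described above,
assuming the DataFrame Column API reaches the same analyzer path; this snippet
is not taken from the PR, and the data is made up.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([([1, 2, 3],)], ["arr"])

    # A single alias over a generator is special-cased by the Analyzer and works.
    df.select(explode(df.arr).alias("a")).show()

    # Aliasing the generator a second time wraps it in a non-top-level Alias,
    # which (before this fix) fell outside the special case and raised an
    # AnalysisException.
    df.select(explode(df.arr).alias("a").alias("b")).show()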


## How was this patch tested?

New tests in AnalysisSuite.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bkrieger/spark bk/SPARK-24488

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21508.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21508


commit 44ae34d0387f763936cddeceae64ee98b7bb279f
Author: Brandon Krieger 
Date:   2018-06-07T20:09:09Z

SPARK-24488




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21248: [SPARK-24191][ML]Scala Example code for Power Iteration ...

2018-06-07 Thread shahidki31
Github user shahidki31 commented on the issue:

https://github.com/apache/spark/pull/21248
  
Hi @hhbyyh  @srowen  ,
PowerIterationClustering API has some modifications based on the discussion 
in the JIRA, https://issues.apache.org/jira/browse/SPARK-15784.
The examples also have modified based on the latest PR of Power Iteration 
Clustering, https://github.com/apache/spark/pull/21493

Can you please review the modified changes?
Thanks


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21248: [SPARK-24191][ML]Scala Example code for Power Iteration ...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21248
  
**[Test build #4196 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4196/testReport)**
 for PR 21248 at commit 
[`d005493`](https://github.com/apache/spark/commit/d0054939562ff822c2a0581af2a616f61a82e131).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21248: [SPARK-24191][ML]Scala Example code for Power Iteration ...

2018-06-07 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/21248
  
OK, I think this could have been combined with 
https://github.com/apache/spark/pull/21283, but whatever.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21283: [SPARK-24224][ML-Examples]Java example code for Power It...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21283
  
**[Test build #4195 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4195/testReport)**
 for PR 21283 at commit 
[`90450e0`](https://github.com/apache/spark/commit/90450e0950b334af4373c247fe3b0ed0cc6fe6c0).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21502
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91528/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21502
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21502
  
**[Test build #91528 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91528/testReport)**
 for PR 21502 at commit 
[`4d080cf`](https://github.com/apache/spark/commit/4d080cff795457c6a02b255acc691157afc94e81).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21283: [SPARK-24224][ML-Examples]Java example code for Power It...

2018-06-07 Thread shahidki31
Github user shahidki31 commented on the issue:

https://github.com/apache/spark/pull/21283
  
Hi @srowen , Thanks for the comment. I have modified the code.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-07 Thread aalobaidi
Github user aalobaidi commented on the issue:

https://github.com/apache/spark/pull/21500
  
Sorry for the late reply. The option is useful for a specific use case: 
micro-batches with a relatively large number of partitions, where each 
partition's state is very big. When this option is enabled, Spark will load 
the state of a partition from disk, process all events belonging to that 
partition, commit the new state (delta) to disk, and then unload the entire 
partition state from memory before moving on to the next partition (task). 
This way each executor keeps in memory only the state of the partitions 
running concurrently, as opposed to keeping the state of all partitions it 
has executed.

You can control the balance between memory usage and IOs by setting 
`spark.sql.shuffle.partitions` (it should be set before the first run of the 
query); a sketch of that follows below.

I did JVM profiling and benchmarks with 5M-event micro-batches and a total 
state of ~600M keys on a 6-node EMR cluster. The memory usage was much better 
(in fact the default behavior failed with less than 200M keys) and performance 
wasn't affected significantly. (I will have to compile more specific numbers.)

@HeartSaVioR brings up a good point regarding state compaction (snapshots). I 
can't confirm whether compaction was working or not during the test; I will have 
to get back to you guys about this.
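
A minimal sketch of pinning the shuffle-partition count before the first run
of a stateful streaming query, assuming a rate source and a console sink purely
for illustration; the PR's new option itself is omitted because its config key
is not named in this thread, and the checkpoint path is a placeholder.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()

    # Fewer shuffle partitions mean fewer, larger state partitions per task;
    # more partitions mean smaller per-task state but more store I/O. Set this
    # before the query's first run, since the state layout is fixed then.
    spark.conf.set("spark.sql.shuffle.partitions", "50")

    counts = (spark.readStream.format("rate").option("rowsPerSecond", "100").load()
              .groupBy("value").count())

    query = (counts.writeStream
             .outputMode("update")
             .format("console")
             .option("checkpointLocation", "/tmp/state-demo-checkpoint")
             .start())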



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21506
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91525/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21506
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21506
  
**[Test build #91525 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91525/testReport)**
 for PR 21506 at commit 
[`d84f98f`](https://github.com/apache/spark/commit/d84f98fc978262f4165f78b3b223b8bb3151f735).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193839997
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -71,23 +110,17 @@ abstract class ForeachWriter[T] extends Serializable {
   // TODO: Move this to org.apache.spark.sql.util or consolidate this with batch API.
 
   /**
-   * Called when starting to process one partition of new data in the executor. The `version` is
-   * for data deduplication when there are failures. When recovering from a failure, some data may
-   * be generated multiple times but they will always have the same version.
-   *
-   * If this method finds using the `partitionId` and `version` that this partition has already been
-   * processed, it can return `false` to skip the further data processing. However, `close` still
-   * will be called for cleaning up resources.
+   * Called when starting to process one partition of new data in the executor.
    *
    * @param partitionId the partition id.
-   * @param version a unique id for data deduplication.
+   * @param epochId a unique id for data deduplication.
    * @return `true` if the corresponding partition and version id should be processed. `false`
    *         indicates the partition should be skipped.
    */
-  def open(partitionId: Long, version: Long): Boolean
+  def open(partitionId: Long, epochId: Long): Boolean
--- End diff --

Renaming a parameter breaks Scala source compatibility. I'm totally fine with 
changing this since it's not a stable API, just pointing it out.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


