[GitHub] spark issue #20400: [SPARK-23084][PYTHON]Add unboundedPreceding(), unbounded...

2018-01-30 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/20400
  
LGTM, only one nit.


---




[GitHub] spark pull request #20400: [SPARK-23084][PYTHON]Add unboundedPreceding(), un...

2018-01-30 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20400#discussion_r164950347
  
--- Diff: python/pyspark/sql/window.py ---
@@ -124,16 +126,20 @@ def rangeBetween(start, end):
 values directly.
 
 :param start: boundary start, inclusive.
-  The frame is unbounded if this is ``Window.unboundedPreceding``, or
+  The frame is unbounded if this is ``Window.unboundedPreceding``,
+  ``org.apache.spark.sql.catalyst.expressions.UnboundedPreceding``, or
   any value less than or equal to max(-sys.maxsize, -9223372036854775808).
 :param end: boundary end, inclusive.
-The frame is unbounded if this is ``Window.unboundedFollowing``, or
+The frame is unbounded if this is ``Window.unboundedFollowing``,
+``org.apache.spark.sql.catalyst.expressions.UnboundedFollowing``, or
 any value greater than or equal to min(sys.maxsize, 9223372036854775807).
--- End diff --

remove this line?


---




[GitHub] spark issue #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20445
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20445
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86857/
Test FAILed.


---




[GitHub] spark issue #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20445
  
**[Test build #86857 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86857/testReport)** for PR 20445 at commit [`5adf1fe`](https://github.com/apache/spark/commit/5adf1fe7966f0668f2c2f5b702f5f9a45e72443d).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20400: [SPARK-23084][PYTHON]Add unboundedPreceding(), unbounded...

2018-01-30 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/20400
  
LGTM


---




[GitHub] spark issue #20443: [SPARK-23157][SQL][FOLLOW-UP] DataFrame -> SparkDataFram...

2018-01-30 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20443
  
cc @felixcheung Could you take a look at this? Thanks!


---




[GitHub] spark pull request #20444: [SPARK-23274] [SQL] Fix ReplaceExceptWithFilter w...

2018-01-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20444


---




[GitHub] spark issue #20444: [SPARK-23274] [SQL] Fix ReplaceExceptWithFilter when the...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20444
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86851/
Test PASSed.


---




[GitHub] spark issue #20444: [SPARK-23274] [SQL] Fix ReplaceExceptWithFilter when the...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20444
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20444: [SPARK-23274] [SQL] Fix ReplaceExceptWithFilter when the...

2018-01-30 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20444
  
Thanks! Merged to master/2.3


---




[GitHub] spark issue #20444: [SPARK-23274] [SQL] Fix ReplaceExceptWithFilter when the...

2018-01-30 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/20444
  
Yeah, I see. LGTM.

On Wed, Jan 31, 2018, 1:03 PM Xiao Li  wrote:

> *@gatorsmile* commented on this pull request.
> --
>
> In
> 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
> :
>
> >  val filterCondition =
>
InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
>
>  val attributeNameMap: Map[String, Attribute] = left.output.map(x => 
(x.name, x)).toMap
>
> -filterCondition.transform { case a : AttributeReference => 
attributeNameMap(a.name) }
> +if (filterCondition.references.forall(r => 
attributeNameMap.contains(r.name))) {
> +  Some(filterCondition.transform { case a: AttributeReference => 
attributeNameMap(a.name) })
>
> Yes. There are multiple potential cases we can improve for this case. If
> we make it more complicated, it just takes a longer time to review the
> work. This blocks the 2.3 RC. Thus, I would like to fix it in a
> conservative way.
>



---




[GitHub] spark issue #20440: [SPARK-23276][SQL][TEST] Enable UDT tests in (Hive)OrcHa...

2018-01-30 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/20440
  
Thank you, @HyukjinKwon  and @gatorsmile !


---




[GitHub] spark issue #20444: [SPARK-23274] [SQL] Fix ReplaceExceptWithFilter when the...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20444
  
**[Test build #86851 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86851/testReport)** for PR 20444 at commit [`41bd3e0`](https://github.com/apache/spark/commit/41bd3e00d1c31a36cd4565d41dc8e28165e5afe6).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #20444: [SPARK-23274] [SQL] Fix ReplaceExceptWithFilter w...

2018-01-30 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20444#discussion_r164948893
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
 ---
@@ -46,18 +46,27 @@ object ReplaceExceptWithFilter extends 
Rule[LogicalPlan] {
 }
 
 plan.transform {
-  case Except(left, right) if isEligible(left, right) =>
-Distinct(Filter(Not(transformCondition(left, skipProject(right))), 
left))
+  case e @ Except(left, right) if isEligible(left, right) =>
+val newCondition = transformCondition(left, skipProject(right))
+newCondition.map { c =>
+  Distinct(Filter(Not(c), left))
+}.getOrElse {
+  e
+}
 }
   }
 
-  private def transformCondition(left: LogicalPlan, right: LogicalPlan): 
Expression = {
+  private def transformCondition(left: LogicalPlan, right: LogicalPlan): 
Option[Expression] = {
 val filterCondition =
   
InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
 
 val attributeNameMap: Map[String, Attribute] = left.output.map(x => 
(x.name, x)).toMap
 
-filterCondition.transform { case a : AttributeReference => 
attributeNameMap(a.name) }
+if (filterCondition.references.forall(r => 
attributeNameMap.contains(r.name))) {
+  Some(filterCondition.transform { case a: AttributeReference => 
attributeNameMap(a.name) })
--- End diff --

Yes. There are multiple potential cases we could improve here, but if we make it more complicated, it just takes longer to review the work. This blocks the 2.3 RC, so I would like to fix it in a conservative way.


---




[GitHub] spark issue #20226: [SPARK-23034][SQL] Override `nodeName` for all *ScanExec...

2018-01-30 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20226
  
It sounds like we still need to fix a test in PySpark. Thanks!


---




[GitHub] spark pull request #20442: [SPARK-23265][SQL]Update multi-column error handl...

2018-01-30 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20442#discussion_r164945559
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -167,25 +167,31 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.3.0")
   def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
 
-  private[feature] def getInOutCols: (Array[String], Array[String]) = {
-require((isSet(inputCol) && isSet(outputCol) && !isSet(inputCols) && 
!isSet(outputCols)) ||
-  (!isSet(inputCol) && !isSet(outputCol) && isSet(inputCols) && 
isSet(outputCols)),
-  "QuantileDiscretizer only supports setting either inputCol/outputCol 
or" +
-"inputCols/outputCols."
-)
+  @Since("1.6.0")
+  override def transformSchema(schema: StructType): StructType = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol),
+  Seq(outputCols))
 
-if (isSet(inputCol)) {
-  (Array($(inputCol)), Array($(outputCol)))
-} else {
-  require($(inputCols).length == $(outputCols).length,
-"inputCols number do not match outputCols")
-  ($(inputCols), $(outputCols))
+if (isSet(inputCols)) {
+  require(getInputCols.length == getOutputCols.length,
+s"QuantileDiscretizer $this has mismatched Params " +
--- End diff --

Does `this` need to be in the output? Or just `The QuantileDiscretizer has ...`?


---




[GitHub] spark pull request #20442: [SPARK-23265][SQL]Update multi-column error handl...

2018-01-30 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20442#discussion_r164939903
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -167,25 +167,31 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.3.0")
   def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
 
-  private[feature] def getInOutCols: (Array[String], Array[String]) = {
-require((isSet(inputCol) && isSet(outputCol) && !isSet(inputCols) && 
!isSet(outputCols)) ||
-  (!isSet(inputCol) && !isSet(outputCol) && isSet(inputCols) && 
isSet(outputCols)),
-  "QuantileDiscretizer only supports setting either inputCol/outputCol 
or" +
-"inputCols/outputCols."
-)
+  @Since("1.6.0")
+  override def transformSchema(schema: StructType): StructType = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol),
--- End diff --

Setting `numBucketsArray` in the single-column case should be an error. Since `checkSingleVsMultiColumnParams` doesn't cover that usage, I think we need to check it here; a sketch of such a check is below.
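
A minimal, self-contained sketch (not the actual QuantileDiscretizer code; the helper name is hypothetical) of the extra validation being suggested: reject `numBucketsArray` whenever the single-column params are in use.

```scala
object SingleVsMultiColumnCheck {
  // Fails fast when the multi-column-only param is combined with single-column params.
  def validate(singleColumnMode: Boolean, numBucketsArraySet: Boolean): Unit = {
    require(!(singleColumnMode && numBucketsArraySet),
      "numBucketsArray is only supported with inputCols/outputCols; " +
        "use numBuckets together with inputCol/outputCol.")
  }

  def main(args: Array[String]): Unit = {
    validate(singleColumnMode = true, numBucketsArraySet = false) // passes
    validate(singleColumnMode = true, numBucketsArraySet = true)  // throws IllegalArgumentException
  }
}
```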


---




[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

2018-01-30 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20382
  
Sure, will wait for the others to be merged. Thanks @tdas.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20447: [SPARK-23279][SS] Avoid triggering distributed job for C...

2018-01-30 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20447
  
Thank you very much for fixing this.


---




[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

2018-01-30 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20382
  
I am holding off further comments on this PR until the major change of eliminating the v1 Source is done. That would cause significant refactoring (including the fact that the common trait won't be needed).

BTW, I strongly suggest moving the socket code to 
execution.streaming.sources, like other v2 sources.



---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164944650
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+protected val host: String,
+protected val port: Int,
+includeTimestamp: Boolean,
+sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-socket = new Socket(host, port)
-val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream))
-readThread = new Thread(s"TextSocketSource($host, $port)") {
-  setDaemon(true)
-
-  override def run(): Unit = {
-try {
-  while (true) {
-val line = reader.readLine()
-if (line == null) {
-  // End of file reached
-  logWarning(s"Stream closed by $host:$port")
-  return
-}
-TextSocketSource.this.synchronized {
-  val newData = (line,
-Timestamp.valueOf(
-  
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-)
-  currentOffset = currentOffset + 1
-  batches.append(newData)
-}
-  }
-} catch {
-  case e: IOException =>
-}
-  }
-}
-readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
-} else {
-  Some(currentOffset)
-}
-  }
+  override def schema: StructType =
+if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else 
TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = 
getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startOrdinal =
-  
start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-// Internal buffer only holds the batches after lastOffsetCommitted
-val rawList = synchronized {
-  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-  batches.slice(sliceStart, sliceEnd)
-}
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+val rawList = 
getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+  LongOffset.convert(end).map(_.offset))
 
 val rdd = sqlContext.sparkContext
   .parallelize(rawList)
   .map { case (v, ts) => InternalRow(UTF8String.fromString(v), 
ts.getTime) }
 sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
 val newOffset = LongOffset.convert(end).getOrElse(
   sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
 s"originate with an instance of this class")
 )
 
-val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+commitInternal(newOffset.offset)
+  }
 
-if (offsetDiff 

[GitHub] spark pull request #20438: [SPARK-23272][SQL] add calendar interval type sup...

2018-01-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20438#discussion_r164944493
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
 ---
@@ -146,9 +146,7 @@ public UTF8String getUTF8String(int ordinal) {
   @Override
   public CalendarInterval getInterval(int ordinal) {
 if (columns[ordinal].isNullAt(rowId)) return null;
--- End diff --

See https://github.com/apache/spark/pull/20438#discussion_r164719793. In this PR I just fixed the returning-null issue for `getStruct` and `getInterval`, because they are non-abstract. There should be a follow-up to clearly document that `ColumnVector.getBinary/getUTF8String/...` should return null if the slot is null; then we can remove these null checks here. I'd appreciate it if you have time to take this :) A small sketch of the intended contract is below.
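
A minimal sketch (illustrative names only, not the real ColumnVector/ColumnarRow API) of the contract described above: once each per-type accessor is documented to return null for null slots, the row wrapper can drop its own isNullAt check.

```scala
object NullContractSketch {
  trait MiniVector {
    def isNullAt(rowId: Int): Boolean
    // Contract: must return null when isNullAt(rowId) is true.
    def getUTF8String(rowId: Int): String
  }

  // The row wrapper no longer needs `if (vector.isNullAt(rowId)) return null` up front.
  class MiniRow(vector: MiniVector, rowId: Int) {
    def getUTF8String: String = vector.getUTF8String(rowId)
  }

  def main(args: Array[String]): Unit = {
    val v = new MiniVector {
      override def isNullAt(rowId: Int): Boolean = rowId == 0
      override def getUTF8String(rowId: Int): String =
        if (isNullAt(rowId)) null else s"row-$rowId"
    }
    println(new MiniRow(v, 0).getUTF8String) // null
    println(new MiniRow(v, 1).getUTF8String) // row-1
  }
}
```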


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164944362
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TextSocketReader.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.util.Calendar
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.internal.Logging
+
+trait TextSocketReader extends Logging {
--- End diff --

Please add docs!! This is a base interface used by two source implementations.


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164944324
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
--- End diff --

Please update the docs accordingly!! This is not a source, but a base interface used by two source implementations.


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164944276
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+protected val host: String,
+protected val port: Int,
+includeTimestamp: Boolean,
+sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-socket = new Socket(host, port)
-val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream))
-readThread = new Thread(s"TextSocketSource($host, $port)") {
-  setDaemon(true)
-
-  override def run(): Unit = {
-try {
-  while (true) {
-val line = reader.readLine()
-if (line == null) {
-  // End of file reached
-  logWarning(s"Stream closed by $host:$port")
-  return
-}
-TextSocketSource.this.synchronized {
-  val newData = (line,
-Timestamp.valueOf(
-  
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-)
-  currentOffset = currentOffset + 1
-  batches.append(newData)
-}
-  }
-} catch {
-  case e: IOException =>
-}
-  }
-}
-readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
-} else {
-  Some(currentOffset)
-}
-  }
+  override def schema: StructType =
+if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else 
TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = 
getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startOrdinal =
-  
start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-// Internal buffer only holds the batches after lastOffsetCommitted
-val rawList = synchronized {
-  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-  batches.slice(sliceStart, sliceEnd)
-}
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+val rawList = 
getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+  LongOffset.convert(end).map(_.offset))
 
 val rdd = sqlContext.sparkContext
   .parallelize(rawList)
   .map { case (v, ts) => InternalRow(UTF8String.fromString(v), 
ts.getTime) }
 sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
 val newOffset = LongOffset.convert(end).getOrElse(
   sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
 s"originate with an instance of this class")
 )
 
-val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+commitInternal(newOffset.offset)
+  }
 
-if (offset

[GitHub] spark pull request #20438: [SPARK-23272][SQL] add calendar interval type sup...

2018-01-30 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20438#discussion_r164943981
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java ---
@@ -139,9 +139,7 @@ public UTF8String getUTF8String(int ordinal) {
   @Override
   public CalendarInterval getInterval(int ordinal) {
 if (data.getChild(ordinal).isNullAt(rowId)) return null;
--- End diff --

Can we remove this null check now?


---




[GitHub] spark pull request #20438: [SPARK-23272][SQL] add calendar interval type sup...

2018-01-30 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20438#discussion_r164943754
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
 ---
@@ -146,9 +146,7 @@ public UTF8String getUTF8String(int ordinal) {
   @Override
   public CalendarInterval getInterval(int ordinal) {
 if (columns[ordinal].isNullAt(rowId)) return null;
--- End diff --

Do we still need this null check?


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164943885
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+protected val host: String,
+protected val port: Int,
+includeTimestamp: Boolean,
+sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-socket = new Socket(host, port)
-val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream))
-readThread = new Thread(s"TextSocketSource($host, $port)") {
-  setDaemon(true)
-
-  override def run(): Unit = {
-try {
-  while (true) {
-val line = reader.readLine()
-if (line == null) {
-  // End of file reached
-  logWarning(s"Stream closed by $host:$port")
-  return
-}
-TextSocketSource.this.synchronized {
-  val newData = (line,
-Timestamp.valueOf(
-  
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-)
-  currentOffset = currentOffset + 1
-  batches.append(newData)
-}
-  }
-} catch {
-  case e: IOException =>
-}
-  }
-}
-readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
-} else {
-  Some(currentOffset)
-}
-  }
+  override def schema: StructType =
+if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else 
TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = 
getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startOrdinal =
-  
start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-// Internal buffer only holds the batches after lastOffsetCommitted
-val rawList = synchronized {
-  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-  batches.slice(sliceStart, sliceEnd)
-}
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+val rawList = 
getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+  LongOffset.convert(end).map(_.offset))
 
 val rdd = sqlContext.sparkContext
   .parallelize(rawList)
   .map { case (v, ts) => InternalRow(UTF8String.fromString(v), 
ts.getTime) }
 sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
 val newOffset = LongOffset.convert(end).getOrElse(
   sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
 s"originate with an instance of this class")
 )
 
-val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+commitInternal(newOffset.offset)
+  }
 
-if (offs

[GitHub] spark issue #20438: [SPARK-23272][SQL] add calendar interval type support to...

2018-01-30 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/20438
  
LGTM.


---




[GitHub] spark pull request #20435: [SPARK-23268][SQL]Reorganize packages in data sou...

2018-01-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20435#discussion_r164943009
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
 ---
@@ -20,14 +20,16 @@ package org.apache.spark.sql.kafka010
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
-import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => 
OffsetV2, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
--- End diff --

Can we keep it the same as before?
```
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => 
OffsetV2, PartitionOffset}
```


---




[GitHub] spark pull request #20438: [SPARK-23272][SQL] add calendar interval type sup...

2018-01-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20438#discussion_r164942783
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java ---
@@ -236,9 +238,29 @@ public MapData getMap(int ordinal) {
   public abstract byte[] getBinary(int rowId);
 
   /**
-   * Returns the ordinal's child column vector.
+   * Returns the calendar interval type value for rowId.
+   *
+   * In Spark, calendar interval type value is basically an integer value 
representing the number of
+   * months in this interval, and a long value representing the number of 
microseconds in this
+   * interval. An interval type vector is the same as a struct type vector 
with 2 fields: `months`
+   * and `microseconds`.
+   *
+   * To support interval type, implementations must implement {@link 
#getChild(int)} and define 2
+   * child vectors: the first child vector is an int type vector, 
containing all the month values of
+   * all the interval values in this vector. The second child vector is a 
long type vector,
+   * containing all the microsecond values of all the interval values in 
this vector.
+   */
+  public final CalendarInterval getInterval(int rowId) {
+if (isNullAt(rowId)) return null;
+final int months = getChild(0).getInt(rowId);
+final long microseconds = getChild(1).getLong(rowId);
+return new CalendarInterval(months, microseconds);
+  }
+
+  /**
+   * @return child [[ColumnVector]] at the given ordinal.
*/
-  public abstract ColumnVector getChild(int ordinal);
+  protected abstract ColumnVector getChild(int ordinal);
--- End diff --

added


---




[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20415
  
**[Test build #86860 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86860/testReport)** for PR 20415 at commit [`e3e09d9`](https://github.com/apache/spark/commit/e3e09d98072bd39328a4e7d4de1ddd38594c6232).


---




[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

2018-01-30 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20415
  
Looks like a reasonable change to me. Although I don't think this will bring a significant performance improvement, it makes the code more compact.


---




[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

2018-01-30 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20415
  
ok to test


---




[GitHub] spark pull request #17280: [SPARK-19939] [ML] Add support for association ru...

2018-01-30 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/17280#discussion_r164942458
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala ---
@@ -319,9 +323,11 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] 
{
 
 override def load(path: String): FPGrowthModel = {
   val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+  implicit val format = DefaultFormats
+  val numTrainingRecords = (metadata.metadata \ 
"numTrainingRecords").extract[Long]
--- End diff --

Since we're adding numTrainingRecords to FPGrowthModel and there isn't a proper default value, I suggest we break backward model-loading compatibility. Otherwise we would need to fill numTrainingRecords with an incorrect value, which would likely create a maintenance trap.


---




[GitHub] spark issue #20433: [SPARK-23264][SQL] Support interval values without INTER...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20433
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86849/
Test PASSed.


---




[GitHub] spark issue #20433: [SPARK-23264][SQL] Support interval values without INTER...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20433
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20226: [SPARK-23034][SQL] Override `nodeName` for all *ScanExec...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20226
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86850/
Test FAILed.


---




[GitHub] spark issue #20226: [SPARK-23034][SQL] Override `nodeName` for all *ScanExec...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20226
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20433: [SPARK-23264][SQL] Support interval values without INTER...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20433
  
**[Test build #86849 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86849/testReport)** for PR 20433 at commit [`4173ff0`](https://github.com/apache/spark/commit/4173ff05e693e2dd0080422ece32b65f30a81385).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20226: [SPARK-23034][SQL] Override `nodeName` for all *ScanExec...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20226
  
**[Test build #86850 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86850/testReport)** for PR 20226 at commit [`1facc05`](https://github.com/apache/spark/commit/1facc0554aae0829a19bbb7607b25ff7eda4ef8d).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20444: [SPARK-23274] [SQL] Fix ReplaceExceptWithFilter when the...

2018-01-30 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20444
  
LGTM


---




[GitHub] spark issue #19340: [SPARK-22119][ML] Add cosine distance to KMeans

2018-01-30 Thread zhengruifeng
Github user zhengruifeng commented on the issue:

https://github.com/apache/spark/pull/19340
  
The updating of centers should be viewed as the **M-step** in the EM algorithm, in which some objective is optimized.

Since cosine similarity does not take the vector norm into account:
1. the optimal solution for the normalized points (`V`) should also be optimal for the original points
2. a scaled solution (`k*V, k>0`) is also optimal for both the normalized points and the original points

If we want to optimize intra-cluster cosine similarity (like Matlab), then the arithmetic mean of the normalized points should be a better solution than the arithmetic mean of the original points.

Suppose two 2D points (x=0,y=1) and (x=100,y=0):

1. If we choose the arithmetic mean (x=50,y=0.5) as the center, the sum of cosine similarity is about 1.0;
2. If we choose the arithmetic mean of the normalized points (x=0.5,y=0.5), the sum of cosine similarity is about 1.414;
3. this center can then be normalized for computational convenience in the following assignment (E-step) or prediction.

Since `VectorWithNorm` is used as the input, the norms of the vectors are already computed, so I think we only need to update [this line](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L314) to
```
if (point.norm > 0) {
  axpy(1.0 / point.norm, point.vector, sum)
}
```
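
For reference, here is a small, self-contained check (plain Scala, no Spark dependency; the names are illustrative) that reproduces the numbers in the 2D example above:

```scala
object CosineCenterExample {
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val normA = math.sqrt(a.map(x => x * x).sum)
    val normB = math.sqrt(b.map(x => x * x).sum)
    dot / (normA * normB)
  }

  def main(args: Array[String]): Unit = {
    val points = Seq(Array(0.0, 1.0), Array(100.0, 0.0))
    val meanOfOriginal = Array(50.0, 0.5)   // arithmetic mean of the raw points
    val meanOfNormalized = Array(0.5, 0.5)  // arithmetic mean of the normalized points
    println(points.map(cosine(_, meanOfOriginal)).sum)   // ~1.01
    println(points.map(cosine(_, meanOfNormalized)).sum) // ~1.414
  }
}
```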




---




[GitHub] spark issue #20447: [SPARK-23279][SS] Avoid triggering distributed job for C...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20447
  
**[Test build #86859 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86859/testReport)** for PR 20447 at commit [`4b2baeb`](https://github.com/apache/spark/commit/4b2baeb8e00d575189036609c232a9c24f69e4a0).


---




[GitHub] spark issue #20447: [SPARK-23279][SS] Avoid triggering distributed job for C...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20447
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20447: [SPARK-23279][SS] Avoid triggering distributed job for C...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20447
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/415/
Test PASSed.


---




[GitHub] spark issue #20447: [SPARK-23279][SS] Avoid triggering distributed job for C...

2018-01-30 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20447
  
CC @tdas , please help to review. Thanks!


---




[GitHub] spark pull request #20447: [SPARK-23279][SS] Avoid triggering distributed jo...

2018-01-30 Thread jerryshao
GitHub user jerryshao opened a pull request:

https://github.com/apache/spark/pull/20447

[SPARK-23279][SS] Avoid triggering distributed job for Console sink

## What changes were proposed in this pull request?

The Console sink currently re-distributes the already-collected local data and triggers a distributed job in each batch. This is not necessary, so this change switches it to a local operation (see the illustration below).
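
A minimal illustration (assumed names; not the actual ConsoleSink code) of the difference: re-parallelizing rows that already live on the driver launches a Spark job just to print them, while printing the local collection directly does not.

```scala
import org.apache.spark.sql.SparkSession

object ConsolePrintSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val alreadyCollected: Seq[String] = Seq("a", "b", "c") // rows already collected on the driver

    // Triggers a distributed job (parallelize + collect) only to get the data back:
    spark.sparkContext.parallelize(alreadyCollected).collect().foreach(println)

    // Prints the same data with no job at all:
    alreadyCollected.foreach(println)

    spark.stop()
  }
}
```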

## How was this patch tested?

Existing UT and manual verification.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerryshao/apache-spark console-minor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20447.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20447


commit 4b2baeb8e00d575189036609c232a9c24f69e4a0
Author: jerryshao 
Date:   2018-01-31T02:29:01Z

Avoid triggering distributed job for Console sink




---




[GitHub] spark issue #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream Streami...

2018-01-30 Thread guoxiaolongzte
Github user guoxiaolongzte commented on the issue:

https://github.com/apache/spark/pull/20437
  
Thanks for your review!


---




[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

2018-01-30 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20386
  
`FileCommitProtocol.onTaskCommit` is called in `FileFormatWriter.write`, so 
this PR is required to migrate file-based data sources.

From a quick look, it seems `FileCommitProtocol.onTaskCommit` doesn't have an implementation yet, but I don't want to change the existing API and I assume it is necessary. BTW, it sounds reasonable to provide a callback for task commit.


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164937540
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+protected val host: String,
+protected val port: Int,
+includeTimestamp: Boolean,
+sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-socket = new Socket(host, port)
-val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream))
-readThread = new Thread(s"TextSocketSource($host, $port)") {
-  setDaemon(true)
-
-  override def run(): Unit = {
-try {
-  while (true) {
-val line = reader.readLine()
-if (line == null) {
-  // End of file reached
-  logWarning(s"Stream closed by $host:$port")
-  return
-}
-TextSocketSource.this.synchronized {
-  val newData = (line,
-Timestamp.valueOf(
-  
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-)
-  currentOffset = currentOffset + 1
-  batches.append(newData)
-}
-  }
-} catch {
-  case e: IOException =>
-}
-  }
-}
-readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
-} else {
-  Some(currentOffset)
-}
-  }
+  override def schema: StructType =
+if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else 
TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = 
getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startOrdinal =
-  
start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-// Internal buffer only holds the batches after lastOffsetCommitted
-val rawList = synchronized {
-  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-  batches.slice(sliceStart, sliceEnd)
-}
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+val rawList = 
getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+  LongOffset.convert(end).map(_.offset))
 
 val rdd = sqlContext.sparkContext
   .parallelize(rawList)
   .map { case (v, ts) => InternalRow(UTF8String.fromString(v), 
ts.getTime) }
 sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
 val newOffset = LongOffset.convert(end).getOrElse(
   sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
 s"originate with an instance of this class")
 )
 
-val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+commitInternal(newOffset.offset)
+  }
 
-if (offsetDiff 

[GitHub] spark issue #20408: [SPARK-23189][Core][Web UI] Reflect stage level blacklis...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20408
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86846/
Test PASSed.


---




[GitHub] spark issue #20408: [SPARK-23189][Core][Web UI] Reflect stage level blacklis...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20408
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20422: [SPARK-23253][Core][Shuffle]Only write shuffle temporary...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20422
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20422: [SPARK-23253][Core][Shuffle]Only write shuffle temporary...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20422
  
**[Test build #86858 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86858/testReport)** for PR 20422 at commit [`87e6dc0`](https://github.com/apache/spark/commit/87e6dc0b9ce362e754142c63b95a1841f427471a).


---




[GitHub] spark issue #20378: [SPARK-11222][Build][Python] Python document style check...

2018-01-30 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20378
  
pydocstyle seems to claim PEP 257 compliance - https://www.python.org/dev/peps/pep-0257.

One option, given https://github.com/apache/spark/pull/20378#issuecomment-361494109 and https://github.com/apache/spark/pull/20378#issuecomment-361523561, might be to note that we follow PEP 257 in http://spark.apache.org/contributing.html and then enable only the checks that catch actual problems.


---




[GitHub] spark issue #20422: [SPARK-23253][Core][Shuffle]Only write shuffle temporary...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20422
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/414/
Test PASSed.


---




[GitHub] spark issue #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20445
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/413/
Test PASSed.


---




[GitHub] spark pull request #20444: [SPARK-23274] [SQL] Fix ReplaceExceptWithFilter w...

2018-01-30 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20444#discussion_r164937374
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
 ---
@@ -46,18 +46,27 @@ object ReplaceExceptWithFilter extends 
Rule[LogicalPlan] {
 }
 
 plan.transform {
-  case Except(left, right) if isEligible(left, right) =>
-Distinct(Filter(Not(transformCondition(left, skipProject(right))), 
left))
+  case e @ Except(left, right) if isEligible(left, right) =>
+val newCondition = transformCondition(left, skipProject(right))
+newCondition.map { c =>
+  Distinct(Filter(Not(c), left))
+}.getOrElse {
+  e
+}
 }
   }
 
-  private def transformCondition(left: LogicalPlan, right: LogicalPlan): 
Expression = {
+  private def transformCondition(left: LogicalPlan, right: LogicalPlan): 
Option[Expression] = {
 val filterCondition =
   
InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
 
 val attributeNameMap: Map[String, Attribute] = left.output.map(x => 
(x.name, x)).toMap
 
-filterCondition.transform { case a : AttributeReference => 
attributeNameMap(a.name) }
+if (filterCondition.references.forall(r => 
attributeNameMap.contains(r.name))) {
+  Some(filterCondition.transform { case a: AttributeReference => 
attributeNameMap(a.name) })
--- End diff --

Actually it may still be possible to add the `Filter` on the child of the 
left side's projection where it can be applied. But for now this fix LGTM.

We may also need to update the doc of `ReplaceExceptWithFilter` to add this 
constraint.
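
A hypothetical repro sketch (assuming a SparkSession `spark`; not taken from 
the JIRA) of the situation the new Option guard handles: the right side 
filters on a column the left side's output does not carry, so the rewritten 
Filter could not resolve against the left plan and the rule now keeps the 
original Except.

    // both sides share the same base plan, so the rule's eligibility check passes
    val table = spark.range(10).selectExpr("id AS a", "id % 2 AS b")
    val left  = table.select("a")
    val right = table.filter("b = 0").select("a")
    // the right-side condition references `b`, which is absent from left.output,
    // so transformCondition returns None and the Except node is left untouched
    left.except(right).show()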


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20445
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20408: [SPARK-23189][Core][Web UI] Reflect stage level blacklis...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20408
  
**[Test build #86846 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86846/testReport)**
 for PR 20408 at commit 
[`e674614`](https://github.com/apache/spark/commit/e674614ea67f79a15a187d86857059778897ea9c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20445
  
**[Test build #86857 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86857/testReport)**
 for PR 20445 at commit 
[`5adf1fe`](https://github.com/apache/spark/commit/5adf1fe7966f0668f2c2f5b702f5f9a45e72443d).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20446: [SPARK-23254][ML] Add user guide entry for DataFrame mul...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20446
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86856/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20446: [SPARK-23254][ML] Add user guide entry for DataFrame mul...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20446
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20446: [SPARK-23254][ML] Add user guide entry for DataFrame mul...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20446
  
**[Test build #86856 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86856/testReport)**
 for PR 20446 at commit 
[`307f75f`](https://github.com/apache/spark/commit/307f75f4990049f78978364af4541cd20e4d5bd7).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `public class JavaSummarizerExample `


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164934753
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
 ---
@@ -56,7 +58,7 @@ trait ConsoleWriter extends Logging {
 println("---")
 // scalastyle:off println
 spark
-  .createDataFrame(spark.sparkContext.parallelize(rows), schema)
+  .createDataFrame(rows.toList.asJava, schema)
--- End diff --

OK, I will create a separate PR for this small fix.
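
For context, a minimal sketch of the pattern the changed line uses 
(hypothetical data, assuming a local SparkSession): the rows are already on 
the driver, so `createDataFrame` with a local `java.util.List` builds the 
DataFrame without launching an RDD job, which is all the console sink needs 
for printing.

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val spark = SparkSession.builder().master("local[*]").appName("console-sketch").getOrCreate()
    val schema = StructType(Seq(StructField("value", StringType)))
    val rows: Array[Row] = Array(Row("a"), Row("b"))
    // createDataFrame(java.util.List[Row], StructType) keeps the data local
    spark.createDataFrame(rows.toList.asJava, schema).show()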


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20373: [SPARK-23159][PYTHON] Update cloudpickle to v0.4.2 plus ...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20373
  
**[Test build #86852 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86852/testReport)**
 for PR 20373 at commit 
[`4d7047a`](https://github.com/apache/spark/commit/4d7047a3b355e306e7baf66527b615559c8cd204).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20373: [SPARK-23159][PYTHON] Update cloudpickle to v0.4.2 plus ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20373
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86852/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20373: [SPARK-23159][PYTHON] Update cloudpickle to v0.4.2 plus ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20373
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20446: [SPARK-23254][ML] Add user guide entry for DataFrame mul...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20446
  
**[Test build #86856 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86856/testReport)**
 for PR 20446 at commit 
[`307f75f`](https://github.com/apache/spark/commit/307f75f4990049f78978364af4541cd20e4d5bd7).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20446: [SPARK-23254][ML] Add user guide entry for DataFrame mul...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20446
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/412/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20446: [SPARK-23254][ML] Add user guide entry for DataFrame mul...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20446
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20446: [SPARK-23254][ML] Add user guide entry for DataFrame mul...

2018-01-30 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/20446
  
@MLnick @MrBago Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20446: [SPARK-23254][ML] Add user guide entry for DataFr...

2018-01-30 Thread WeichenXu123
GitHub user WeichenXu123 opened a pull request:

https://github.com/apache/spark/pull/20446

[SPARK-23254][ML] Add user guide entry for DataFrame multivariate summary

## What changes were proposed in this pull request?

Add a user guide entry and Scala/Java examples for `ml.stat.Summarizer`. A 
usage sketch follows below.
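
A hedged sketch (assuming a SparkSession `spark`; the guide's actual examples 
may differ) of the kind of usage the new entry documents:

    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.stat.Summarizer
    import org.apache.spark.sql.functions.col

    val df = spark.createDataFrame(Seq(
      Tuple1(Vectors.dense(2.0, 3.0)),
      Tuple1(Vectors.dense(4.0, 5.0))
    )).toDF("features")

    df.select(Summarizer.metrics("mean", "variance").summary(col("features")).as("summary"))
      .select("summary.mean", "summary.variance")
      .show(false)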

## How was this patch tested?

Doc generated snapshot:


![image](https://user-images.githubusercontent.com/19235986/35600897-2bb9c102-05e5-11e8-849f-e327f125.png)

![image](https://user-images.githubusercontent.com/19235986/35600910-3b022f28-05e5-11e8-823e-ae61009317a0.png)

![image](https://user-images.githubusercontent.com/19235986/35600918-43c24f3a-05e5-11e8-847d-446452838e05.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/WeichenXu123/spark summ_guide

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20446.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20446


commit 307f75f4990049f78978364af4541cd20e4d5bd7
Author: WeichenXu 
Date:   2018-01-31T01:41:48Z

init pr




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20445
  
**[Test build #86855 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86855/testReport)**
 for PR 20445 at commit 
[`e66d809`](https://github.com/apache/spark/commit/e66d809fe501b19b923a88d1b4cb9df69b4ae329).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20445#discussion_r164933558
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
 ---
@@ -28,9 +28,9 @@ import org.apache.spark.sql.sources.v2.reader._
 trait DataSourceReaderHolder {
 
   /**
-   * The full output of the data source reader, without column pruning.
+   * The output of the data source reader, without column pruning.
*/
-  def fullOutput: Seq[AttributeReference]
--- End diff --

@cloud-fan This fixes the bug I spoke to you about offline.
The target of this PR is only master, not 2.3, so if you want this fix in 
2.3.0, please make a separate PR accordingly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20445
  
Build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/20279
  
Closed in favor of #20445


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20445
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/411/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164933597
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+protected val host: String,
+protected val port: Int,
+includeTimestamp: Boolean,
+sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-socket = new Socket(host, port)
-val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream))
-readThread = new Thread(s"TextSocketSource($host, $port)") {
-  setDaemon(true)
-
-  override def run(): Unit = {
-try {
-  while (true) {
-val line = reader.readLine()
-if (line == null) {
-  // End of file reached
-  logWarning(s"Stream closed by $host:$port")
-  return
-}
-TextSocketSource.this.synchronized {
-  val newData = (line,
-Timestamp.valueOf(
-  
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-)
-  currentOffset = currentOffset + 1
-  batches.append(newData)
-}
-  }
-} catch {
-  case e: IOException =>
-}
-  }
-}
-readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
-} else {
-  Some(currentOffset)
-}
-  }
+  override def schema: StructType =
+if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else 
TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = 
getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startOrdinal =
-  
start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-// Internal buffer only holds the batches after lastOffsetCommitted
-val rawList = synchronized {
-  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-  batches.slice(sliceStart, sliceEnd)
-}
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+val rawList = 
getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+  LongOffset.convert(end).map(_.offset))
 
 val rdd = sqlContext.sparkContext
   .parallelize(rawList)
   .map { case (v, ts) => InternalRow(UTF8String.fromString(v), 
ts.getTime) }
 sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
 val newOffset = LongOffset.convert(end).getOrElse(
   sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
 s"originate with an instance of this class")
 )
 
-val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+commitInternal(newOffset.offset)
+  }
 
-if (offset
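
As a side note on the getBatch rewrite above, a small standalone sketch 
(hypothetical values) of the offset-to-buffer-index arithmetic it preserves: 
offsets are absolute, the buffer only holds entries after lastOffsetCommitted, 
and (start, end] is start-exclusive, end-inclusive.

    // buffer holds offsets 5..9 at indices 0..4, lastOffsetCommitted = 4
    val lastOffsetCommitted = 4
    val startOrdinal = 6 + 1                                 // start offset 6, exclusive
    val endOrdinal   = 8 + 1                                 // end offset 8, inclusive
    val sliceStart = startOrdinal - lastOffsetCommitted - 1  // = 2
    val sliceEnd   = endOrdinal - lastOffsetCommitted - 1    // = 4
    // batches.slice(2, 4) returns the entries for offsets 7 and 8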

[GitHub] spark pull request #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-01-30 Thread brkyvz
Github user brkyvz closed the pull request at:

https://github.com/apache/spark/pull/20279


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20373: [SPARK-23159][PYTHON] Update cloudpickle to v0.4.2 plus ...

2018-01-30 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20373
  
Wait .. @BryanCutler, did you port the formatting change here ..? I was 
thinking we should match v0.4.2 as closely as possible to reduce the diff.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r164933166
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -32,40 +32,44 @@
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
   /**
-   * Commits this writing job for the specified epoch with a list of 
commit messages. The commit
-   * messages are collected from successful data writers and are produced 
by
-   * {@link DataWriter#commit()}.
+   * Commits this writing job for the specified epoch.
*
-   * If this method fails (by throwing an exception), this writing job is 
considered to have been
-   * failed, and the execution engine will attempt to call {@link 
#abort(WriterCommitMessage[])}.
+   * When this method is called, the number of commit messages added by
+   * {@link #add(WriterCommitMessage)} equals to the number of input data 
partitions.
+   *
+   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
+   * failed, and {@link #abort()} would be called. The state of the 
destination
+   * is undefined and @{@link #abort()} may not be able to deal with it.
*
* To support exactly-once processing, writer implementations should 
ensure that this method is
* idempotent. The execution engine may call commit() multiple times for 
the same epoch
* in some circumstances.
*/
-  void commit(long epochId, WriterCommitMessage[] messages);
+  void commit(long epochId);
 
   /**
-   * Aborts this writing job because some data writers are failed and keep 
failing when retry, or
-   * the Spark job fails with some unknown reasons, or {@link 
#commit(WriterCommitMessage[])} fails.
+   * Aborts this writing job because some data writers are failed and keep 
failing when retry,
+   * or the Spark job fails with some unknown reasons,
+   * or {@link #commit()} / {@link #add(WriterCommitMessage)} fails
*
* If this method fails (by throwing an exception), the underlying data 
source may require manual
* cleanup.
*
-   * Unless the abort is triggered by the failure of commit, the given 
messages should have some
-   * null slots as there maybe only a few data writers that are committed 
before the abort
-   * happens, or some data writers were committed but their commit 
messages haven't reached the
-   * driver when the abort is triggered. So this is just a "best effort" 
for data sources to
-   * clean up the data left by data writers.
+   * Unless the abort is triggered by the failure of commit, the number of 
commit
+   * messages added by {@link #add(WriterCommitMessage)} should be smaller 
than the number
+   * of input data partitions, as there may be only a few data writers 
that are committed
+   * before the abort happens, or some data writers were committed but 
their commit messages
+   * haven't reached the driver when the abort is triggered. So this is 
just a "best effort"
--- End diff --

I think there is no difference between "the message is created, but a node 
fails before it is sent" and "the message is in flight". Implementations need 
to handle the case where a writer finishes successfully but its message is 
not available in `abort` anyway.

`best effort` might not be the best wording, do you have a better suggestion?
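
On the idempotency note in the quoted javadoc (commit() may be called more 
than once for the same epoch), a self-contained sketch with hypothetical 
types of how an implementation can make repeated commits no-ops:

    class IdempotentEpochCommitter {
      private val committedEpochs = scala.collection.mutable.Set.empty[Long]

      def commit(epochId: Long): Unit = synchronized {
        // a repeated commit for an already-committed epoch is silently ignored
        if (committedEpochs.add(epochId)) {
          // first time this epoch is seen: finalize its output exactly once
          println(s"finalizing epoch $epochId")
        }
      }
    }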


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-01-30 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/20445

[SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 APIs

## What changes were proposed in this pull request?

This PR migrates MemoryStream to the DataSourceV2 APIs. It fixes a few 
things along the way.
1. Fixed a bug in DataSourceV2ScanExec that prevented it from being 
canonicalized, which is required for some tests to pass (StreamingDeduplicateSuite).
2. Changed the reported keys in StreamingQueryProgress.durationMs.
  - "getOffset" and "getBatch" are replaced with "setOffsetRange" and 
"getEndOffset", as tracking those makes more sense. Unit tests changed 
accordingly.

## How was this patch tested?
Existing unit tests, plus a few updated unit tests.
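
A hedged sketch (assuming a SparkSession `spark`; key names taken from the 
description above) of where the renamed duration entries surface to users:

    import org.apache.spark.sql.execution.streaming.MemoryStream
    import spark.implicits._
    implicit val sqlContext = spark.sqlContext

    val input = MemoryStream[Int]
    val query = input.toDS().writeStream.format("console").start()
    input.addData(1, 2, 3)
    query.processAllAvailable()
    // previously reported as "getOffset"/"getBatch"; with this change the
    // planning phases show up as "setOffsetRange" and "getEndOffset"
    println(query.lastProgress.durationMs)
    query.stop()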


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23092

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20445.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20445


commit 7c09b376eef6a4e6c118c78ad9459cb55e59e67f
Author: Burak Yavuz 
Date:   2018-01-11T16:44:19Z

save for so far

commit 78c50f860aa13f569669f4ad77f4325d80085c8b
Author: Burak Yavuz 
Date:   2018-01-12T18:27:49Z

Save so far

commit 2777b5b38596a1fb68bcf8ee928aec1a58dc372c
Author: Burak Yavuz 
Date:   2018-01-13T01:43:03Z

save so far

commit 50a541b5890f328a655a7ef1fca4f8480b9a35f0
Author: Burak Yavuz 
Date:   2018-01-16T19:14:08Z

Compiles and I think also runs correctly

commit fd61724c6afcab5831fe8c602ad134d0c473184b
Author: Burak Yavuz 
Date:   2018-01-16T19:25:39Z

save

commit 7a0b564bd0c74525ebcea55b31f9658b1c2f0e12
Author: Burak Yavuz 
Date:   2018-01-16T19:28:31Z

fix merge conflicts

commit a81c2ecdafd54a2c5bfb07c6f1f53546eaa96c7c
Author: Burak Yavuz 
Date:   2018-01-16T22:26:28Z

fix hive

commit 1a4f4108118d976857778916b18499b4e0bf140c
Author: Tathagata Das 
Date:   2018-01-27T01:11:01Z

Undo changes to HiveSessionStateBuilder.scala

commit 083e93c26fd2d1e8c4c738b251a27724115a0001
Author: Tathagata Das 
Date:   2018-01-27T01:11:06Z

Merge remote-tracking branch 'apache-github/master' into HEAD

commit a817c8d40e4ecaf5e4e0c46f43313c5cceeec54e
Author: Tathagata Das 
Date:   2018-01-29T22:27:22Z

Fixed the setOffsetRange bug

commit 35b8854ae466e0313ff926cc1efb8c423d3eefea
Author: Tathagata Das 
Date:   2018-01-30T20:42:56Z

Fixed DataSourceV2ScanExec canonicalization bug

commit e66d809fe501b19b923a88d1b4cb9df69b4ae329
Author: Tathagata Das 
Date:   2018-01-31T00:57:59Z

Fixed metrics reported by MicroBatchExecution




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20279
  
**[Test build #86854 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86854/testReport)**
 for PR 20279 at commit 
[`a81c2ec`](https://github.com/apache/spark/commit/a81c2ecdafd54a2c5bfb07c6f1f53546eaa96c7c).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20279
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/410/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20279
  
Build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r164932125
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -32,40 +32,44 @@
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
   /**
-   * Commits this writing job for the specified epoch with a list of 
commit messages. The commit
-   * messages are collected from successful data writers and are produced 
by
-   * {@link DataWriter#commit()}.
+   * Commits this writing job for the specified epoch.
*
-   * If this method fails (by throwing an exception), this writing job is 
considered to have been
-   * failed, and the execution engine will attempt to call {@link 
#abort(WriterCommitMessage[])}.
+   * When this method is called, the number of commit messages added by
+   * {@link #add(WriterCommitMessage)} equals to the number of input data 
partitions.
--- End diff --

how about `the number of data(RDD) partitions to write`?

> why it isn't obvious ...

Maybe we can just follow `FileCommitProtocol`, i.e. `commit` and `abort` 
still take an array of messages.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20279
  
**[Test build #86853 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86853/testReport)**
 for PR 20279 at commit 
[`9f6c6b9`](https://github.com/apache/spark/commit/9f6c6b9ec5f9669a9147ce61ada90977e255ec85).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20279
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/409/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20279
  
Build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164930581
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+protected val host: String,
+protected val port: Int,
+includeTimestamp: Boolean,
+sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-socket = new Socket(host, port)
-val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream))
-readThread = new Thread(s"TextSocketSource($host, $port)") {
-  setDaemon(true)
-
-  override def run(): Unit = {
-try {
-  while (true) {
-val line = reader.readLine()
-if (line == null) {
-  // End of file reached
-  logWarning(s"Stream closed by $host:$port")
-  return
-}
-TextSocketSource.this.synchronized {
-  val newData = (line,
-Timestamp.valueOf(
-  
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-)
-  currentOffset = currentOffset + 1
-  batches.append(newData)
-}
-  }
-} catch {
-  case e: IOException =>
-}
-  }
-}
-readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
-} else {
-  Some(currentOffset)
-}
-  }
+  override def schema: StructType =
+if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else 
TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = 
getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startOrdinal =
-  
start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-// Internal buffer only holds the batches after lastOffsetCommitted
-val rawList = synchronized {
-  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-  batches.slice(sliceStart, sliceEnd)
-}
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+val rawList = 
getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+  LongOffset.convert(end).map(_.offset))
 
 val rdd = sqlContext.sparkContext
   .parallelize(rawList)
   .map { case (v, ts) => InternalRow(UTF8String.fromString(v), 
ts.getTime) }
 sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
 val newOffset = LongOffset.convert(end).getOrElse(
   sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
 s"originate with an instance of this class")
 )
 
-val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+commitInternal(newOffset.offset)
+  }
 
-if (offsetDiff 

[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r164930522
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
 ---
@@ -63,32 +68,42 @@
   DataWriterFactory createWriterFactory();
 
   /**
-   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
-   * successful data writers and are produced by {@link 
DataWriter#commit()}.
+   * Handles a commit message which is collected from a successful data 
writer.
+   *
+   * Note that, implementations might need to cache all commit messages 
before calling
+   * {@link #commit()} or {@link #abort()}.
*
* If this method fails (by throwing an exception), this writing job is 
considered to to have been
-   * failed, and {@link #abort(WriterCommitMessage[])} would be called. 
The state of the destination
-   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be 
able to deal with it.
+   * failed, and {@link #abort()} would be called. The state of the 
destination
+   * is undefined and @{@link #abort()} may not be able to deal with it.
+   */
+  void add(WriterCommitMessage message);
+
+  /**
+   * Commits this writing job.
+   * When this method is called, the number of commit messages added by
+   * {@link #add(WriterCommitMessage)} equals to the number of input data 
partitions.
*
-   * Note that, one partition may have multiple committed data writers 
because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. 
Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to 
make sure only one data
-   * writer can commit, or have a way to clean up the data of 
already-committed writers.
+   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
+   * failed, and {@link #abort()} would be called. The state of the 
destination
+   * is undefined and @{@link #abort()} may not be able to deal with it.
*/
-  void commit(WriterCommitMessage[] messages);
+  void commit();
--- End diff --

This is something we want to improve at the API level. I think the 
implementation should be free to decide how to store the messages, in case 
each message is large and there are a lot of them. If this is not a problem 
at all, we can follow `FileCommitProtocol`.
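
To make the caching idea concrete, a self-contained analogue (hypothetical 
types, not the actual DataSourceV2 interfaces) of an implementation that 
buffers messages from add() and consumes them in commit()/abort():

    final case class CommitMessage(partitionId: Int, tempPath: String)

    class BufferingJobWriter(numPartitions: Int) {
      private val messages = scala.collection.mutable.ArrayBuffer.empty[CommitMessage]

      def add(m: CommitMessage): Unit = synchronized { messages += m }

      def commit(): Unit = synchronized {
        // by contract, every input partition has reported a message at this point
        require(messages.size == numPartitions, "expected one commit message per partition")
        messages.foreach(m => println(s"publishing ${m.tempPath}"))
      }

      def abort(): Unit = synchronized {
        // only the messages that reached the driver are visible: best-effort cleanup
        messages.foreach(m => println(s"deleting ${m.tempPath}"))
      }
    }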


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164930523
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
 ---
@@ -56,7 +58,7 @@ trait ConsoleWriter extends Logging {
 println("---")
 // scalastyle:off println
 spark
-  .createDataFrame(spark.sparkContext.parallelize(rows), schema)
+  .createDataFrame(rows.toList.asJava, schema)
--- End diff --

This fix should go into the 2.3 branch. Thanks for catching this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20373: [SPARK-23159][PYTHON] Update cloudpickle to v0.4.2 plus ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20373
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20373: [SPARK-23159][PYTHON] Update cloudpickle to v0.4.2 plus ...

2018-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20373
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/408/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20440: [SPARK-23276][SQL][TEST] Enable UDT tests in (Hiv...

2018-01-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20440


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20440: [SPARK-23276][SQL][TEST] Enable UDT tests in (Hive)OrcHa...

2018-01-30 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20440
  
Thanks! Merged to master/2.3


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20373: [SPARK-23159][PYTHON] Update cloudpickle to v0.4.2 plus ...

2018-01-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20373
  
**[Test build #86852 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86852/testReport)**
 for PR 20373 at commit 
[`4d7047a`](https://github.com/apache/spark/commit/4d7047a3b355e306e7baf66527b615559c8cd204).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


