[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91495/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21498
  
**[Test build #91495 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91495/testReport)**
 for PR 21498 at commit 
[`0dedf44`](https://github.com/apache/spark/commit/0dedf44559fb6da11c5d903c51bb73f5f508fe6f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193295239
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
--- End diff --

This variant seems to be specific to Python. I think we should match what the 
Scala side supports.


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193294891
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
--- End diff --

@tdas, wouldn't it be better to just have a `ForeachWriter` class?
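
If the API went in this direction, a Python-side base class might look roughly like the sketch below. This is a hypothetical shape only, loosely mirroring the method names in the quoted docstring; nothing in this thread confirms that such a class exists in PySpark, and `RowCollector` is an invented example subclass.

```python
class ForeachWriter:
    """Hypothetical base class: override process(), and optionally open()/close()."""

    def open(self, partition_id, epoch_id):
        # Default: accept every partition and epoch.
        return True

    def process(self, row):
        raise NotImplementedError("subclasses must implement process(row)")

    def close(self, error):
        # Default: nothing to clean up.
        pass


class RowCollector(ForeachWriter):
    """Example subclass that keeps the rows it sees in memory."""

    def __init__(self):
        self.rows = []

    def process(self, row):
        self.rows.append(row)
```

With a class like this, the optional methods get sensible defaults, so a subclass only has to supply `process`.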


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193294262
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -20,10 +20,48 @@ package org.apache.spark.sql
 import org.apache.spark.annotation.InterfaceStability
 
 /**
- * A class to consume data generated by a `StreamingQuery`. Typically this 
is used to send the
- * generated data to external systems. Each partition will use a new 
deserialized instance, so you
- * usually should do all the initialization (e.g. opening a connection or 
initiating a transaction)
- * in the `open` method.
+ * The abstract class for writing custom logic to process data generated 
by a query.
+ * This is often used to write the output of a streaming query to 
arbitrary storage systems.
--- End diff --

I don't think there's an easy way to reference docs across languages. 
It's unfortunate, but we have usually duplicated the docs across the API docs 
for each language ... 


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91496/
Test FAILed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21498
  
**[Test build #91496 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91496/testReport)**
 for PR 21498 at commit 
[`6f487c9`](https://github.com/apache/spark/commit/6f487c9a9288176b7fcc574021abff6ae403895b).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...

2018-06-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21497#discussion_r193292673
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -256,6 +246,66 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSQLContext with Before
 }
   }
 
+  test("verify ServerThread only accepts the first connection") {
+serverThread = new ServerThread()
+serverThread.start()
+
+withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> 
"false") {
--- End diff --

I would use `SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key` BTW.


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193285667
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (partition_id, epoch_id))
+... return True
+... 

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193286066
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (partition_id, epoch_id))
+... return True
+... 

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193286932
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
 finally:
 q.stop()
 shutil.rmtree(tmpPath)
+'''
 
+class ForeachWriterTester:
+
+def __init__(self, spark):
+self.spark = spark
+self.input_dir = tempfile.mkdtemp()
+self.open_events_dir = tempfile.mkdtemp()
+self.process_events_dir = tempfile.mkdtemp()
+self.close_events_dir = tempfile.mkdtemp()
+
+def write_open_event(self, partitionId, epochId):
+self._write_event(
+self.open_events_dir,
+{'partition': partitionId, 'epoch': epochId})
+
+def write_process_event(self, row):
+self._write_event(self.process_events_dir, {'value': 'text'})
+
+def write_close_event(self, error):
+self._write_event(self.close_events_dir, {'error': str(error)})
+
+def write_input_file(self):
+self._write_event(self.input_dir, "text")
+
+def open_events(self):
+return self._read_events(self.open_events_dir, 'partition INT, 
epoch INT')
+
+def process_events(self):
+return self._read_events(self.process_events_dir, 'value 
STRING')
+
+def close_events(self):
+return self._read_events(self.close_events_dir, 'error STRING')
+
+def run_streaming_query_on_writer(self, writer, num_files):
+try:
+sdf = 
self.spark.readStream.format('text').load(self.input_dir)
+sq = sdf.writeStream.foreach(writer).start()
+for i in range(num_files):
+self.write_input_file()
+sq.processAllAvailable()
+sq.stop()
+finally:
+self.stop_all()
+
+def _read_events(self, dir, json):
+rows = self.spark.read.schema(json).json(dir).collect()
+dicts = [row.asDict() for row in rows]
+return dicts
+
+def _write_event(self, dir, event):
+import random
+file = open(os.path.join(dir, str(random.randint(0, 10))), 
'w')
--- End diff --

It might be more convenient to use a `with` statement here, and to rename 
`file` to `f` or `fw` or similar. Please ignore this if there's a specific 
reason not to use a `with` statement.
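
As an illustration of this suggestion, a minimal sketch of the helper rewritten with a `with` statement might look like the following. It is written as a standalone function rather than as the test class's method, and returning the path is an addition made only for this sketch, not part of the patch.

```python
import os
import random
import tempfile


def write_event(dir_path, event):
    """Write one event as a line of text to a randomly named file in dir_path.

    Standalone variant of the `_write_event` helper from the diff.
    Returning the path is an addition made for this sketch.
    """
    path = os.path.join(dir_path, str(random.randint(0, 10)))
    # The with statement closes the file even if write() raises.
    with open(path, 'w') as f:
        f.write("%s\n" % str(event))
    return path


if __name__ == "__main__":
    events_dir = tempfile.mkdtemp()
    print(write_event(events_dir, {'value': 'text'}))
```

Using `f` instead of `file` also avoids reusing the name of the Python 2 builtin.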


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193284839
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
--- End diff --

> any initialization for writing data (e.g. opening a connection or 
starting a transaction) be done open after the `open(...)` method has been 
called

`be done open` reads a bit oddly. It would be better to polish the 
sentence.
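
To make the intended ordering concrete, here is a rough sketch of a writer that defers its setup to `open`, together with a small driver that imitates the call order the quoted docstring describes. `BufferingWriter` and `run_partition` are hypothetical names for this illustration; the driver is a plain-Python stand-in, not Spark's implementation.

```python
class BufferingWriter:
    """Hypothetical writer: nothing is set up until open() is called."""

    def __init__(self, sink):
        self.sink = sink      # plain list standing in for external storage
        self.buffer = None    # created in open(), not in __init__

    def open(self, partition_id, epoch_id):
        # Initialization happens here, once the task is ready to produce rows.
        self.buffer = []
        return True

    def process(self, row):
        self.buffer.append(row)

    def close(self, error):
        # Commit only if no error was seen while processing rows.
        if error is None:
            self.sink.extend(self.buffer)
        self.buffer = None


def run_partition(writer, partition_id, epoch_id, rows):
    """Plain-Python imitation of the documented call order (not Spark code)."""
    # If open() itself raises, close() is not called.
    should_process = writer.open(partition_id, epoch_id)
    error = None
    try:
        if should_process:
            for row in rows:
                writer.process(row)
    except Exception as e:
        error = e
    # close() receives the error, or None if processing succeeded.
    writer.close(error)
    return error


sink = []
run_partition(BufferingWriter(sink), 0, 0, ["a", "b"])
print(sink)  # ['a', 'b']
```

Because the buffer is created in `open` rather than in `__init__`, the object holds no per-task state at the moment it is shipped to a task.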


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193289099
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
 finally:
 q.stop()
 shutil.rmtree(tmpPath)
+'''
 
+class ForeachWriterTester:
+
+def __init__(self, spark):
+self.spark = spark
+self.input_dir = tempfile.mkdtemp()
+self.open_events_dir = tempfile.mkdtemp()
+self.process_events_dir = tempfile.mkdtemp()
+self.close_events_dir = tempfile.mkdtemp()
+
+def write_open_event(self, partitionId, epochId):
+self._write_event(
+self.open_events_dir,
+{'partition': partitionId, 'epoch': epochId})
+
+def write_process_event(self, row):
+self._write_event(self.process_events_dir, {'value': 'text'})
+
+def write_close_event(self, error):
+self._write_event(self.close_events_dir, {'error': str(error)})
+
+def write_input_file(self):
+self._write_event(self.input_dir, "text")
+
+def open_events(self):
+return self._read_events(self.open_events_dir, 'partition INT, 
epoch INT')
+
+def process_events(self):
+return self._read_events(self.process_events_dir, 'value 
STRING')
+
+def close_events(self):
+return self._read_events(self.close_events_dir, 'error STRING')
+
+def run_streaming_query_on_writer(self, writer, num_files):
+try:
+sdf = 
self.spark.readStream.format('text').load(self.input_dir)
+sq = sdf.writeStream.foreach(writer).start()
+for i in range(num_files):
+self.write_input_file()
+sq.processAllAvailable()
+sq.stop()
+finally:
+self.stop_all()
+
+def _read_events(self, dir, json):
+rows = self.spark.read.schema(json).json(dir).collect()
+dicts = [row.asDict() for row in rows]
+return dicts
+
+def _write_event(self, dir, event):
+import random
+file = open(os.path.join(dir, str(random.randint(0, 10))), 
'w')
+file.write("%s\n" % str(event))
+file.close()
+
+def stop_all(self):
+for q in self.spark._wrapped.streams.active:
+q.stop()
+
+def __getstate__(self):
+return (self.open_events_dir, self.process_events_dir, 
self.close_events_dir)
+
+def __setstate__(self, state):
+self.open_events_dir, self.process_events_dir, 
self.close_events_dir = state
+
+def test_streaming_foreach_with_simple_function(self):
+tester = self.ForeachWriterTester(self.spark)
+
+def foreach_func(row):
+tester.write_process_event(row)
+
+tester.run_streaming_query_on_writer(foreach_func, 2)
+self.assertEqual(len(tester.process_events()), 2)
+
+def test_streaming_foreach_with_basic_open_process_close(self):
+tester = self.ForeachWriterTester(self.spark)
+
+class ForeachWriter:
+def open(self, partitionId, epochId):
+tester.write_open_event(partitionId, epochId)
+return True
+
+def process(self, row):
+tester.write_process_event(row)
+
+def close(self, error):
+tester.write_close_event(error)
+
+tester.run_streaming_query_on_writer(ForeachWriter(), 2)
+
+open_events = tester.open_events()
+self.assertEqual(len(open_events), 2)
+self.assertSetEqual(set([e['epoch'] for e in open_events]), {0, 1})
+
+self.assertEqual(len(tester.process_events()), 2)
+
+close_events = tester.close_events()
+self.assertEqual(len(close_events), 2)
+self.assertSetEqual(set([e['error'] for e in close_events]), 
{'None'})
+
+def test_streaming_foreach_with_open_returning_false(self):
+tester = self.ForeachWriterTester(self.spark)
+
+class ForeachWriter:
+def open(self, partitionId, epochId):
+tester.write_open_event(partitionId, epochId)
+return False
+
+def process(self, row):
+tester.write_process_event(row)
+
+def close(self, error):
+

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193284293
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
--- End diff --

nit: `deserialized copy` (missing space)
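
For context on the quoted sentence, the "serialized-deserialized copy" behavior can be imitated with the standard `pickle` module. This is only a sketch: the serialization path Spark actually uses for Python writers is not shown in this thread, and `CountingWriter` is a hypothetical class.

```python
import pickle


class CountingWriter:
    """Hypothetical writer used only to show copy semantics."""

    def __init__(self, prefix):
        self.prefix = prefix   # configuration: survives the round trip
        self.count = 0

    def open(self, partition_id, epoch_id):
        self.count = 0         # per-task state is (re)initialized here
        return True

    def process(self, row):
        self.count += 1


original = CountingWriter("out")
payload = pickle.dumps(original)

# Each simulated task deserializes its own copy from the same payload.
task_copies = [pickle.loads(payload) for _ in range(2)]

task_copies[0].open(0, 0)
task_copies[0].process("row")

print(task_copies[0].count, task_copies[1].count, original.count)  # 1 0 0
```

The copies share configuration but not state, which is why per-task resources such as connections belong in `open` rather than in the constructor.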


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193289567
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -20,10 +20,48 @@ package org.apache.spark.sql
 import org.apache.spark.annotation.InterfaceStability
 
 /**
- * A class to consume data generated by a `StreamingQuery`. Typically this 
is used to send the
- * generated data to external systems. Each partition will use a new 
deserialized instance, so you
- * usually should do all the initialization (e.g. opening a connection or 
initiating a transaction)
- * in the `open` method.
+ * The abstract class for writing custom logic to process data generated 
by a query.
+ * This is often used to write the output of a streaming query to 
arbitrary storage systems.
--- End diff --

It looks like the doc is duplicated between `foreach()` and `ForeachWriter`. 
I'm not sure how we can leave a reference in the Python doc instead of 
duplicating the content, but even if the Python doc doesn't support that kind 
of reference, some of the content seems fine to place in one of the two, not both.


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193291809
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python._
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{NextIterator, Utils}
+
+class PythonForeachWriter(func: PythonFunction, schema: StructType)
+  extends ForeachWriter[UnsafeRow] {
+
+  private lazy val context = TaskContext.get()
+  private lazy val buffer = new PythonForeachWriter.UnsafeRowBuffer(
+context.taskMemoryManager, new 
File(Utils.getLocalDir(SparkEnv.get.conf)), schema.fields.length)
+  private lazy val inputRowIterator = buffer.iterator
+
+  private lazy val inputByteIterator = {
+EvaluatePython.registerPicklers()
+val objIterator = inputRowIterator.map { row => 
EvaluatePython.toJava(row, schema) }
+new SerDeUtil.AutoBatchedPickler(objIterator)
+  }
+
+  private lazy val pythonRunner = {
+val conf = SparkEnv.get.conf
+val bufferSize = conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+PythonRunner(func, bufferSize, reuseWorker)
+  }
+
+  private lazy val outputIterator =
+pythonRunner.compute(inputByteIterator, context.partitionId(), context)
+
+  override def open(partitionId: Long, version: Long): Boolean = {
+outputIterator  // initialize everything
+TaskContext.get.addTaskCompletionListener { _ => buffer.close() }
+true
+  }
+
+  override def process(value: UnsafeRow): Unit = {
+buffer.add(value)
+  }
+
+  override def close(errorOrNull: Throwable): Unit = {
+buffer.allRowsAdded()
+if (outputIterator.hasNext) outputIterator.next() // to throw python 
exception if there was one
+  }
+}
+
+object PythonForeachWriter {
+
+  /**
+   * A buffer that is designed for the sole purpose of buffering 
UnsafeRows in PythonForeahWriter.
--- End diff --

nit: PythonForeachWriter


---




[GitHub] spark issue #21313: [SPARK-24187][R][SQL]Add array_join function to SparkR

2018-06-05 Thread huaxingao
Github user huaxingao commented on the issue:

https://github.com/apache/spark/pull/21313
  
Thank you very much for your help! @HyukjinKwon @felixcheung 


---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21482
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91494/
Test FAILed.


---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21482
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21482
  
**[Test build #91494 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91494/testReport)**
 for PR 21482 at commit 
[`663fa47`](https://github.com/apache/spark/commit/663fa4726767e5d56d653798fd188123d0f50d8d).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91493/
Test PASSed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21498
  
**[Test build #91493 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91493/testReport)**
 for PR 21498 at commit 
[`b058f89`](https://github.com/apache/spark/commit/b058f892af8204cb25a8daa1f1cd1a6de21c5fd6).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-06-05 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r193280073
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -308,6 +308,170 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp
   override def prettyName: String = "map_entries"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
--- End diff --

@ueshin 

>We don't need to care about key duplication like CreateMap for now.

Just verifying: This means I should simply concatenate the maps, possibly 
creating additional duplicates.
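
To make the question concrete, here is a minimal sketch in plain Python (not Spark code). It assumes a map is modeled as an ordered list of key/value entries; `concat_maps` is a hypothetical helper name. Entries are appended in argument order and duplicate keys are left untouched.

```python
# Hypothetical model: a map is an ordered list of (key, value) entries.
def concat_maps(*maps):
    """Append the entries of every map in argument order, keeping duplicate keys."""
    result = []
    for entries in maps:
        result.extend(entries)
    return result


m1 = [("a", 1), ("b", 2)]
m2 = [("b", 3), ("c", 4)]
merged = concat_maps(m1, m2)  # key "b" appears twice in the result
```

Whether a later lookup of `"b"` sees the first or the last entry is a separate decision that this sketch does not make.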


---




[GitHub] spark issue #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20915
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91492/
Test PASSed.


---




[GitHub] spark issue #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20915
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20915
  
**[Test build #91492 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91492/testReport)**
 for PR 20915 at commit 
[`de9ecb6`](https://github.com/apache/spark/commit/de9ecb64068d1cafa79ce61a2bf1795981a46f46).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21497
  
**[Test build #91498 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91498/testReport)**
 for PR 21497 at commit 
[`a9a5d2a`](https://github.com/apache/spark/commit/a9a5d2a9311ea762f5e27179d36928ca691105d7).


---




[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21497#discussion_r193277616
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -35,10 +34,11 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
+
--- End diff --

Thanks for letting me know. Addressed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3821/
Test PASSed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/21498
  
Tests are added. cc @kiszk @mgaido91 


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21498
  
**[Test build #91497 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91497/testReport)**
 for PR 21498 at commit 
[`69a7066`](https://github.com/apache/spark/commit/69a70662c5d668ecaed3c8c5f0ecc53f33ab0682).


---




[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-05 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/19602
  
Thanks for merging !


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3820/
Test PASSed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3819/
Test FAILed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Merged build finished. Test FAILed.


---




[GitHub] spark pull request #21313: [SPARK-24187][R][SQL]Add array_join function to S...

2018-06-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21313


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21498
  
**[Test build #91496 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91496/testReport)**
 for PR 21498 at commit 
[`6f487c9`](https://github.com/apache/spark/commit/6f487c9a9288176b7fcc574021abff6ae403895b).


---




[GitHub] spark issue #21313: [SPARK-24187][R][SQL]Add array_join function to SparkR

2018-06-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21313
  
Merged to master.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21498
  
**[Test build #91495 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91495/testReport)**
 for PR 21498 at commit 
[`0dedf44`](https://github.com/apache/spark/commit/0dedf44559fb6da11c5d903c51bb73f5f508fe6f).


---




[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...

2018-06-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21497#discussion_r193266919
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -35,10 +34,11 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
+
--- End diff --

tiny nit: I would remove this newline ..


---




[GitHub] spark pull request #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Un...

2018-06-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21498#discussion_r193263354
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1099,6 +1099,17 @@ object SQLConf {
   .intConf
   
.createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
+  val UNION_IN_SAME_PARTITION =
+buildConf("spark.sql.unionInSamePartition")
+  .internal()
+  .doc("When true, Union operator will union children results in the same corresponding " +
+"partitions if they have same partitioning. This eliminates unnecessary shuffle in later " +
+"operators like aggregation. Note that because non-deterministic functions such as " +
+"monotonically_increasing_id are depended on partition id. By doing this, the values of " +
--- End diff --

It seems we want non-deterministic functions to keep the same values after a 
union. Once we union children in the same partitions, the values of such 
functions can change, so I added this config to control the behavior. It 
defaults to false.
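
A minimal sketch in plain Python (not Spark code) of why the values can change. It relies on the documented layout of `monotonically_increasing_id` (partition ID in the upper 31 bits, per-partition record number in the lower 33 bits). Modeling a partition as a list of rows, and the helper names `monotonic_id` and `ids_for`, are assumptions made for illustration, as is evaluating the function on top of the union.

```python
def monotonic_id(partition_id, row_index):
    # Partition ID in the upper 31 bits, record number in the lower 33 bits.
    return (partition_id << 33) + row_index


def ids_for(partitions):
    # One ID per row, computed from the row's partition and its position in it.
    return [[monotonic_id(p, i) for i in range(len(rows))]
            for p, rows in enumerate(partitions)]


left = [["l0", "l1"], ["l2"]]
right = [["r0"], ["r1", "r2"]]

# Plain union: the children's partitions are appended, so right's become 2 and 3.
appended = left + right
# Same-partition union: output partition i holds partition i of every child.
zipped = [l + r for l, r in zip(left, right)]

ids_appended = dict(zip(sum(appended, []), sum(ids_for(appended), [])))
ids_zipped = dict(zip(sum(zipped, []), sum(ids_for(zipped), [])))
```

In this model the row `r0` gets the ID `2 << 33` under the plain union but `2` under the same-partition union, which is the kind of difference the config guards against.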






---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21482
  
**[Test build #91494 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91494/testReport)**
 for PR 21482 at commit 
[`663fa47`](https://github.com/apache/spark/commit/663fa4726767e5d56d653798fd188123d0f50d8d).


---




[GitHub] spark pull request #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20915#discussion_r193260674
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -371,14 +387,27 @@ case class FileSourceScanExec(
   val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
   PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
 }
-  }.groupBy { f =>
-BucketingUtils
-  .getBucketId(new Path(f.filePath).getName)
-  .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
   }
 
+val prunedBucketed = if (optionalBucketSet.isDefined) {
+  val bucketSet = optionalBucketSet.get
+  bucketed.filter {
+f => bucketSet.get(
+  BucketingUtils.getBucketId(new Path(f.filePath).getName)
+.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")))
+  }
+} else {
+  bucketed
+}
+
+val filesGroupedToBuckets = prunedBucketed.groupBy { f =>
+  BucketingUtils
--- End diff --

can we avoid calculating bucket id from file name twice?
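
One way to do that, as a minimal sketch in plain Python rather than the Scala under review: group by the bucket id first, then prune the groups, so the id is parsed from each file name only once. The file-name pattern and the names `bucket_id` and `group_pruned` are assumptions made for illustration, not Spark's API.

```python
import re
from collections import defaultdict

# Assumed naming convention: the bucket id is the digit run after the last underscore.
BUCKET_RE = re.compile(r".*_(\d+)(?:\..*)?$")


def bucket_id(file_name):
    match = BUCKET_RE.match(file_name)
    if match is None:
        raise ValueError(f"Invalid bucket file {file_name}")
    return int(match.group(1))


def group_pruned(file_names, bucket_set=None):
    """Group files by bucket id, then keep only the buckets in bucket_set (if given)."""
    grouped = defaultdict(list)
    for name in file_names:
        grouped[bucket_id(name)].append(name)  # id parsed exactly once per file
    if bucket_set is None:
        return dict(grouped)
    return {b: files for b, files in grouped.items() if b in bucket_set}


files = [
    "part-00000_00000.parquet",
    "part-00001_00001.parquet",
    "part-00002_00001.parquet",
]
pruned = group_pruned(files, bucket_set={1})
```

The trade-off is that pruned buckets are still grouped before being dropped, which costs a little memory but avoids the second parse.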


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193260315
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -30,6 +30,7 @@
 from pyspark.sql.readwriter import OptionUtils, to_str
 from pyspark.sql.types import *
 from pyspark.sql.utils import StreamingQueryException
+from abc import ABCMeta, abstractmethod
--- End diff --

done.


---




[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21495
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3818/
Test PASSed.


---




[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21495
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91491/
Test PASSed.


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21498
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21495
  
**[Test build #91491 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91491/testReport)**
 for PR 21495 at commit 
[`de790fd`](https://github.com/apache/spark/commit/de790fd251ba3727bba23ceb1ca07559d25b7e87).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, initializeSpark: () => Unit)`


---




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21498
  
**[Test build #91493 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91493/testReport)**
 for PR 21498 at commit 
[`b058f89`](https://github.com/apache/spark/commit/b058f892af8204cb25a8daa1f1cd1a6de21c5fd6).


---




[GitHub] spark issue #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20915
  
**[Test build #91492 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91492/testReport)**
 for PR 20915 at commit 
[`de9ecb6`](https://github.com/apache/spark/commit/de9ecb64068d1cafa79ce61a2bf1795981a46f46).


---




[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
I agree that the current cache approach may consume excessive memory 
unnecessarily, which matches my finding in #21469. 

The issue is not that simple, however: in micro-batch mode, each batch needs 
to read the previous version of the state. If that version is not in memory, it 
has to be read from the file system, in the worst case seeking and reading 
multiple files in a remote file system. So keeping the previous version of the 
state in memory is encouraged.

There are three cases here (please add any I'm missing): 1. failure before 
commit, 2. committed but the batch fails afterwards, 3. committed and the batch 
succeeds. It might be better to think through all of them.


---




[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21497
  
Irrespective of "nc" behavior, agree that the read thread can be started 
once in the driver.


---




[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
@arunmahadevan 
Yes, before the patch Spark connects to the socket server twice: once to 
get the schema, and once more to read the data.

The `-k` flag is only supported by some netcat distributions, which is why I 
had to set a breakpoint and start nc again after the temp reader was stopped.

For example, in my local dev environment (macOS 10.12.6), netcat doesn't 
support the `-k` flag.

```
netcat (The GNU Netcat) 0.7.1
```


---




[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21497
  
@HeartSaVioR , is this some recent change in the behavior?

For me `nc -lk ` works with TextSocketMicroBatchReader. Are you using 
`-k` (nc to stay listening for another connection after its current connection 
is completed) ?




---




[GitHub] spark pull request #21499: [SPARK-24468][SQL] Handle negative scale when adj...

2018-06-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21499#discussion_r193238183
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala ---
@@ -161,13 +161,17 @@ object DecimalType extends AbstractDataType {
* This method is used only when `spark.sql.decimalOperations.allowPrecisionLoss` is set to true.
*/
   private[sql] def adjustPrecisionScale(precision: Int, scale: Int): DecimalType = {
-// Assumptions:
+// Assumption:
 assert(precision >= scale)
-assert(scale >= 0)
 
 if (precision <= MAX_PRECISION) {
   // Adjustment only needed when we exceed max precision
   DecimalType(precision, scale)
+} else if (scale < 0) {
+  // Decimal can have negative scale (SPARK-24468). In this case, we cannot allow a precision
+  // loss since we would cause a loss of digits in the integer part.
--- End diff --

is there a SQL standard for it? I feel it seems reasonable to truncate the 
integral part, like `123456` -> `123000`
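
A minimal sketch of that truncation using Python's `decimal` module, purely to illustrate the arithmetic. It is not what Spark does today; `truncate_to_scale` is a hypothetical helper, and a scale of -3 is taken to mean "keep digits down to the thousands".

```python
from decimal import Decimal, ROUND_DOWN


def truncate_to_scale(value, scale):
    """Drop digits below 10**(-scale) by rounding toward zero."""
    quantum = Decimal(1).scaleb(-scale)  # scale=-3 gives a quantum of 1E+3
    return value.quantize(quantum, rounding=ROUND_DOWN)


truncated = truncate_to_scale(Decimal(123456), -3)
```

Here `int(truncated)` is `123000`, with the three low-order integral digits zeroed out.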


---




[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-05 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21045#discussion_r193236096
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2394,6 +2394,23 @@ def array_repeat(col, count):
 return Column(sc._jvm.functions.array_repeat(_to_java_column(col), count))
 
 
+@since(2.4)
+def zip(*cols):
+"""
+Collection function: Merge two columns into one, such that the M-th element of the N-th
+argument will be the N-th field of the M-th output element.
+
+:param cols: columns in input
+
+>>> from pyspark.sql.functions import zip as spark_zip
--- End diff --

Hmm, I was thinking about the naming issue actually. Should we use 
`arrays_zip`, `zip_arrays` or something instead of `zip`? Or any suggestions?
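
For reference, a minimal plain-Python sketch of the semantics in the docstring under one of the proposed names, which also avoids shadowing the builtin `zip`. It works on ordinary lists rather than Columns, and unequal-length inputs are out of scope here (the builtin simply stops at the shortest input).

```python
import builtins


def arrays_zip(*arrays):
    # The M-th element of the N-th argument becomes the N-th field of the M-th output element.
    return [tuple(fields) for fields in builtins.zip(*arrays)]


zipped = arrays_zip([1, 2, 3], ["a", "b", "c"])
```

With a distinct name, users would not need the `import zip as spark_zip` workaround shown in the doctest.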


---




[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
cc. @tdas @jose-torres @jerryshao @arunmahadevan 


---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-05 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/21482
  
How is this done in other databases? I don't think we want to invent new 
ways on these basic primitives.


---




[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21482#discussion_r193230476
  
--- Diff: R/pkg/NAMESPACE ---
@@ -281,6 +281,8 @@ exportMethods("%<=>%",
   "initcap",
   "input_file_name",
   "instr",
+  "isInf",
+  "isinf",
--- End diff --

The functions are case-insensitive, so I don't think we need both?


---




[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21499
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21499
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91490/
Test PASSed.


---




[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21499
  
**[Test build #91490 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91490/testReport)**
 for PR 21499 at commit 
[`2f72ebb`](https://github.com/apache/spark/commit/2f72ebbf6902ffb9fb6133e6e91c40abdc24079c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21495
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21495
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3817/
Test PASSed.


---




[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21495
  
**[Test build #91491 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91491/testReport)**
 for PR 21495 at commit 
[`de790fd`](https://github.com/apache/spark/commit/de790fd251ba3727bba23ceb1ca07559d25b7e87).


---




[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-05 Thread dbtsai
Github user dbtsai commented on the issue:

https://github.com/apache/spark/pull/21495
  
retest this please


---




[GitHub] spark pull request #21499: [SPARK-24468][SQL] Handle negative scale when adj...

2018-06-05 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21499#discussion_r193185478
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala ---
@@ -161,13 +161,17 @@ object DecimalType extends AbstractDataType {
* This method is used only when `spark.sql.decimalOperations.allowPrecisionLoss` is set to true.
*/
   private[sql] def adjustPrecisionScale(precision: Int, scale: Int): DecimalType = {
-// Assumptions:
+// Assumption:
 assert(precision >= scale)
-assert(scale >= 0)
 
 if (precision <= MAX_PRECISION) {
   // Adjustment only needed when we exceed max precision
   DecimalType(precision, scale)
+} else if (scale < 0) {
+  // Decimal can have negative scale (SPARK-24468). In this case, we cannot allow a precision
+  // loss since we would cause a loss of digits in the integer part.
--- End diff --

Yes, in this case we currently return `null`. I have a long-standing PR 
to make this behavior configurable in order to comply with the SQL standard 
(https://github.com/apache/spark/pull/20350), but it has not been very active 
lately.
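
To make the type-level part of this discussion concrete, here is a minimal standalone sketch of the adjustment logic. It does not depend on Spark: `DecimalType` is a local stand-in, the constants `MaxPrecision = 38` and `MinimumAdjustedScale = 6` are assumptions, and the body of the `scale < 0` branch is a guess, since the quoted diff is cut off right after the comment. The last lines only illustrate that a decimal can carry a negative scale, using `java.math.BigDecimal`.

```scala
// Standalone sketch, not Spark's code. Names mirror the quoted diff, but the
// constants and the negative-scale branch body are assumptions.
object DecimalAdjustSketch {
  final case class DecimalType(precision: Int, scale: Int)

  val MaxPrecision = 38         // assumed maximum precision
  val MinimumAdjustedScale = 6  // assumed minimum number of fractional digits to keep

  def adjustPrecisionScale(precision: Int, scale: Int): DecimalType = {
    // The only remaining assumption after the change: scale may now be negative.
    assert(precision >= scale)

    if (precision <= MaxPrecision) {
      // Adjustment is only needed when we exceed the maximum precision.
      DecimalType(precision, scale)
    } else if (scale < 0) {
      // Assumed behavior: keep the scale untouched and cap the precision,
      // because reducing the scale would drop integer digits.
      DecimalType(MaxPrecision, scale)
    } else {
      // Keep all integer digits if possible, but never go below the minimum scale.
      val intDigits = precision - scale
      val minScale = math.min(scale, MinimumAdjustedScale)
      val adjustedScale = math.max(MaxPrecision - intDigits, minScale)
      DecimalType(MaxPrecision, adjustedScale)
    }
  }

  // A value with a negative scale: 1E+3 is stored as unscaled value 1, scale -3.
  val thousand = new java.math.BigDecimal("1E+3")
}
```

In this sketch, a result whose integer digits no longer fit in the capped precision is not modeled; that is the overflow case in which, per the comment above, `null` is currently returned.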


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21500
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-05 Thread aalobaidi
Github user aalobaidi commented on the issue:

https://github.com/apache/spark/pull/21500
  
@tdas this is the change I mentioned in our chat at Spark Summit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21499: [SPARK-24468][SQL] Handle negative scale when adj...

2018-06-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21499#discussion_r193181354
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala ---
@@ -161,13 +161,17 @@ object DecimalType extends AbstractDataType {
* This method is used only when `spark.sql.decimalOperations.allowPrecisionLoss` is set to true.
*/
   private[sql] def adjustPrecisionScale(precision: Int, scale: Int): DecimalType = {
-// Assumptions:
+// Assumption:
 assert(precision >= scale)
-assert(scale >= 0)
 
 if (precision <= MAX_PRECISION) {
   // Adjustment only needed when we exceed max precision
   DecimalType(precision, scale)
+} else if (scale < 0) {
+  // Decimal can have negative scale (SPARK-24468). In this case, we cannot allow a precision
+  // loss since we would cause a loss of digits in the integer part.
--- End diff --

> cannot allow ...

Do you mean we should return `null` in this case?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21500
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-05 Thread aalobaidi
GitHub user aalobaidi opened a pull request:

https://github.com/apache/spark/pull/21500

Scalable Memory option for HDFSBackedStateStore

More scalable memory management for HDFSBackedStateStore. This is 
controlled by a configuration option 
(`spark.sql.streaming.stateStore.unloadAfterCommit`); if enabled, 
HDFSBackedStateStore will unload state after commit. 
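
As a rough illustration of the idea, the sketch below models a provider that either keeps committed state in memory or drops it after each commit and reloads it on demand. Everything here is hypothetical: `Provider`, `persisted`, and `inMemoryVersions` are invented for the example, and the real HDFSBackedStateStore is considerably more involved.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a versioned state store provider.
object UnloadAfterCommitSketch {
  final class Provider(unloadAfterCommit: Boolean) {
    // version -> state kept in memory
    private val loadedMaps = mutable.Map.empty[Long, Map[String, Int]]
    // stand-in for the files persisted on HDFS
    private val persisted = mutable.Map.empty[Long, Map[String, Int]]

    def commit(version: Long, state: Map[String, Int]): Unit = {
      persisted(version) = state
      if (unloadAfterCommit) loadedMaps.clear() // free memory right after commit
      else loadedMaps(version) = state          // keep the version cached
    }

    // Serve from memory when cached, otherwise reload from the persisted copy.
    def load(version: Long): Map[String, Int] =
      loadedMaps.getOrElseUpdate(version, persisted(version))

    def inMemoryVersions: Int = loadedMaps.size
  }
}
```

The trade-off this is meant to show: with the option enabled, memory use between batches drops, at the cost of re-reading state on the next load.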

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

This has been tested manually but still needs unit tests.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aalobaidi/spark Memory-HDFSBackedStateStore

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21500.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21500


commit fecbc23ec47c30d69d58bcfa3573751b5432eba9
Author: Ahmed Al-Obaidi 
Date:   2018-03-31T19:43:45Z

Scalable Memory option for HDFSBackedStateStore

Allow configuration option to unload loadedMaps from memory after commit




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19602: [SPARK-22384][SQL] Refine partition pruning when ...

2018-06-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19602


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21449: [SPARK-24385][SQL] Resolve self-join condition ambiguity...

2018-06-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21449
  
> In the short term we should make the behavior of EqualTo and 
EqualNullSafe identical.

This seems pretty safe and reasonable to me


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19602
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-05 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r193171992
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand(
  *
  * The syntax of this command is:
  * {{{
- *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ *   ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1)
+ * [, PARTITION (spec2, expr2), ...] [PURGE];
  * }}}
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitions: Seq[(TablePartitionSpec, Seq[Expression])],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val toDrop = partitions.flatMap { partition =>
+  if (partition._1.isEmpty && !partition._2.isEmpty) {
+// There are only expressions in this drop condition.
+extractFromPartitionFilter(partition._2, catalog, table, resolver)
+  } else if (!partition._1.isEmpty && partition._2.isEmpty) {
+// There are only partitionSpecs in this drop condition.
+extractFromPartitionSpec(partition._1, table, resolver)
+  } else if (!partition._1.isEmpty && !partition._2.isEmpty) {
+// This drop condition has both partitionSpecs and expressions.
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect(
--- End diff --

Yeah, I agree. The hard part may be how to convert a `partitionSpec` to 
an `EqualTo`. 
I think it's better to let the `AstBuilder` handle this. If so, we may 
have to have two `AlterTableDropPartitionCommand` instances in `ddl.scala`, one 
for all `partitionSpec` and one for all `expression`. But that may be a bit weird.
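
A minimal sketch of what such a conversion could look like, assuming a partition spec is a plain `Map[String, String]`. The expression types below are tiny local stand-ins, not Catalyst classes, and `specToPredicate` is a hypothetical helper name:

```scala
// Standalone sketch: local stand-ins for Catalyst expressions, not Spark's API.
object PartitionSpecSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: String) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // A spec such as PARTITION (country='US', year='2018')
  type TablePartitionSpec = Map[String, String]

  // Turn each column/value pair into an equality and AND them together.
  // Columns are sorted so the resulting tree is deterministic.
  def specToPredicate(spec: TablePartitionSpec): Option[Expr] =
    spec.toSeq
      .sortBy(_._1)
      .map { case (col, v) => EqualTo(Attr(col), Lit(v)): Expr }
      .reduceOption(And(_, _))
}
```

With a conversion like this, a single command could in principle handle both forms by treating every drop condition as a filter expression, which avoids the two-instance split mentioned above.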


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19602
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91489/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19602
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19602
  
**[Test build #91489 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91489/testReport)**
 for PR 19602 at commit 
[`8d709fd`](https://github.com/apache/spark/commit/8d709fdf63d68bf5bd133212f081619bd40ba733).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6

2018-06-05 Thread som-snytt
Github user som-snytt commented on the issue:

https://github.com/apache/spark/pull/21495
  
The Scala REPL change at startup was to read user input while the 
single-threaded compiler was initializing on the main thread.

There is a `SplashReader` that collects that input; its last act is to 
replace itself with the real interactive reader. (I see that `-e` is 
implemented as a reader of the expression string.)

Since you can forgo the illusion of a "snappy start-up", I think your 
`printWelcome` could just block on a latch, waiting for your custom init to 
complete. (Your init code could either just print stuff, or it could stash a 
string for your printWelcome to append.)

Never mind, that doesn't work because printWelcome is on the main thread, 
not the splash thread; since printWelcome is expensive I/O, I ought to have had 
the foresight to move it to the splash thread.

So, your best bet for synchronous startup is to do everything in 
printWelcome: createInterpreter, your init commands that produce output. 
`IMain` has a compiler that is initialized lazily, so I don't think you have to 
explicitly `intp.initializeSynchronous`.

But `createInterpreter` will be called again; you'll want to detect that 
and make it a no-op. The method is also called for `:replay`; I don't know whether 
that is useful in the Spark environment.

Unfortunately, there is no option for specifying a custom splash reader 
that might do these things.

As a final irony, I did some janitorial work to deprecate `-Yrepl-sync`, 
which was unused (I think it was put in because the old threading was broken); 
maybe I will revive it for this use case, to skip the splash reader which 
prints the prompt you don't want.
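
A rough sketch of the "do everything in printWelcome and make the second createInterpreter a no-op" suggestion. The `Loop` class here is a hypothetical stand-in, not the real `ILoop`; only the guard pattern is the point:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a REPL loop; method names mirror the discussion only.
object ReplStartupSketch {
  final class Loop {
    private val created = new AtomicBoolean(false)
    val output = new StringBuilder
    var initRuns = 0

    // Runs the expensive setup once; later calls are no-ops.
    def createInterpreter(): Unit =
      if (created.compareAndSet(false, true)) {
        initRuns += 1
        output.append("custom init output\n")
      }

    // Synchronous startup: create the interpreter first, then print the banner,
    // so init output appears before any prompt.
    def printWelcome(): Unit = {
      createInterpreter()
      output.append("Welcome\n")
    }
  }
}
```

Note that a guard this blunt also turns the `:replay` call into a no-op, so a real implementation would need a way to tell the two call sites apart.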




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21469
  
**[Test build #91486 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91486/testReport)**
 for PR 21469 at commit 
[`345397d`](https://github.com/apache/spark/commit/345397dd530898697ef338ec55b81ad11ece4dc0).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class StateStoreCustomAverageMetric(name: String, desc: String) 
extends StateStoreCustomMetric`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21469
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91486/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21469
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21449: [SPARK-24385][SQL] Resolve self-join condition ambiguity...

2018-06-05 Thread daniel-shields
Github user daniel-shields commented on the issue:

https://github.com/apache/spark/pull/21449
  
In the short term we should make the behavior of EqualTo and EqualNullSafe 
identical. We could do that by adding a case for EqualNullSafe that mirrors 
that of EqualTo.
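
A minimal sketch of what "adding a case that mirrors `EqualTo`" could look like. The expression types are local stand-ins rather than Catalyst classes, and tagging an attribute with a `side` is a simplification of re-resolving it against the left or right plan:

```scala
// Standalone sketch: local stand-ins, not Spark's Catalyst expressions.
object SelfJoinSketch {
  sealed trait Expr
  final case class Attr(name: String, side: String = "ambiguous") extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class EqualNullSafe(left: Expr, right: Expr) extends Expr

  // When both sides of a condition are the same attribute (a self-join),
  // bind the left operand to the left plan and the right operand to the right plan.
  def resolveSelfJoin(cond: Expr): Expr = cond match {
    case EqualTo(a: Attr, b: Attr) if a == b =>
      EqualTo(a.copy(side = "left"), b.copy(side = "right"))
    // The mirrored case: identical handling for the null-safe comparison.
    case EqualNullSafe(a: Attr, b: Attr) if a == b =>
      EqualNullSafe(a.copy(side = "left"), b.copy(side = "right"))
    case other => other
  }
}
```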


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21488: SPARK-18057 Update structured streaming kafka from 0.10....

2018-06-05 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
I tried the following change but didn't seem to get more output from Kafka:
```
diff --git a/external/kafka-0-10-sql/src/test/resources/log4j.properties b/external/kafka-0-10-sql/src/test/resources/log4j.properties
index 75e3b53..0d65339 100644
--- a/external/kafka-0-10-sql/src/test/resources/log4j.properties
+++ b/external/kafka-0-10-sql/src/test/resources/log4j.properties
@@ -25,4 +25,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{

 # Ignore messages below warning level from Jetty, because it's a bit verbose
 log4j.logger.org.spark-project.jetty=WARN
-
+log4j.logger.org.apache.kafka=DEBUG
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21497
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91483/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21497
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21497
  
**[Test build #91483 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91483/testReport)**
 for PR 21497 at commit 
[`7dc75f9`](https://github.com/apache/spark/commit/7dc75f96895cb7b35b2e8387fa8450e8127b430f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21045
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91488/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21045
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21045
  
**[Test build #91488 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91488/testReport)**
 for PR 21045 at commit 
[`2b88387`](https://github.com/apache/spark/commit/2b883879b8efd0d514553612cb2918617bb5044b).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21497
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91484/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21497
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21497
  
**[Test build #91484 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91484/testReport)**
 for PR 21497 at commit 
[`7dc75f9`](https://github.com/apache/spark/commit/7dc75f96895cb7b35b2e8387fa8450e8127b430f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


