[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...

2018-05-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21122#discussion_r186119360
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -1354,7 +1354,8 @@ class HiveDDLSuite
 val indexName = tabName + "_index"
 withTable(tabName) {
   // Spark SQL does not support creating index. Thus, we have to use 
Hive client.
-  val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+  val client =
+    spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
--- End diff --

Why are we passing the client through the catalog in the first place? Is this just for 
convenience, or is there a reason we can't pass both separately? I don't 
think we should do this without a reason, and I don't like needless casts.


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-05-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
@maropu, I suspect that the problem is that comparison is different for 
strings: `"17297598712"` is less than `"5"` with string comparison.
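
A quick way to see the ordering difference in plain Scala (nothing Parquet-specific here):

```scala
// Lexicographic comparison looks at characters left to right, so it stops
// at '1' < '5' and never considers the lengths or numeric values.
assert("17297598712" < "5")
assert(17297598712L > 5L)  // numeric comparison gives the opposite result
```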


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-05-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
@gatorsmile, are you happy committing this with the benchmark results?

@maropu, thanks for taking the time to add these benchmarks. It is really 
great to have them so we can monitor performance over time!


---




[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...

2018-05-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21118
  
@cloud-fan, actually I tried a lot of different queries yesterday, 
including joins and aggregations. The only thing that didn't work was `collect` 
for a `select * from t` because `SparkPlan` assumes that the [rows will be 
unsafe](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L254).

I'm planning to do more testing, but I don't see anything that requires 
`UnsafeRow` in generated code. @marmbrus, what was the original intent in 
codegen? Should codegen use `InternalRow` or `UnsafeRow`?


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-05-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
@maropu, looking at the pushdown benchmark, it looks like ORC and Parquet 
either both benefit or both do not benefit from pushdown. In some cases ORC is 
much faster because ORC skips reading pages, not just row groups. But when ORC 
benefits from pushdown, so does Parquet; see, for example, the 
`Select 1 int row (value = 7864320)` case.

I think you were expecting the string comparison case to show a significant 
benefit over non-pushdown, but I would only expect that if ORC showed a similar 
benefit. Whether pushdown helps depends on how values are clustered in the file, 
which is what lets Parquet eliminate row groups. If ORC didn't benefit, then the 
data probably just isn't clustered in a way that helps.

I'm not sure how you're generating data, but I'd recommend adding a sorted 
column case with enough data to create multiple row groups (or stripes for 
ORC). That would lay the data out so that some row groups can be skipped, and 
you should see a speedup.

Parquet also supports dictionary-based row group filtering. To test that, 
make sure you have a column that is entirely dictionary-encoded: pick a small 
set of values and randomly draw from that set. Then if you search for a value 
that isn't in that set you should see a speedup. Also make sure that you have 
`parquet.filter.dictionary.enabled=true` set in the Hadoop configuration so 
that Parquet uses dictionary filtering.
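
To make that concrete, here is a rough sketch of the kind of data I would generate. The paths, column names, and row counts are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

val spark = SparkSession.builder().master("local[*]").appName("pushdown-data").getOrCreate()
import spark.implicits._

// Sorted column: enough rows for several row groups, sorted so row-group
// min/max stats let Parquet (and ORC stripes) skip most of the file.
spark.range(0, 200L * 1000 * 1000)
  .select((rand(42) * 1000000000L).cast("long").as("value"))
  .sort("value")
  .write.mode("overwrite").parquet("/tmp/pushdown/sorted")

// Dictionary column: a small set of values drawn at random stays
// dictionary-encoded, so a filter on a value outside the set can drop
// entire row groups using the dictionary alone.
spark.range(0, 200L * 1000 * 1000)
  .select(($"id" % 100).cast("string").as("dict_col"))
  .write.mode("overwrite").parquet("/tmp/pushdown/dictionary")

// Dictionary-based filtering has to be enabled in the Hadoop configuration.
spark.sparkContext.hadoopConfiguration
  .setBoolean("parquet.filter.dictionary.enabled", true)
```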


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-05-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
@maropu, are you sure about the INT and FLOAT columns? I think you might 
have that assessment backwards. Here are the INT results from the PR gist:

```
SQL Single INT Column Scan:          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------
SQL Parquet Vectorized                     149 /  162        105.5           9.5       1.0X
SQL Parquet MR                            1825 / 1836          8.6         116.1       0.1X
```

And here are the INT results from the master gist:

```
SQL Single INT Column Scan:          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------
SQL Parquet Vectorized                     250 /  292         63.0          15.9       1.0X
SQL Parquet MR                            3175 / 3202          5.0         201.8       0.1X
```

I think that shows that the PR result was significantly faster, not slower. 
(The other INT test was about the same.)

Here's the FLOAT column from the PR gist:

```
SQL Single FLOAT Column Scan:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------
SQL Parquet Vectorized                     145 /  158        108.8           9.2       1.0X
SQL Parquet MR                            1840 / 1843          8.5         117.0       0.1X
```

And FLOAT from the master gist:

```
SQL Single FLOAT Column Scan:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------
SQL Parquet Vectorized                     261 /  316         60.2          16.6       1.0X
SQL Parquet MR                            3267 / 3284          4.8         207.7       0.1X
```

Am I reading this incorrectly? I'm considering lower time values and higher 
rate values to be better.


---




[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...

2018-05-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21118
  
@cloud-fan and @jose-torres: I looked at `explain codegen` for reading from 
a Parquet table (with vectorized reads disabled) and it doesn't look like there 
is a dependency on `UnsafeRow`:

```
explain codegen select * from test
```
```
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*FileScan parquet rblue.test[id#40L,data#41] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://bucket/warehouse/blue.db/test/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,data:string>

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 009 */   private scala.collection.Iterator scan_input;
/* 010 */
/* 011 */   public GeneratedIterator(Object[] references) {
/* 012 */     this.references = references;
/* 013 */   }
/* 014 */
/* 015 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 016 */     partitionIndex = index;
/* 017 */     this.inputs = inputs;
/* 018 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 019 */     scan_input = inputs[0];
/* 020 */
/* 021 */   }
/* 022 */
/* 023 */   protected void processNext() throws java.io.IOException {
/* 024 */     while (scan_input.hasNext()) {
/* 025 */       InternalRow scan_row = (InternalRow) scan_input.next();
/* 026 */       scan_numOutputRows.add(1);
/* 027 */       append(scan_row);
/* 028 */       if (shouldStop()) return;
/* 029 */     }
/* 030 */   }
/* 031 */ }
```

I've looked at a few simple queries with filters, projects, and aggregation 
and it doesn't look like any of the generated code depends on `UnsafeRow`. Can 
anyone confirm that it is not a requirement to pass `UnsafeRow` into generated 
code?

If there is no requirement for the rows to be `UnsafeRow`, then is it 
necessary to add an `UnsafeProjection` or would the copy to unsafe make 
execution slower?

If the rows passed from the data source are `UnsafeRow`, then 
`UnsafeProjection` detects it and copies the row buffer (see 
[examples](https://github.com/Netflix/iceberg/blob/parquet-value-readers/spark/src/test/java/com/netflix/iceberg/spark/data/CodegenExamples.java#L256-L262)).
 That's faster than copying individual values, but slower than just using the 
row as-is. Not adding a projection would make this case faster.

If the rows passed from the data source are `InternalRow` and not 
`UnsafeRow`, then we *could* copy them immediately, but it is very likely that 
the data is going to be copied anyway. A projection, for example, immediately 
copies all of the data back out of the `UnsafeRow`, so an initial copy to unsafe is 
just extra work. Similarly, a filter is probably selective enough that it makes 
sense to wait until after the filter runs to copy the entire row of data to 
unsafe.

In all of the cases that I've looked at, a copy to unsafe would only slow 
down execution. Unsafe rows may have better cache locality, but the copy reads 
*all* of the data anyway.

If I'm right and we do not need to insert a copy to `UnsafeRow`, then we 
don't need the `SupportsScanUnsafeRow` trait. Data sources can still produce 
`UnsafeRow` and it would work without a problem. The only time we need to know 
that an `InternalRow` is actually an `UnsafeRow` is if we are adding a 
projection to unsafe, in which case we could avoid a copy of the row buffer.
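
For reference, the copy I keep referring to is just an `UnsafeProjection` applied to the scan output. A minimal sketch (the schema and rows here are made up):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("data", StringType)))

// A plain InternalRow, as a non-vectorized source might produce it.
val row: InternalRow = new GenericInternalRow(Array[Any](1L, UTF8String.fromString("a")))

// Copying to unsafe reads and rewrites every field of the source row.
val toUnsafe = UnsafeProjection.create(schema)
val unsafe: UnsafeRow = toUnsafe(row)

// If the input is already an UnsafeRow, the generated projection can copy
// the row buffer instead of individual values (see the codegen examples
// linked above), which is cheaper but still not free.
val recopied: UnsafeRow = toUnsafe(unsafe)
```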


---




[GitHub] spark issue #21143: [SPARK-24072][SQL] clearly define pushed filters

2018-05-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21143
  
@cloud-fan, that's kind of the point I was trying to make. It is too 
difficult for an implementation to do whole-stage codegen itself, but we could add a 
codegen'ed filter before whole-stage codegen. Why make the implementation handle it 
using Spark internals when we could do it more cleanly in Spark?


---




[GitHub] spark pull request #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to R...

2018-05-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21145#discussion_r185310952
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -299,13 +299,13 @@ private[kafka010] class KafkaMicroBatchReader(
   }
 }
 
-/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+/** A [[ReadTask]] for reading Kafka data in a micro-batch streaming query. */
 private[kafka010] case class KafkaMicroBatchDataReaderFactory(
--- End diff --

Sure, good idea.


---




[GitHub] spark issue #21143: [SPARK-24072][SQL] clearly define pushed filters

2018-05-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21143
  
@cloud-fan, union doesn't really help. I already have support for 
mixed formats working just fine. The format isn't the problem; it is filtering 
(and a similar problem with projection). Parquet allows you to push down 
filters, while Avro doesn't.

Right now, I'm running filters inside my data source to ensure that the 
result always matches the pushed filters, which is okay but doesn't use codegen. 
Since we already have a need for per-split filters for residuals, we could do 
something similar in Spark instead of in the data sources and allow each split 
to return a residual. Then Spark would add a codegen'ed filter before 
proceeding with the rest of the physical plan. You might think of it as a 
`ResidualFilter` node whose filter expression changes per split, rather than a 
separate physical plan.
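
To be clear about the shape I have in mind, something like this (the names here are hypothetical, not an existing API):

```scala
import org.apache.spark.sql.sources.Filter

// Hypothetical: each read task reports which pushed filters it fully applied
// and which must still be evaluated by a codegen'ed Filter that Spark adds.
trait ReadTaskWithResidual {
  def appliedFilters: Seq[Filter]   // guaranteed to hold for every returned row
  def residualFilter: Seq[Filter]   // Spark re-checks these per row for this split
}
```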


---




[GitHub] spark issue #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to ReadTask...

2018-05-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21145
  
@cloud-fan and @henryr, do you have an opinion about naming here?


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-05-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
@mswit-databricks, I wouldn't worry about that. We've limited the length of 
binary and string fields.

In the next version of Parquet, we're planning to release page indexes, 
which store lower and upper bounds instead of exact min and max values. That gives us 
more flexibility to shorten values and avoid the case you're worried about.
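
A rough illustration of why bounds are more flexible than exact min/max values (plain Scala, not the Parquet API):

```scala
// An exact max must store the full value. An upper bound only has to be
// greater than or equal to every value, so it can be truncated and bumped.
val exactMax   = "parquet_is_great_" + ("x" * 1000)  // very long max value
val upperBound = "parquet_it"                        // short, still a valid upper bound
assert(upperBound > exactMax)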


---




[GitHub] spark issue #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to be an in...

2018-04-30 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21122
  
Thanks for pointing this out, @henryr. This looks like a good change to 
support multiple catalogs.

I think it looks fine, other than exposing `unwrapped` to get the Hive 
client. We're probably abusing that instead of passing the client, so it makes more 
sense to pass the client explicitly rather than going through `unwrapped`.


---




[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...

2018-04-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21122#discussion_r185138677
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 ---
@@ -31,10 +30,16 @@ import org.apache.spark.util.ListenerBus
  *
  * Implementations should throw [[NoSuchDatabaseException]] when databases 
don't exist.
  */
-abstract class ExternalCatalog
-  extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
+trait ExternalCatalog {
   import CatalogTypes.TablePartitionSpec
 
+  // Returns the underlying catalog class (e.g., HiveExternalCatalog).
+  def unwrapped: ExternalCatalog = this
--- End diff --

Is there a better way to pass the Hive client? It looks like the uses of 
`unwrapped` actually just get the Hive client from the HiveExternalCatalog. If 
we can pass that through, it would prevent the need for this. I think that 
would be cleaner, unless there is a problem with that I'm missing.


---




[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r185055954
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ---
@@ -213,6 +215,26 @@ case class DataSourceAnalysis(conf: SQLConf) extends 
Rule[LogicalPlan] with Cast
   }
 }
 
+/**
+ * Replace the V2 data source of table in [[InsertIntoTable]] to V1 [[FileFormat]].
+ * E.g., with temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails
+ * since there is no corresponding physical plan.
+ * This is a temporary hack for making current data source V2 work.
--- End diff --

I don't like the idea of "temporary hack" rules. What is the long-term plan 
for getting rid of this? Or is this something we will end up supporting forever?


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-30 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
Is it necessary to block this commit on benchmarks? We know from the Parquet 
benchmarks and the TPC-DS run that it doesn't degrade performance. What do you 
think needs to be done before this can move forward?


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-30 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
Yes, it is safe to use push-down for string columns in data written with 
this version.


---




[GitHub] spark issue #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to ReadTask...

2018-04-27 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21145
  
I think `ReadTask` is fine. That name does not imply that you can use the 
object itself to read, but it does correctly show that it is one task in a 
larger operation. I think the name implies that it represents something to be 
read, which is correct, and it is reasonable to look at the API for that object 
to see how to read it. That can be clearly accomplished, so I don't think we 
need a different name.


---




[GitHub] spark issue #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to ReadTask...

2018-04-26 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21145
  
@arunmahadevan, the problem is that the current naming is misleading. This 
is not a factory (it only produces one specific reader) and it does not have 
the same lifecycle as the write-side factory. Using parallel naming for the two 
implies an equivalence that doesn't exist.


---




[GitHub] spark issue #21143: [SPARK-24072][SQL] clearly define pushed filters

2018-04-25 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21143
  
Thanks for working on this, @cloud-fan! I was thinking about needing it 
just recently so that data sources can delegate to Spark when needed.

I'll have a thorough look at it tomorrow, but one quick high-level 
question: should we make these residuals based on the input split instead?

Input splits might have different residual filters that need to be applied. 
For example, if you have a time range query, `ts > X`, and are storing data by 
day, then you know that when `day(ts) > day(X)` that `ts > X` *must* be true, 
but when `day(ts) = day(X)`, `ts > X` *might* be true. So for only some splits, 
when scanning the boundary day, you need to run the original filter, but not 
for any other splits.
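
In rough pseudo-Scala, the per-split residual for that example would look something like this (purely illustrative, not a real API):

```scala
// Pushed filter: ts > X, on data partitioned by day(ts).
// Splits with day(ts) < day(X) are pruned away and never produce a task.
def residualFor[F](splitDay: Int, filterDay: Int, pushed: F): Option[F] =
  if (splitDay > filterDay) None   // day(ts) > day(X): ts > X must hold, nothing left to check
  else Some(pushed)                // boundary day: ts > X might hold, re-check it per row
```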

Another use case for a per-split residual is when splits might be different 
file formats. Parquet allows pushing down filters, but Avro doesn't. In a mixed 
table format it would be great for Avro splits to return the entire expression 
as a residual, while Parquet splits do the filtering.

What do you think?


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-25 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
Thank you @maropu!

What resources does the run require? Is it something we could create a 
Jenkins job to run?


---




[GitHub] spark pull request #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to R...

2018-04-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21145#discussion_r184235329
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -299,13 +299,13 @@ private[kafka010] class KafkaMicroBatchReader(
   }
 }
 
-/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+/** A [[ReadTask]] for reading Kafka data in a micro-batch streaming query. */
 private[kafka010] case class KafkaMicroBatchDataReaderFactory(
--- End diff --

This fixes the API, not implementations, and it already touches 30+ files.

I'd rather not fix the downstream classes for two reasons. First, to avoid 
this becoming really large. Second, we need to be able to evolve these APIs 
without requiring changes to all implementations. This is still evolving and if 
we need to update 20+ implementations to make simple changes, then I think 
that's a problem.


---




[GitHub] spark pull request #21118: SPARK-23325: Use InternalRow when reading with Da...

2018-04-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21118#discussion_r184216025
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 ---
@@ -86,7 +87,7 @@ class KafkaContinuousReader(
 KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
 
-  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+  override def createReadTasks(): ju.List[ReadTask[InternalRow]] = {
--- End diff --

I've moved this to #21145. I'll rebase this PR on that one, so let's try to 
get that in first.


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-25 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
The fix for PARQUET-686 was to suppress min/max stats. It is safe to push 
filters, but those filters can't be used without the stats. 1.10.0 has the 
correct stats and can use those filters.


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r184156731
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
 case plan: InMemoryRelation => plan
   }.head
   // InMemoryRelation's stats is file size before the underlying 
RDD is materialized
-  assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+  assert(inMemoryRelation.computeStats().sizeInBytes === 800)
--- End diff --

This is data-dependent, so it is hard to estimate. We write the old stats for 
older readers only when the type uses a signed sort order, so it is limited to 
mostly primitive types and won't be written for byte arrays or UTF-8 data. That 
limits the size to 16 bytes plus Thrift overhead per page, and you might have about 
100 pages per row group. So roughly 1.5 KB per 128 MB, which is about 0.001%.
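
The back-of-the-envelope math, using my own rough numbers:

```scala
val statsBytesPerPage = 16                 // old-style min + max for an 8-byte primitive
val pagesPerRowGroup  = 100                // rough guess
val rowGroupBytes     = 128L * 1024 * 1024 // 128 MB row group
val overheadBytes     = statsBytesPerPage * pagesPerRowGroup      // ~1.6 KB, plus Thrift framing
val fraction          = overheadBytes.toDouble / rowGroupBytes    // ~1.2e-5, about 0.001%
```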


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-25 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
There are two main reasons to update. First, the problem behind SPARK-17213 
is fixed, hence the new min and max fields. Second, this updates the internal 
byte array management, which is needed for page skipping in the next few 
versions.


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r184139736
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
 case plan: InMemoryRelation => plan
   }.head
   // InMemoryRelation's stats is file size before the underlying 
RDD is materialized
-  assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+  assert(inMemoryRelation.computeStats().sizeInBytes === 800)
--- End diff --

Parquet fixed a problem with value ordering in statistics, which required 
adding new metadata min and max fields. For older readers, Parquet also writes 
the old values when it makes sense to. This is a slight increase in overhead, 
which is more noticeable when files contain just a few records.

Don't be alarmed by the percentage difference here; it is just a small 
file. Parquet isn't increasing file sizes by 8%; that would be silly.


---




[GitHub] spark pull request #21145: SPARK-24073: Rename DataReaderFactory to ReadTask...

2018-04-24 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/21145

SPARK-24073: Rename DataReaderFactory to ReadTask.


## What changes were proposed in this pull request?

This reverses the changes in SPARK-23219, which renamed ReadTask to
DataReaderFactory. The intent of that change was to make the read and
write API match (write side uses DataWriterFactory), but the underlying
problem is that the two classes are not equivalent.

ReadTask/DataReader function as Iterable/Iterator. One ReadTask is a
specific read task for a partition of the data to be read, in contrast
to DataWriterFactory where the same factory instance is used in all
write tasks. ReadTask's purpose is to manage the lifecycle of DataReader
with an explicit create operation to mirror the close operation. This is
no longer clear from the API, where DataReaderFactory appears to be more
generic than it is and it isn't clear why a set of them is produced for
a read.
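
Simplified sketch of that asymmetry (trimmed-down versions of the interfaces, not the exact signatures):

```scala
// Read side: one ReadTask per input partition, and each task creates exactly
// one reader, like Iterable -> Iterator.
trait DataReader[T] extends AutoCloseable {
  def next(): Boolean
  def get(): T
}
trait ReadTask[T] extends Serializable {
  def createDataReader(): DataReader[T]
}

// Write side: a single factory instance is shared by all write tasks.
trait DataWriter[T] {
  def write(record: T): Unit
}
trait DataWriterFactory[T] extends Serializable {
  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[T]
}
```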

## How was this patch tested?

Existing tests, which have been updated to use the new name.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark 
SPARK-24073-revert-data-reader-factory-rename

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21145.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21145


commit c364c05d3141bbe0ed29a2b02cecfa541d9c8212
Author: Ryan Blue <blue@...>
Date:   2018-04-24T19:55:25Z

SPARK-24073: Rename DataReaderFactory to ReadTask.

This reverses the changes in SPARK-23219, which renamed ReadTask to
DataReaderFactory. The intent of that change was to make the read and
write API match (write side uses DataWriterFactory), but the underlying
problem is that the two classes are not equivalent.

ReadTask/DataReader function as Iterable/Iterator. One ReadTask is a
specific read task for a partition of the data to be read, in contrast
to DataWriterFactory where the same factory instance is used in all
write tasks. ReadTask's purpose is to manage the lifecycle of DataReader
with an explicit create operation to mirror the close operation. This is
no longer clear from the API, where DataReaderFactory appears to be more
generic than it is and it isn't clear why a set of them is produced for
a read.




---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-24 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
> Based on the previous upgrade (e.g., #13280 (comment)), we hit the 
performance regressions when we upgrade Parquet and we did the revert at the 
end.

I should point out that the regression wasn't reproducible, so we aren't 
sure what the cause was. We also didn't have performance numbers on the Parquet 
side then, or a case of anyone running it in production (we have been running it in 
production for a couple of months). But I can understand wanting to be thorough.


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-24 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
Okay, I don't have the time to set up and run benchmarks without a stronger 
case for this causing a regression (given the Parquet testing), but other 
people should feel free to pick this up.


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-24 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
@cloud-fan, is there a Jenkins job to run it?


---




[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...

2018-04-20 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21118
  
Yeah, we should probably add a projection. It's probably only working 
because the InternalRows that are produced are all UnsafeRow.


---




[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...

2018-04-20 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21118
  
@jose-torres, @cloud-fan, can you take a look at this? It updates the v2 
API to use InternalRow by default.


---




[GitHub] spark pull request #21118: SPARK-23325: Use InternalRow when reading with Da...

2018-04-20 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/21118

SPARK-23325: Use InternalRow when reading with DataSourceV2.

## What changes were proposed in this pull request?

This updates the DataSourceV2 API to use InternalRow instead of Row for the 
default case with no scan mix-ins. Because the API is changing significantly in 
the same places, this also renames ReaderFactory back to ReadTask.

Support for readers that produce Row is added through 
SupportsDeprecatedScanRow, which matches the previous API. Readers that used 
Row now implement this class and should be migrated to InternalRow.

Readers that previously implemented SupportsScanUnsafeRow have been 
migrated to use no SupportsScan mix-ins and produce InternalRow.

## How was this patch tested?

This uses existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark 
SPARK-23325-datasource-v2-internal-row

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21118.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21118


commit eddd049ed78970dda0796a37462e52f53bfeacc4
Author: Ryan Blue <blue@...>
Date:   2018-04-20T20:15:58Z

SPARK-23325: Use InternalRow when reading with DataSourceV2.

This updates the DataSourceV2 API to use InternalRow instead of Row for
the default case with no scan mix-ins. Because the API is changing
significantly in the same places, this also renames ReaderFactory back
to ReadTask.

Support for readers that produce Row is added through
SupportsDeprecatedScanRow, which matches the previous API. Readers that
used Row now implement this class and should be migrated to InternalRow.

Readers that previously implemented SupportsScanUnsafeRow have been
migrated to use no SupportsScan mix-ins and produce InternalRow.




---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-20 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
Retest this please.


---




[GitHub] spark issue #21111: [SPARK-23877][SQL][followup] use PhysicalOperation to si...

2018-04-20 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21111
  
Thanks for doing this as a follow-up. I had one minor comment, but 
otherwise I'm +1. I see what you mean about using `PhysicalOperation` now. It 
is slightly cleaner and I guess `PhysicalOperation` is the right way to 
accumulate the filters and projection from a sub-tree.


---




[GitHub] spark pull request #21111: [SPARK-23877][SQL][followup] use PhysicalOperatio...

2018-04-20 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21111#discussion_r183101013
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -114,11 +119,8 @@ case class OptimizeMetadataOnlyQuery(catalog: 
SessionCatalog) extends Rule[Logic
 relation match {
   case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, 
isStreaming) =>
 val partAttrs = 
getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-val partitionData = fsRelation.location.listFiles(relFilters, 
Nil)
-// partition data may be a stream, which can cause 
serialization to hit stack level too
-// deep exceptions because it is a recursive structure in 
memory. converting to array
-// avoids the problem.
--- End diff --

Yes, that does fix it, but in a non-obvious way. What isn't clear is 
what guarantees that the rows used to construct the `LocalRelation` will never 
need to be serialized. Would it be reasonable for a future commit to remove the 
`@transient` modifier and reintroduce the problem?

I would rather this return the data in a non-recursive structure, but it's 
a minor point.


---




[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...

2018-04-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20988#discussion_r182858087
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: 
SessionCatalog) extends Rule[Logic
 
   /**
* A pattern that finds the partitioned table relation node inside the 
given plan, and returns a
-   * pair of the partition attributes and the table relation node.
+   * pair of the partition attributes, partition filters, and the table 
relation node.
*
* It keeps traversing down the given plan tree if there is a 
[[Project]] or [[Filter]] with
* deterministic expressions, and returns result after reaching the 
partitioned table relation
* node.
*/
-  object PartitionedRelation {
-
-def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = 
plan match {
-  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
-if fsRelation.partitionSchema.nonEmpty =>
-val partAttrs = 
getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-Some((AttributeSet(partAttrs), l))
-
-  case relation: HiveTableRelation if 
relation.tableMeta.partitionColumnNames.nonEmpty =>
-val partAttrs = 
getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
-Some((AttributeSet(partAttrs), relation))
-
-  case p @ Project(projectList, child) if 
projectList.forall(_.deterministic) =>
-unapply(child).flatMap { case (partAttrs, relation) =>
-  if (p.references.subsetOf(partAttrs)) Some((p.outputSet, 
relation)) else None
-}
+  object PartitionedRelation extends PredicateHelper {
+
+def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], 
LogicalPlan)] = {
+  plan match {
+case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
+  if fsRelation.partitionSchema.nonEmpty =>
+  val partAttrs = 
getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
+  Some((AttributeSet(partAttrs), Nil, l))
+
+case relation: HiveTableRelation if 
relation.tableMeta.partitionColumnNames.nonEmpty =>
+  val partAttrs = 
getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+  Some((AttributeSet(partAttrs), Nil, relation))
+
+case p @ Project(projectList, child) if 
projectList.forall(_.deterministic) =>
+  unapply(child).flatMap { case (partAttrs, filters, relation) =>
+if (p.references.subsetOf(partAttrs)) Some((p.outputSet, 
filters, relation)) else None
+  }
 
-  case f @ Filter(condition, child) if condition.deterministic =>
-unapply(child).flatMap { case (partAttrs, relation) =>
-  if (f.references.subsetOf(partAttrs)) Some((partAttrs, 
relation)) else None
-}
+case f @ Filter(condition, child) if condition.deterministic =>
+  unapply(child).flatMap { case (partAttrs, filters, relation) =>
+if (f.references.subsetOf(partAttrs)) {
+  Some((partAttrs, splitConjunctivePredicates(condition) ++ 
filters, relation))
--- End diff --

Good catch. I've added a test case and updated the `PartitionedRelation` 
code to keep track of both the original partition attributes (which the filter 
needs) and the top-most node's output that is used by the rule.

As for using `PhysicalOperation` instead of `PartitionedRelation`, I don't see 
a compelling reason for such an invasive change. This currently adds a couple 
of results to `unapply` and keeps mostly the same logic. `PhysicalOperation` 
would lose the check that the references are a subset of the partition 
attributes, and it would be a much larger change. If you think this should be 
refactored, let's talk about that separately to understand the motivation for the change.


---




[GitHub] spark pull request #21029: [SPARK-23952] remove type parameter in DataReader...

2018-04-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r182596471
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -146,7 +146,7 @@ private[kafka010] class KafkaMicroBatchReader(
   new KafkaMicroBatchDataReaderFactory(
 range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, 
reuseKafkaConsumer)
 }
-factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
+factories.map(_.asInstanceOf[DataReaderFactory]).asJava
--- End diff --

Why is this cast necessary?


---




[GitHub] spark pull request #21029: [SPARK-23952] remove type parameter in DataReader...

2018-04-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r182596392
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
 ---
@@ -17,52 +17,85 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
 import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, 
TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, 
RowEncoder}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.sources.v2.DataFormat
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.types.StructType
 
-class DataSourceRDDPartition[T : ClassTag](val index: Int, val 
readerFactory: DataReaderFactory[T])
+class DataSourceRDDPartition(val index: Int, val factory: 
DataReaderFactory)
   extends Partition with Serializable
 
-class DataSourceRDD[T: ClassTag](
+class DataSourceRDD(
 sc: SparkContext,
-@transient private val readerFactories: Seq[DataReaderFactory[T]])
-  extends RDD[T](sc, Nil) {
+@transient private val readerFactories: Seq[DataReaderFactory],
+schema: StructType)
+  extends RDD[InternalRow](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
 readerFactories.zipWithIndex.map {
   case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
 }.toArray
   }
 
-  override def compute(split: Partition, context: TaskContext): 
Iterator[T] = {
-val reader = 
split.asInstanceOf[DataSourceRDDPartition[T]].readerFactory.createDataReader()
-context.addTaskCompletionListener(_ => reader.close())
-val iter = new Iterator[T] {
-  private[this] var valuePrepared = false
-
-  override def hasNext: Boolean = {
-if (!valuePrepared) {
-  valuePrepared = reader.next()
-}
-valuePrepared
-  }
-
-  override def next(): T = {
-if (!hasNext) {
-  throw new java.util.NoSuchElementException("End of stream")
-}
-valuePrepared = false
-reader.get()
-  }
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+val factory = split.asInstanceOf[DataSourceRDDPartition].factory
+val iter: DataReaderIterator[UnsafeRow] = factory.dataFormat() match {
+  case DataFormat.ROW =>
+val reader = new RowToUnsafeDataReader(
+  factory.createRowDataReader(), 
RowEncoder.apply(schema).resolveAndBind())
+new DataReaderIterator(reader)
+
+  case DataFormat.UNSAFE_ROW =>
+new DataReaderIterator(factory.createUnsafeRowDataReader())
+
+  case DataFormat.COLUMNAR_BATCH =>
+new DataReaderIterator(factory.createColumnarBatchDataReader())
+  // TODO: remove this type erase hack.
+  .asInstanceOf[DataReaderIterator[UnsafeRow]]
--- End diff --

Isn't this change intended to avoid these casts?


---




[GitHub] spark pull request #21029: [SPARK-23952] remove type parameter in DataReader...

2018-04-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r182596274
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
 ---
@@ -17,12 +17,12 @@
 
 package org.apache.spark.sql.sources.v2.reader.streaming;
 
+import java.util.Optional;
--- End diff --

Nit: this is a cosmetic change that should be reverted before committing.


---




[GitHub] spark pull request #21029: [SPARK-23952] remove type parameter in DataReader...

2018-04-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r182596153
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java
 ---
@@ -52,10 +56,46 @@
   }
 
   /**
-   * Returns a data reader to do the actual reading work.
+   * The output data format of this factory's data reader. Spark will 
invoke the corresponding
+   * create data reader method w.r.t. the return value of this method:
+   * 
+   *   {@link DataFormat#ROW}: {@link #createRowDataReader()}
+   *   {@link DataFormat#UNSAFE_ROW}: {@link 
#createUnsafeRowDataReader()}
+   *   @{@link DataFormat#COLUMNAR_BATCH}: {@link 
#createColumnarBatchDataReader()}
+   * 
+   */
+  DataFormat dataFormat();
--- End diff --

If the data format is determined when the factory is created, then I don't 
see why it is necessary to change the API. This just makes it more confusing.


---




[GitHub] spark issue #20988: [SPARK-23877][SQL]: Use filter predicates to prune parti...

2018-04-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20988
  
@cloud-fan, I've added the test. Thanks for letting me know about 
HiveCatalogMetrics; that's exactly what I needed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
I backported the Hadoop zstd codec to 2.7.3 without much trouble. But 
either way, I think that's a separate concern. This is about getting Parquet 
updated, not about worrying whether users can easily add compression 
implementations to their classpath.


---




[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...

2018-04-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20988#discussion_r182579387
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: 
SessionCatalog) extends Rule[Logic
 
   /**
* A pattern that finds the partitioned table relation node inside the 
given plan, and returns a
-   * pair of the partition attributes and the table relation node.
+   * pair of the partition attributes, partition filters, and the table 
relation node.
*
* It keeps traversing down the given plan tree if there is a 
[[Project]] or [[Filter]] with
* deterministic expressions, and returns result after reaching the 
partitioned table relation
* node.
*/
-  object PartitionedRelation {
-
-def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = 
plan match {
-  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
-if fsRelation.partitionSchema.nonEmpty =>
-val partAttrs = 
getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-Some((AttributeSet(partAttrs), l))
-
-  case relation: HiveTableRelation if 
relation.tableMeta.partitionColumnNames.nonEmpty =>
-val partAttrs = 
getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
-Some((AttributeSet(partAttrs), relation))
-
-  case p @ Project(projectList, child) if 
projectList.forall(_.deterministic) =>
-unapply(child).flatMap { case (partAttrs, relation) =>
-  if (p.references.subsetOf(partAttrs)) Some((p.outputSet, 
relation)) else None
-}
+  object PartitionedRelation extends PredicateHelper {
+
+def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], 
LogicalPlan)] = {
+  plan match {
+case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
+  if fsRelation.partitionSchema.nonEmpty =>
+  val partAttrs = 
getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
+  Some((AttributeSet(partAttrs), Nil, l))
+
+case relation: HiveTableRelation if 
relation.tableMeta.partitionColumnNames.nonEmpty =>
+  val partAttrs = 
getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+  Some((AttributeSet(partAttrs), Nil, relation))
+
+case p @ Project(projectList, child) if 
projectList.forall(_.deterministic) =>
+  unapply(child).flatMap { case (partAttrs, filters, relation) =>
+if (p.references.subsetOf(partAttrs)) Some((p.outputSet, 
filters, relation)) else None
--- End diff --

@cloud-fan, that is basically how this works already. Each matched node 
calls `unapply(child)` to get the result from the child node, then it adds the 
current node's conditions to that result. Using `unapply` instead of 
`getPartitionedRelation` makes this work in the matching rule:

```scala
  case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, 
filters, relation)) =>
```


---




[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
@scottcarey, Parquet will use the compressors if they are available. You 
can add them from an external Jar and it will work. LZ4 should also work out of 
the box because it is included in Hadoop 2.7.

I agree that it would be nice if Parquet didn't rely on Hadoop for 
compression libraries, but that's how it is at the moment. Feel free to fix it.


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r182563063
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +59,159 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
-c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
+  }
+
+  private byte getByte() {
+try {
+  return (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
   }
 
   @Override
   public final void readBytes(int total, WritableColumnVector c, int 
rowId) {
-for (int i = 0; i < total; i++) {
-  // Bytes are stored as a 4-byte little endian int. Just read the 
first byte.
-  // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
-  c.putByte(rowId + i, Platform.getByte(buffer, offset));
-  offset += 4;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
+  c.putByte(rowId + i, buffer.get());
+  // skip the next 3 bytes
+  buffer.position(buffer.position() + 3);
 }
   }
 
   @Override
   public final boolean readBoolean() {
-byte b = Platform.getByte(buffer, offset);
-boolean v = (b & (1 << bitOffset)) != 0;
+// TODO: vectorize decoding and keep boolean[] instead of currentByte
+if (bitOffset == 0) {
+  currentByte = getByte();
+}
+
+boolean v = (currentByte & (1 << bitOffset)) != 0;
 bitOffset += 1;
 if (bitOffset == 8) {
   bitOffset = 0;
-  offset++;
 }
 return v;
   }
 
   @Override
   public final

[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181899509
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

Updated.


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181883674
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

Yeah, exactly. We can detect that the buffer is backed by an array and use 
the other call. If we think this is worth it as a short-term fix, I'll update 
this PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21071: [SPARK-21962][CORE] Distributed Tracing in Spark

2018-04-16 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21071
  
Some metrics to convince ourselves that using the null scope has no 
performance impact would be great.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181864492
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

The reason why I moved the loop out was that I didn't think that using the 
byte[] API would actually be better. Parquet doesn't guarantee that these byte 
buffers are on the heap and backed by byte arrays, so we would need to copy the 
bytes out of the buffer into an array only to copy them again in the column 
vector call. Two copies (and possibly allocation) seemed worse than moving the 
loop.

We could catch the case where the buffers are on-heap and make the bulk 
call. The drawback is that this will be temporary and will be removed when 
ColumnVector supports ByteBuffer. And, it only works/matters when Parquet uses 
on-heap buffers and Spark uses off-heap buffers. Is that worth the change to 
this PR? I can take a shot at it if you think it is. I'd rather update 
ColumnVector and then follow up though.
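
    To make the trade-off concrete, here is a minimal sketch (written in Scala, with a hypothetical `FloatSink` standing in for `WritableColumnVector`) of catching the heap-backed case and falling back to element-wise reads otherwise:

    ```scala
    import java.nio.{ByteBuffer, ByteOrder}

    // Hypothetical sink standing in for WritableColumnVector: it has a bulk
    // byte[]-based call and an element-wise call.
    trait FloatSink {
      def putFloats(rowId: Int, count: Int, src: Array[Byte], srcOffset: Int): Unit
      def putFloat(rowId: Int, value: Float): Unit
    }

    object PlainFloatReader {
      def readFloats(buffer: ByteBuffer, sink: FloatSink, rowId: Int, total: Int): Unit = {
        val buf = buffer.order(ByteOrder.LITTLE_ENDIAN)
        if (buf.hasArray) {
          // On-heap buffer: hand the backing array to the bulk call, no extra copy.
          sink.putFloats(rowId, total, buf.array(), buf.arrayOffset() + buf.position())
        } else {
          // Off-heap buffer: copying into a temporary array just to use the bulk
          // call would mean two copies, so read element by element instead.
          var i = 0
          while (i < total) {
            sink.putFloat(rowId + i, buf.getFloat())
            i += 1
          }
        }
      }
    }
    ```

    This mirrors the shape of the updated readFloats/readDoubles shown above; the element-wise branch is the price of not copying off-heap buffers twice.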


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21071: [SPARK-21962][CORE] Distributed Tracing in Spark

2018-04-16 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21071
  
@devaraj-kavali, do you have any measurements to quantify how this impacts 
overall performance? We would want to know before releasing this for use 
because using HTrace means having it on all the time to be able to analyze 
slow-downs. Like @steveloughran suggests, it should also be optional (unless 
there is no measurable cost to having it on all the time).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...

2018-04-16 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21060
  
I agree with what @srowen said:


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181826671
  
--- Diff: pom.xml ---
@@ -129,7 +129,7 @@
 
 <hive.version.short>1.2.1</hive.version.short>
 <derby.version>10.12.1.1</derby.version>
-<parquet.version>1.8.2</parquet.version>
+<parquet.version>1.10.0</parquet.version>
--- End diff --

I excluded the commons-pool dependency from parquet-hadoop to avoid this. I 
also tested the latest Parquet release with commons-pool 1.5.4 and everything 
passes. I don't think it actually requires 1.6.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.

2018-04-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181535144
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

It looks that way, but it actually replaces a similar loop: 
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java#L283-L291

The main problem is that ByteBuffer isn't supported in the column vectors. 
That seems beyond the scope of this PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21070: SPARK-23972: Update Parquet to 1.10.0.

2018-04-13 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21070
  
Upstream benchmarks for buffer management changes are here: 
https://github.com/apache/parquet-mr/pull/390#issuecomment-338505426

That doesn't show the GC benefit for smaller buffer allocations because of 
the heap size. It is just to show that the changes do no harm.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.

2018-04-13 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/21070

SPARK-23972: Update Parquet to 1.10.0.

## What changes were proposed in this pull request?

This updates Parquet to 1.10.0 and updates the vectorized path for buffer 
management changes. Parquet 1.10.0 uses ByteBufferInputStream instead of byte 
arrays in encoders. This allows Parquet to break allocations into smaller 
chunks that are better for garbage collection.

## How was this patch tested?

Existing Parquet tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark 
SPARK-23972-update-parquet-to-1.10.0

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21070.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21070


commit 4df17a6e9726cb22e499d479a9ab48f5db18a538
Author: Ryan Blue <blue@...>
Date:   2017-12-01T01:25:53Z

SPARK-23972: Update Parquet to 1.10.0.

This updates the vectorized path for changes in Parquet 1.10.0, which
uses ByteBufferInputStream instead of byte arrays in encoders. This
allows Parquet to break allocations into smaller chunks that are better
for garbage collection.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...

2018-04-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20933#discussion_r181509305
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -368,8 +368,7 @@ case class FileSourceScanExec(
 val bucketed =
   selectedPartitions.flatMap { p =>
 p.files.map { f =>
-  val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
--- End diff --

Yeah, I think the commit itself would be a self-contained reorganization. The 
motivation is to refactor for this PR, which is okay.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20988: [SPARK-23877][SQL]: Use filter predicates to prune parti...

2018-04-13 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20988
  
@cloud-fan or @gatorsmile, could you review this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...

2018-04-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20933#discussion_r179811255
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -368,8 +368,7 @@ case class FileSourceScanExec(
 val bucketed =
   selectedPartitions.flatMap { p =>
 p.files.map { f =>
-  val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
--- End diff --

Better organization to support other changes like this one is the reason.

@jose-torres was right to point out that these changes are self-contained 
enough to go in a separate PR and @gengliangwang and I both agreed. Why make 
this commit larger than necessary?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...

2018-04-05 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/20988

[SPARK-23877][SQL]: Use filter predicates to prune partitions in 
metadata-only queries

## What changes were proposed in this pull request?

This updates the OptimizeMetadataOnlyQuery rule to use filter expressions 
when listing partitions, if there are filter nodes in the logical plan. This 
avoids listing all partitions for large tables on the driver.

This also fixes a minor bug where the partitions returned from fsRelation 
cannot be serialized without hitting a stack level too deep error. This is 
caused by serializing a stream to executors, where the stream is a recursive 
structure. If the stream is too long, the serialization stack reaches the 
maximum level of depth. The fix is to create a LocalRelation using an Array 
instead of the incoming Seq.
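
    For illustration, a minimal Scala sketch of the failure mode (hypothetical demo code, not the Spark internals): Java serialization of a long `Stream` recurses through its (head, rest) cells, while the same data materialized as an `Array` serializes with a shallow stack.

    ```scala
    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    object StreamSerializationDemo {
      // Serialize an object with plain Java serialization and return the byte count.
      private def serializedSize(obj: AnyRef): Int = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(obj)
        out.close()
        bytes.size()
      }

      def main(args: Array[String]): Unit = {
        val partitions = Stream.range(0, 1000000).map(i => s"part=$i")

        // Serializing the Stream directly walks one level of the recursive
        // (head, rest) structure per element and can overflow the stack:
        // serializedSize(partitions)

        // Materializing to an Array first keeps the serialization stack flat.
        println(serializedSize(partitions.toArray))
      }
    }
    ```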

## How was this patch tested?

Existing tests for metadata-only queries.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark 
SPARK-23877-metadata-only-push-filters

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20988.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20988


commit 552efaee05b64f9ed4d5496b3b1d11b57b985f85
Author: Ryan Blue <blue@...>
Date:   2018-03-14T21:50:11Z

Support filter conditions in metadata-only queries.

commit 2345896288828aefe14ebcb370d374b348400cf4
Author: Ryan Blue <blue@...>
Date:   2018-03-14T22:43:56Z

Ensure partition data is an Array.

The LocalRelation created for partition data for metadata-only queries
may be a stream produced by listing directories. If the stream is large,
serializing the LocalRelation to executors results in a stack overflow
because the stream is a recursive structure of (head, rest-of-stream).




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...

2018-04-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20933#discussion_r179521697
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1185,6 +1185,13 @@ object SQLConf {
   .stringConf
   .createWithDefault("")
 
+  val DISABLED_V2_DATA_SOURCE_READERS = 
buildConf("spark.sql.disabledV2DataSourceReaders")
--- End diff --

Why is this necessary?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...

2018-04-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20933#discussion_r179521502
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ---
@@ -213,6 +215,26 @@ case class DataSourceAnalysis(conf: SQLConf) extends 
Rule[LogicalPlan] with Cast
   }
 }
 
+/**
+ * Replaces [[OrcDataSourceV2]] with [[DataSource]] if parent node is 
[[InsertIntoTable]].
+ * This is because [[OrcDataSourceV2]] doesn't support writing data yet.
--- End diff --

I agree with @jose-torres. If there is a general problem when writes aren't 
supported, then shouldn't this be a generic rule that provides a good error 
message?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...

2018-04-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20933#discussion_r179521100
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartitionUtil.scala
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{FileNotFoundException, IOException}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.TaskContext
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.InputFileBlockHolder
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.NextIterator
+
+object FilePartitionUtil extends Logging {
+
+  def getFilePartitions(
+  sparkSession: SparkSession,
+  partitionedFiles: Seq[PartitionedFile],
+  maxSplitBytes: Long): Seq[FilePartition] = {
+val partitions = new ArrayBuffer[FilePartition]
+val currentFiles = new ArrayBuffer[PartitionedFile]
+var currentSize = 0L
+
+/** Close the current partition and move to the next. */
+def closePartition(): Unit = {
+  if (currentFiles.nonEmpty) {
+val newPartition =
+  FilePartition(
+partitions.size,
+currentFiles.toArray.toSeq) // Copy to a new Array.
+partitions += newPartition
+  }
+  currentFiles.clear()
+  currentSize = 0
+}
+
+val openCostInBytes = 
sparkSession.sessionState.conf.filesOpenCostInBytes
+// Assign files to partitions using "Next Fit Decreasing"
+partitionedFiles.foreach { file =>
+  if (currentSize + file.length > maxSplitBytes) {
+closePartition()
+  }
+  // Add the given file to the current partition.
+  currentSize += file.length + openCostInBytes
+  currentFiles += file
+}
+closePartition()
+partitions
+  }
+
+  def compute(
--- End diff --

Why is this named "compute" and not "open" or something more specific?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...

2018-04-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20933#discussion_r179520643
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.orc
+
+import java.net.URI
+import java.util.Locale
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.{OrcConf, OrcFile}
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, JoinedRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources._
+import 
org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, 
OrcDeserializer, OrcFilters, OrcUtils}
+import 
org.apache.spark.sql.execution.datasources.v2.ColumnarBatchFileSourceReader
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+class OrcDataSourceV2 extends DataSourceV2 with ReadSupport with 
ReadSupportWithSchema {
+  override def createReader(options: DataSourceOptions): DataSourceReader 
= {
+new OrcDataSourceReader(options, None)
+  }
+
+  override def createReader(schema: StructType, options: 
DataSourceOptions): DataSourceReader = {
+new OrcDataSourceReader(options, Some(schema))
+  }
+}
+
+case class OrcDataSourceReader(options: DataSourceOptions, 
userSpecifiedSchema: Option[StructType])
+  extends ColumnarBatchFileSourceReader
+  with SupportsPushDownCatalystFilters {
+
+  override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
+OrcUtils.readSchema(sparkSession, files)
+  }
+
+  private var pushedFiltersArray: Array[Expression] = Array.empty
+
+  override def readFunction: PartitionedFile => Iterator[InternalRow] = {
--- End diff --

I think it is a bad idea to continue using `PartitionedFile => 
Iterator[InternalRow]` in v2.

I understand not wanting to change much about how this works, just to get 
the code behind the v2 API. But this pattern is broken and causes resource 
problems that the v2 API nudges implementers to fix.

What resource problems? This doesn't implement close properly, forcing 
close to happen at task end by calling functions registered when files are 
opened. We've gone back through and replaced the iterators with closeable 
versions so that we release resources more quickly because the callback-based 
close does not scale.

I would like to see this problem fixed instead of copying it into v2.
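
    A minimal sketch of the alternative being described (hypothetical names, not Spark's actual reader classes): an iterator that owns the resource it reads from and closes it as soon as the rows are exhausted, instead of registering a task-completion callback.

    ```scala
    import java.io.Closeable

    // Sketch: couples a row iterator with the resource that produced it and
    // releases that resource eagerly when iteration finishes.
    class CloseableRowIterator[T](rows: Iterator[T], resource: Closeable)
      extends Iterator[T] with Closeable {

      private var closed = false

      override def hasNext: Boolean = {
        if (closed) {
          false
        } else if (rows.hasNext) {
          true
        } else {
          // Reached the end: release the file handle now, not at task end.
          close()
          false
        }
      }

      override def next(): T = rows.next()

      override def close(): Unit = {
        if (!closed) {
          closed = true
          resource.close()
        }
      }
    }
    ```

    A consumer that stops early (for example because of a limit) can call close() directly, so the file handle is not held until the end of the task.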


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...

2018-04-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20933#discussion_r179516352
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileReaderFactory.scala
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
FilePartitionUtil, PartitionedFile}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+
+case class FileReaderFactory[T](
--- End diff --

Why is this class public? Isn't this internal to HadoopFsRelation's v2 
implementation?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...

2018-04-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20933#discussion_r179514000
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -368,8 +368,7 @@ case class FileSourceScanExec(
 val bucketed =
   selectedPartitions.flatMap { p =>
 p.files.map { f =>
-  val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
--- End diff --

Why does this only make sense for this PR? It looks like this is a 
reasonable refactor that could be stand-alone.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...

2018-04-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20933#discussion_r179513582
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -187,6 +189,14 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
 "read files of Hive data source directly.")
 }
 
+// SPARK-23817 Since datasource V2 didn't support reading multiple 
files yet,
+// ORC V2 is only used when loading single file path.
+val allPaths = CaseInsensitiveMap(extraOptions.toMap).get("path") ++ 
paths
+val orcV2 = OrcDataSourceV2.satisfy(sparkSession, source, 
allPaths.toSeq)
+if (orcV2.isDefined) {
+  option("path", allPaths.head)
+  source = orcV2.get
+}
--- End diff --

How does v2 not support reading multiple files?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20397: [SPARK-23219][SQL]Rename ReadTask to DataReaderFactory i...

2018-03-22 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20397
  
@cloud-fan, can we revert this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...

2018-03-14 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20579
  
I agree, we should probably add a check for storing a DataFrame with no 
columns for now. This is normally caught by the pre-insert rules, but since the 
table is getting "created" in this case there is nothing to check. 

In the future, I think that the create and insert should be logically 
separate so that the create will fail and complain that you can't create a 
table without at least one column. I think it will be cleaner to separate 
concerns like that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20752: [SPARK-23559][SS] Create StreamingDataWriterFactory for ...

2018-03-06 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20752
  
> Right now, we're still at the point where we aren't quite sure what a 
streaming API needs to look like. We're starting from basically ground zero; 
the V1 streaming API just throws a DataFrame at the sink and tells it to catch. 
So we need to iterate towards something that works at all before a meaningful 
design discussion is possible.

Thanks for the context. This aligns with the impression I've gotten and it 
makes sense. My push for separation between the batch and streaming sides comes 
from wanting to keep that evolution from making too many changes to the batch 
side that's better understood. I also think that streaming is different enough 
that we might be heading in the wrong direction by trying to combine the 
interfaces too early on.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20752: [SPARK-23559][SS] Create StreamingDataWriterFactory for ...

2018-03-06 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20752
  
Thanks for the clear summary, @cloud-fan. I think we want to make it easy 
to support batch, and then easy to reuse those internals to support streaming 
by adding new mix-in interfaces. Streaming is more complicated for 
implementers, and I'd like to help people conceptually ramp up instead of 
requiring a lot of understanding to get the simple cases working.

I think we may also want to put a design for the streaming side on the dev 
list. If the batch side warranted a design discussion, then I think the 
streaming side does as well. Changing the batch side for streaming changes as 
they become necessary doesn't seem like a good way to arrive at a solid design 
to me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20752: [SPARK-23559][SS] Create StreamingDataWriterFacto...

2018-03-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20752#discussion_r172621472
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+
+@InterfaceStability.Evolving
+public interface StreamingDataWriterFactory extends 
DataWriterFactory {
+  /**
+   * Returns a data writer to do the actual writing work.
+   *
+   * If this method fails (by throwing an exception), the action would 
fail and no Spark job was
+   * submitted.
+   *
+   * @param partitionId A unique id of the RDD partition that the returned 
writer will process.
+   *Usually Spark processes many RDD partitions at the 
same time,
+   *implementations should use the partition id to 
distinguish writers for
+   *different partitions.
+   * @param attemptNumber Spark may launch multiple tasks with the same 
task id. For example, a task
+   *  failed, Spark launches a new task with the same 
task id but different
+   *  attempt number. Or a task is too slow, Spark 
launches new tasks with the
+   *  same task id but different attempt number, which 
means there are multiple
+   *  tasks with the same task id running at the same 
time. Implementations can
+   *  use this attempt number to distinguish writers 
of different task attempts.
+   * @param epochId A monotonically increasing id for streaming queries 
that are split into
+   *discrete periods of execution. For non-streaming 
queries,
+   *this ID will always be 0.
+   */
+  DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);
+
+  @Override default DataWriter<T> createDataWriter(int partitionId, int attemptNumber) {
+throw new IllegalStateException("Streaming data writer factory cannot 
create data writers without epoch.");
--- End diff --

Can you point me to the code where this would need to change? I don't see 
it here: 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20752: [SPARK-23559][SS] Create StreamingDataWriterFacto...

2018-03-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20752#discussion_r172620679
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
 ---
@@ -27,6 +28,9 @@
  *
  * Streaming queries are divided into intervals of data called epochs, 
with a monotonically
  * increasing numeric ID. This writer handles commits and aborts for each 
successive epoch.
+ *
+ * Note that StreamWriter implementations should provide instances of
+ * {@link StreamingDataWriterFactory}.
--- End diff --

What do you think about removing the `SupportsWriteInternalRow` and always 
using `InternalRow`? For the read side, I think using `Row` and `UnsafeRow` is 
a problem: https://issues.apache.org/jira/browse/SPARK-23325

I don't see the value of using `Row` instead of `InternalRow` for readers, 
so maybe we should just simplify on both the read and write paths.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20752: [SPARK-23559][SS] Create StreamingDataWriterFacto...

2018-03-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20752#discussion_r172616831
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -54,7 +55,14 @@ case class WriteToDataSourceV2Exec(writer: 
DataSourceWriter, query: SparkPlan) e
   override protected def doExecute(): RDD[InternalRow] = {
 val writeTask = writer match {
   case w: SupportsWriteInternalRow => 
w.createInternalRowWriterFactory()
-  case _ => new 
InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
+  case w: MicroBatchWriter =>
+new StreamingInternalRowDataWriterFactory(w.createWriterFactory(), 
query.schema)
+  case w: StreamWriter =>
+new StreamingInternalRowDataWriterFactory(
+  
w.createWriterFactory().asInstanceOf[StreamingDataWriterFactory[Row]],
--- End diff --

This will cause a cast exception, right? It think it is better to use a 
separate create method.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20752: [SPARK-23559][SS] Create StreamingDataWriterFacto...

2018-03-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20752#discussion_r172616601
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
 ---
@@ -27,6 +28,9 @@
  *
  * Streaming queries are divided into intervals of data called epochs, 
with a monotonically
  * increasing numeric ID. This writer handles commits and aborts for each 
successive epoch.
+ *
+ * Note that StreamWriter implementations should provide instances of
+ * {@link StreamingDataWriterFactory}.
--- End diff --

What about adding `createStreamWriterFactory` that returns the streaming 
interface? That would make it easier for implementations and prevent throwing 
cast exceptions because a `StreamingDataWriterFactory` is expected.
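
    Sketched as Scala traits (hypothetical shapes, not the actual Java interfaces in the v2 API), the suggestion would look roughly like this:

    ```scala
    // Placeholder for the real row writer type.
    trait DataWriter[T]

    // Batch factory: no epoch.
    trait DataWriterFactory[T] {
      def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[T]
    }

    // Streaming factory: epoch-aware, and not forced to inherit a method it can only throw from.
    trait StreamingDataWriterFactory[T] {
      def createDataWriter(partitionId: Int, attemptNumber: Int, epochId: Long): DataWriter[T]
    }

    // The stream writer advertises the streaming factory in its return type, so callers
    // get the epoch-aware createDataWriter without an asInstanceOf.
    trait StreamWriter[T] {
      def createStreamWriterFactory(): StreamingDataWriterFactory[T]
    }
    ```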


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20752: [SPARK-23559][SS] Create StreamingDataWriterFacto...

2018-03-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20752#discussion_r172613993
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+
+@InterfaceStability.Evolving
+public interface StreamingDataWriterFactory<T> extends DataWriterFactory<T> {
+  /**
+   * Returns a data writer to do the actual writing work.
+   *
+   * If this method fails (by throwing an exception), the action would 
fail and no Spark job was
+   * submitted.
+   *
+   * @param partitionId A unique id of the RDD partition that the returned 
writer will process.
+   *Usually Spark processes many RDD partitions at the 
same time,
+   *implementations should use the partition id to 
distinguish writers for
+   *different partitions.
+   * @param attemptNumber Spark may launch multiple tasks with the same 
task id. For example, a task
+   *  failed, Spark launches a new task with the same 
task id but different
+   *  attempt number. Or a task is too slow, Spark 
launches new tasks with the
+   *  same task id but different attempt number, which 
means there are multiple
+   *  tasks with the same task id running at the same 
time. Implementations can
+   *  use this attempt number to distinguish writers 
of different task attempts.
+   * @param epochId A monotonically increasing id for streaming queries 
that are split into
+   *discrete periods of execution. For non-streaming 
queries,
+   *this ID will always be 0.
+   */
+  DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);
+
+  @Override default DataWriter<T> createDataWriter(int partitionId, int attemptNumber) {
+throw new IllegalStateException("Streaming data writer factory cannot 
create data writers without epoch.");
--- End diff --

Why extend `DataWriterFactory` if this method is going to throw an 
exception? Why not make them independent interfaces?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.

2018-03-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20710
  
Epoch ID is not a valid part of the logical place in a query for batch. I 
think we should separate batch and streaming, as they are already coming from 
different interfaces. There's no need to pass useless information to a batch 
writer or committer.

Implementations can choose to use the same logic if they want, but we 
should keep the API focused on what is needed, to keep it reasonable for 
implementers.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.

2018-03-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20710
  
> Data source writers need to be able to reason about what progress they've 
made, which is impossible in the streaming case if each epoch is its own 
disconnected query.

I don't think the writers necessarily need to reason about progress. Are 
you saying that there are guarantees the writers need to make, like ordering 
how data appears?

I'm thinking of an implementation that creates a file for each task commit 
and the driver's commit operation makes those available. That doesn't require 
any progress tracking on tasks.

As far as a writer knowing that different epochs are part of the same 
query: why? Is there something the writer needs to do? If so, then I think that 
is more of an argument for a separate streaming interface, or else batch 
implementations that ignore the epoch might do the wrong thing.
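
    For concreteness, a rough sketch of the file-per-task-commit sink described above (hypothetical classes, local filesystem only, not tied to the v2 interfaces): each task stages one file and reports its path, and the driver's commit publishes the staged files.

    ```scala
    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Paths}

    // Message a task sends to the driver when it commits.
    case class TaskCommit(stagedPath: String)

    // Task side: buffer rows and write one staged file per task attempt.
    class FilePerTaskWriter(stagingDir: String, partitionId: Int, attemptNumber: Int) {
      private val path = Paths.get(stagingDir, s"part-$partitionId-$attemptNumber")
      private val rows = new StringBuilder

      def write(row: String): Unit = rows.append(row).append('\n')

      def commit(): TaskCommit = {
        Files.write(path, rows.toString.getBytes(StandardCharsets.UTF_8))
        TaskCommit(path.toString)
      }
    }

    // Driver side: publishing the staged files is the only global step; no per-epoch
    // progress tracking is needed on the tasks for this to be correct.
    class FilePerTaskCommitter(finalDir: String) {
      def commit(messages: Seq[TaskCommit]): Unit = {
        messages.foreach { m =>
          val staged = Paths.get(m.stagedPath)
          Files.move(staged, Paths.get(finalDir).resolve(staged.getFileName))
        }
      }
    }
    ```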


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.

2018-03-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20710
  
My question is: why can't we use a batch interface for batch and 
micro-batch (which behaves like batch) and add a separate streaming interface 
for continuous streaming? I see no reason to have epoch ID for batch, and it 
seems janky to add options that implementers should know to ignore.

> Spark may need to distinguish between different segments of the data when 
talking to the remote sink.

For which case, continuous or micro-batch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.

2018-03-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20710
  
Could the non-continuous streaming mode just use the batch interface, since 
each write is basically separate?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.

2018-03-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20710
  
@jose-torres, can you explain that more for me? Why would callers only use 
one interface but not the other? Wouldn't streaming use one and batch the 
other? Why would batch need to know about streaming and vice versa? The 
simplification is for implementers. It seems odd for implementations to deal 
with parameters that are for something else (e.g., don't worry about this for 
batch).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.

2018-03-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20710
  
@tdas, thanks for letting us know. I'm really wondering if we should be 
using the same interfaces between batch and streaming. The epoch id strikes me 
as strange for data sources that won't support streaming. What do you think?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20726#discussion_r172324207
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
   new DataSourcePartitioning(
 s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
+case _ if readerFactories.size == 1 => SinglePartition
+
 case _ => super.outputPartitioning
   }
 
-  private lazy val readerFactories: 
java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
-case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
+  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader 
match {
--- End diff --

I think it is better to have a lazy val throw an exception if it is called 
at the wrong time (if it is private) than to add the cast because the exception 
at least validates that assumptions are met and can throw a reasonable error 
message. The cast might hide the problem, particularly over time as this code 
evolves. It would be reasonable to add another path that returns 
`Seq[DataReaderFactory[_]]` because that's the method contract, even though 
there is an assumption in the callers about how it will behave. As much of the 
contract between methods as possible should be expressed in types.
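
    A small Scala sketch of the difference (hypothetical types, not the actual scan exec members): the first version carries the assumption in its type, while the second hides it behind a cast that only surfaces a problem at runtime.

    ```scala
    trait ReaderFactory[T]  // stands in for DataReaderFactory[T]

    class ScanWithTypedFactories(unsafeRowFactories: Seq[ReaderFactory[String]]) {
      // Contract in the type: callers that need row factories get exactly that,
      // and misuse is a compile-time error.
      lazy val rowFactories: Seq[ReaderFactory[String]] = unsafeRowFactories
    }

    class ScanWithCast(factories: Seq[ReaderFactory[_]]) {
      // Contract hidden behind a cast: this compiles for any element type and, due
      // to erasure, only fails later at the point of use, far from the real cause.
      def rowFactories: Seq[ReaderFactory[String]] =
        factories.asInstanceOf[Seq[ReaderFactory[String]]]
    }
    ```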


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20726#discussion_r172320487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
   new DataSourcePartitioning(
 s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
+case _ if readerFactories.size == 1 => SinglePartition
+
 case _ => super.outputPartitioning
   }
 
-  private lazy val readerFactories: 
java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
-case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
+  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader 
match {
--- End diff --

Why not separate the method into `batchReaderFactories` and 
`readerFactories`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...

2018-03-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20710#discussion_r172319605
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
 ---
@@ -48,6 +48,9 @@
*  same task id but different attempt number, which 
means there are multiple
*  tasks with the same task id running at the same 
time. Implementations can
*  use this attempt number to distinguish writers 
of different task attempts.
+   * @param epochId A monotonically increasing id for streaming queries 
that are split into
+   *discrete periods of execution. For non-streaming 
queries,
+   *this ID will always be 0.
*/
-  DataWriter<T> createDataWriter(int partitionId, int attemptNumber);
+  DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);
--- End diff --

Why are we using the same interface for streaming and batch here? Is there 
a compelling reason to do so instead of adding `StreamingWriterFactory`? Are 
the guarantees for an epoch identical to those of a single batch job?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20726: [SPARK-23574][CORE] Report SinglePartition in DataSource...

2018-03-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20726
  
Looks good to me other than a minor point on the private `readerFactories` 
val.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20726#discussion_r172317377
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
   new DataSourcePartitioning(
 s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
+case _ if readerFactories.size == 1 => SinglePartition
+
 case _ => super.outputPartitioning
   }
 
-  private lazy val readerFactories: 
java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
-case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
+  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader 
match {
--- End diff --

Why not separate the cases for columnar batch and unsafe rows? That would 
avoid needing to cast this later to `Seq[DataReaderFactory[UnsafeRow]]` and 
`Seq[DataReaderFactory[ColumnarBatch]]`, which isn't a very clean solution.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain

2018-03-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20692
  
Good point on nested types. I don't think heavy nesting is the usual case, 
but we can definitely improve the explain result in the long term by separating 
it out. Maybe just using a high-level type (e.g. struct<...>) for now would 
work?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain

2018-03-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20692
  
My intent is just to advocate for clear feedback on the content of PRs. 
Good to hear your confidence in @mgaido91, and if he wants to work on a better 
explain, that's great too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain

2018-03-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20692
  
> I think you can add the patch to your fork.

My primary concern isn't the feature, although I do think it is useful. My 
concern is how we work with contributors. My worry here is that the objections 
are not on the merits of this PR.

Telling a contributor to either take on something much larger or come back 
in a few months is discouraging and stifles contributions. This patch is here 
now, and the focus should be on either why the idea is a not good one (i.e., 
*why* types should not go in plans) or what should be improved to fix the 
implementation. That could also be to propose a better alternative, but without 
an alternate proposal, asking for something different without constructive 
criticism of the PR leaves me, at least, simply confused.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain

2018-03-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20692
  
> this is not urgent based on our release schedule.

Marco is contributing this right now. It is a bad idea to ask contributors 
to show up in 4 months, if we don't have a better option by then. And in my 
experience, it's really hard to show up right before a release and get anything 
in because the focus is on the release.

> I do not like adding the types into the plans that are already very 
complex.

I see this as a critical piece of information that is missing from Spark 
plans. I've had to resort to a debugger several times to see what Spark thinks 
types are, and I think it is worth the extra size.

I'm all for debating the value of adding these types, but we should focus 
on that question. Implementations can be fixed. And when have we ever avoided a 
contribution because it was too early?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain

2018-03-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20692
  
I don't think it is a good idea to block a fix like this on a redesign of 
explain plans. If that redesign was currently being worked on and this made it 
more difficult, it might be reasonable. But this can be a small interim fix and 
doesn't really affect larger plans.

If this patch needs to be fixed to be more clean, then let's get that 
feedback to Marco. He has been really responsive on this contribution.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain

2018-03-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20692
  
I don't see a compelling reason to block this work, which is well-defined 
and a reasonably small patch. What is here would help users with debugging.

@gatorsmile, If you have an alternative approach, could you write it up and 
propose it on the dev list?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain

2018-03-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20692
  
@mgaido91, I'm happy without another option. Let's add it in the future if 
we find that we need it, but not assume that we will. This change should give 
most of the information needed to debug types.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20692: [SPARK-23531][SQL] Show attribute type in explain

2018-03-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20692#discussion_r171626542
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -94,7 +94,7 @@ case class CreateHiveTableAsSelectCommand(
 Seq.empty[Row]
   }
 
-  override def argString: String = {
+  override def argString(isLeaf: Boolean): String = {
--- End diff --

What is the value of adding `isLeaf`? Don't leaf nodes already know they 
are? Why not just have leaf nodes call `outputStringWithTypes` or something and 
avoid all of the API changes?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20692: [SPARK-23531][SQL] Show attribute type in explain

2018-03-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20692#discussion_r171624644
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 ---
@@ -106,7 +106,7 @@ abstract class Attribute extends LeafExpression with 
NamedExpression with NullIn
 
   override def toAttribute: Attribute = this
   def newInstance(): Attribute
-
+  def stringWithType: String = s"$toString: ${dataType.simpleString}"
--- End diff --

Nit: the `:` isn't needed and will accumulate to quite a few extra 
characters in plans. SQL uses `(id bigint, data string)` without `:` so I think 
we should mirror that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain

2018-03-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20692
  
I agree with @cloud-fan that the type information doesn't need to be in 
every node of the plan, just in the scans and generators. We want enough 
information to tell what types are there, but we also want to keep the output 
as small as possible, so there is a balance to strike.
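
For example, here is roughly the shape I'd expect, with types rendered only 
at the scan (a made-up plan, purely for illustration, not actual output from 
this patch):

```
*(1) Project [id#0L, data#1]
+- *(1) Filter (isnotnull(id#0L) && (id#0L > 5))
   +- *(1) FileScan parquet [id#0L bigint, data#1 string] Batched: true, Format: Parquet, ...
```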

I'm in favor of an option for extended types if it isn't too much trouble, 
but I don't think it is a very important feature.


---




[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...

2018-03-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20647
  
Looks like there's an unnecessary redact call in there and I found a couple 
of nits, but I think this is ready to go.

+1


---




[GitHub] spark pull request #20647: [SPARK-23303][SQL] improve the explain result for...

2018-03-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20647#discussion_r171618852
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
 ---
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.util.Utils
+
+/**
+ * A trait that can be used by data source v2 related query plans(both logical and physical), to
+ * provide a string format of the data source information for explain.
+ */
+trait DataSourceV2StringFormat {
+
+  /**
+   * The instance of this data source implementation. Note that we only consider its class in
+   * equals/hashCode, not the instance itself.
+   */
+  def source: DataSourceV2
+
+  /**
+   * The output of the data source reader, w.r.t. column pruning.
+   */
+  def output: Seq[Attribute]
+
+  /**
+   * The options for this data source reader.
+   */
+  def options: Map[String, String]
+
+  /**
+   * The created data source reader. Here we use it to get the filters that has been pushed down
+   * so far, itself doesn't take part in the equals/hashCode.
+   */
+  def reader: DataSourceReader
+
+  private lazy val filters = reader match {
+    case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
+    case s: SupportsPushDownFilters => s.pushedFilters().toSet
+    case _ => Set.empty
+  }
+
+  private def sourceName: String = source match {
+    case registered: DataSourceRegister => registered.shortName()
+    case _ => source.getClass.getSimpleName.stripSuffix("$")
+  }
+
+  def metadataString: String = {
+    val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
+
+    if (filters.nonEmpty) {
+      entries += "Pushed Filters" -> filters.mkString("[", ", ", "]")
--- End diff --

Nit: Does this need "Pushed"?


---



