[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21122#discussion_r186119360

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ---
    @@ -1354,7 +1354,8 @@ class HiveDDLSuite
         val indexName = tabName + "_index"
         withTable(tabName) {
           // Spark SQL does not support creating index. Thus, we have to use Hive client.
    -      val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
    +      val client =
    +        spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
    --- End diff --

Why are we passing the client using the catalog in the first place? Is this for convenience, or is there a reason why we can't pass both separately? I don't think we should be doing this without a reason. I don't like needless casts.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

@maropu, I suspect that the problem is that comparison is different for strings: `"17297598712"` is less than `"5"` with string comparison.
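The ordering difference is easy to see in a few lines of Scala. This is a minimal sketch that assumes the column holds numbers stored as strings. `StringVsNumericOrder` and its methods are hypothetical names used for illustration. The code itself relies only on `String.compareTo` and `BigInt`.

```scala
object StringVsNumericOrder {
  // String ordering compares character by character, so '1' < '5' decides the result.
  def lessAsStrings(a: String, b: String): Boolean = a.compareTo(b) < 0

  // Numeric ordering compares the parsed values instead.
  def lessAsNumbers(a: String, b: String): Boolean = BigInt(a) < BigInt(b)

  // Min and max under string ordering, as a writer comparing strings would record them.
  def minMaxAsStrings(values: Seq[String]): (String, String) = (values.min, values.max)
}
```

Under string ordering, a row group holding `"5"` and `"17297598712"` records `"17297598712"` as its min and `"5"` as its max. A filter written with numeric expectations would therefore behave unexpectedly.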
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

@gatorsmile, are you happy committing this with the benchmark results?

@maropu, thanks for taking the time to add these benchmarks. It is really great to have them so we can monitor performance over time!
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21118

@cloud-fan, actually I tried a lot of different queries yesterday, including joins and aggregations. The only thing that didn't work was `collect` for a `select * from t` because `SparkPlan` assumes that the [rows will be unsafe](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L254). I'm planning to do more testing, but I don't see anything that requires `UnsafeRow` in generated code.

@marmbrus, what was the original intent in codegen? Should codegen use `InternalRow` or `UnsafeRow`?
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

@maropu, looking at the pushdown benchmark, it looks like ORC and Parquet either both benefit or both do not benefit from pushdown. In some cases ORC is much faster, because ORC will skip reading pages, not just row groups. But when ORC benefits from pushdown, so does Parquet; see, for example, the `Select 1 int row (value = 7864320)` case.

I think that you were expecting a string comparison case to have a significant benefit over non-pushdown. But I would only expect that if ORC had a similar benefit. That's because the benefit depends on how values are clustered in the file, which determines whether Parquet can eliminate row groups. If ORC didn't have a benefit, then I would expect that the data just isn't clustered in a way that helps.

I'm not sure how you're generating data, but I'd recommend adding a sorted column case with enough data to create multiple row groups (or stripes for ORC). That would write data so that some row groups can be skipped, and you should see a speedup.

Parquet also supports dictionary-based row group filtering. To test that, make sure you have a column that is entirely dictionary-encoded: pick a small set of values and randomly draw from that set. Then if you search for a value that isn't in that set, you should see a speedup. Also make sure that you have `parquet.filter.dictionary.enabled=true` set in the Hadoop configuration so that Parquet uses dictionary filtering.
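A rough Scala simulation can illustrate why both suggestions help. It assumes a simplified file model in which each row group keeps only min/max stats and, optionally, a dictionary. `PushdownBenchmarkSketch` is a hypothetical name. This is not how Spark's benchmark or Parquet's reader is implemented. It only shows two things. Sorted data lets min/max stats skip row groups, while interleaved data does not. A value that is absent from a small dictionary lets every row group be skipped.

```scala
object PushdownBenchmarkSketch {
  // Hypothetical row group: the values it holds plus the min/max stats a writer would record.
  final case class RowGroup(values: Vector[Int]) {
    val min: Int = values.min
    val max: Int = values.max
  }

  // Split values into row groups of `groupSize`, preserving write order.
  def write(values: Seq[Int], groupSize: Int): Vector[RowGroup] =
    values.grouped(groupSize).map(g => RowGroup(g.toVector)).toVector

  // Number of row groups whose min/max stats prove that `value` cannot be present.
  def skippedByStats(groups: Seq[RowGroup], value: Int): Int =
    groups.count(g => value < g.min || value > g.max)

  // Dictionary filtering: in a fully dictionary-encoded row group, a value that is
  // missing from the dictionary means the whole row group can be skipped.
  def skippedByDictionary(dictionaries: Seq[Set[String]], value: String): Int =
    dictionaries.count(dict => !dict.contains(value))
}
```

With 1,000 values in ten row groups, a point lookup skips nine groups when the column is sorted. It skips none when every group spans the full value range.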
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

@maropu, are you sure about the INT and FLOAT columns? I think you might have that assessment backwards. Here are the INT results from the PR gist:

```
SQL Single INT Column Scan:      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
SQL Parquet Vectorized                   149 / 162        105.5           9.5       1.0X
SQL Parquet MR                         1825 / 1836          8.6         116.1       0.1X
```

And here are the INT results from the master gist:

```
SQL Single INT Column Scan:      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
SQL Parquet Vectorized                   250 / 292         63.0          15.9       1.0X
SQL Parquet MR                         3175 / 3202          5.0         201.8       0.1X
```

I think that shows that the PR result was significantly faster, not slower. (The other INT test was about the same.) Here are the FLOAT results from the PR gist:

```
SQL Single FLOAT Column Scan:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
SQL Parquet Vectorized                   145 / 158        108.8           9.2       1.0X
SQL Parquet MR                         1840 / 1843          8.5         117.0       0.1X
```

And FLOAT from the master gist:

```
SQL Single FLOAT Column Scan:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
SQL Parquet Vectorized                   261 / 316         60.2          16.6       1.0X
SQL Parquet MR                         3267 / 3284          4.8         207.7       0.1X
```

Am I reading this incorrectly? I'm considering lower time values and higher rate values to be better.
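The reading above can be checked with a little arithmetic. This sketch uses a hypothetical `BenchmarkCompare` helper and plugs in the best times and rates from the quoted tables. `Per Row(ns)` is simply `1000 / Rate(M/s)`, and a speedup above 1.0 means the PR is faster.

```scala
object BenchmarkCompare {
  // One row of benchmark output: best time in ms and rate in millions of rows per second.
  final case class Result(bestMs: Double, rateMPerSec: Double) {
    // Per-row cost in ns: 1e9 ns per second / (rate * 1e6 rows per second) = 1000 / rate.
    def nsPerRow: Double = 1000.0 / rateMPerSec
  }

  // How many times faster `candidate` is than `baseline`; > 1.0 means faster.
  def speedup(baseline: Result, candidate: Result): Double = baseline.bestMs / candidate.bestMs

  // Vectorized-reader numbers copied from the INT and FLOAT tables quoted above.
  val masterIntVectorized   = Result(250, 63.0)
  val prIntVectorized       = Result(149, 105.5)
  val masterFloatVectorized = Result(261, 60.2)
  val prFloatVectorized     = Result(145, 108.8)
}
```

On best time, the vectorized INT and FLOAT scans come out roughly 1.7x and 1.8x faster on the PR than on master.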
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21118

@cloud-fan and @jose-torres: I looked at `explain codegen` for reading from a Parquet table (with vectorized reads disabled) and it doesn't look like there is a dependency on `UnsafeRow`:

```
explain codegen select * from test
```

```
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*FileScan parquet rblue.test[id#40L,data#41] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://bucket/warehouse/blue.db/test/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,data:string>

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 009 */   private scala.collection.Iterator scan_input;
/* 010 */
/* 011 */   public GeneratedIterator(Object[] references) {
/* 012 */     this.references = references;
/* 013 */   }
/* 014 */
/* 015 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 016 */     partitionIndex = index;
/* 017 */     this.inputs = inputs;
/* 018 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 019 */     scan_input = inputs[0];
/* 020 */
/* 021 */   }
/* 022 */
/* 023 */   protected void processNext() throws java.io.IOException {
/* 024 */     while (scan_input.hasNext()) {
/* 025 */       InternalRow scan_row = (InternalRow) scan_input.next();
/* 026 */       scan_numOutputRows.add(1);
/* 027 */       append(scan_row);
/* 028 */       if (shouldStop()) return;
/* 029 */     }
/* 030 */   }
/* 031 */ }
```

I've looked at a few simple queries with filters, projections, and aggregation, and it doesn't look like any of the generated code depends on `UnsafeRow`.
Can anyone confirm that it is not a requirement to pass `UnsafeRow` into generated code? If there is no requirement for the rows to be `UnsafeRow`, then is it necessary to add an `UnsafeProjection`, or would the copy to unsafe make execution slower?

If the rows passed from the data source are `UnsafeRow`, then `UnsafeProjection` detects it and copies the row buffer (see [examples](https://github.com/Netflix/iceberg/blob/parquet-value-readers/spark/src/test/java/com/netflix/iceberg/spark/data/CodegenExamples.java#L256-L262)). That's faster than copying individual values, but slower than just using the row as-is. Not adding a projection would make this case faster.

If the rows passed from the data source are `InternalRow` and not `UnsafeRow`, then we *could* copy them immediately, but it is very likely that the data is already going to be copied. A projection, for example, immediately copies all of the data out of the `UnsafeRow`, so an initial copy to unsafe is just extra work. Similarly, a filter is probably selective enough that it makes sense to wait until after the filter runs to copy the entire row of data to unsafe.

In all of the cases that I've looked at, a copy to unsafe would only slow down execution. Unsafe rows may have better cache locality, but the copy reads *all* of the data anyway.

If I'm right and we do not need to insert a copy to `UnsafeRow`, then we don't need the `SupportsScanUnsafeRow` trait. Data sources can still produce `UnsafeRow` and it would work without a problem. The only time we need to know that an `InternalRow` is actually an `UnsafeRow` is if we are adding a projection to unsafe, in which case we could avoid a copy of the row buffer.
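The filter argument can be illustrated with a toy cost model. This Scala sketch counts how many column values get copied when rows are converted eagerly right after the scan, compared with converting only the rows that survive a filter. `CopyCost`, its `Row` alias, and `Copier` are hypothetical stand-ins, not Spark's `InternalRow` or `UnsafeProjection`. Cache locality is not modelled.

```scala
object CopyCost {
  // Hypothetical stand-in for a generic row: an array of column values.
  type Row = Array[Any]

  // Models a copy into an unsafe format and counts every column value it copies.
  final class Copier {
    var valuesCopied: Long = 0L
    def copy(row: Row): Row = { valuesCopied += row.length; row.clone() }
  }

  // Eager: copy every row up front (like a projection right after the scan), then filter.
  def copyThenFilter(rows: Seq[Row], pred: Row => Boolean, c: Copier): Seq[Row] =
    rows.map(c.copy).filter(pred)

  // Deferred: filter the source rows first and copy only the survivors.
  def filterThenCopy(rows: Seq[Row], pred: Row => Boolean, c: Copier): Seq[Row] =
    rows.filter(pred).map(c.copy)
}
```

With 1,000 three-column rows and a filter that keeps 10% of them, the eager version copies 3,000 values and the deferred version copies 300. Both return the same 100 rows.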
[GitHub] spark issue #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21143

@cloud-fan, that's kind of the point I was trying to make. It is too difficult to do whole-stage codegen, but we could add a codegen filter before whole-stage codegen. Why make the implementation handle it using Spark internals when we could do it more cleanly in Spark?
[GitHub] spark pull request #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to R...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21145#discussion_r185310952

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -299,13 +299,13 @@ private[kafka010] class KafkaMicroBatchReader(
         }
       }
    -/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
    +/** A [[ReadTask]] for reading Kafka data in a micro-batch streaming query. */
     private[kafka010] case class KafkaMicroBatchDataReaderFactory(
    --- End diff --

Sure, good idea.
[GitHub] spark issue #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21143

@cloud-fan, union doesn't really help. I already have support for mixed formats working just fine. The format isn't the problem; filtering is (and there is a similar problem with projection). Parquet allows you to push down filters, while Avro doesn't. Right now, I'm running filters inside my data source to ensure that the result always matches the pushed filters, which is okay but doesn't use codegen.

Since we already need per-split filters for residuals, we could do something similar in Spark instead of in the data sources and allow each split to return a residual. Then Spark would add a codegen'ed filter before proceeding with the rest of the physical plan. You might think of it as a `ResidualFilter` node whose filter expression changes from split to split, instead of a separate physical plan.
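The per-split residual idea can be sketched in plain Scala. The sketch assumes each split returns its rows together with an optional residual predicate that the engine must still apply. `ResidualFilterSketch`, `SplitResult`, and `scanSplit` are hypothetical, and the "parquet"/"avro" strings only label which splits can evaluate the pushed filter themselves. In Spark, the residual would be a codegen'ed filter rather than a Scala closure.

```scala
object ResidualFilterSketch {
  // Hypothetical record type for a table that mixes Parquet and Avro files.
  final case class Record(id: Long, category: String)

  // Rows from one split plus the residual predicate the engine must still apply.
  // None means the source guarantees every returned row already matches the pushed filter.
  final case class SplitResult(format: String, rows: Seq[Record], residual: Option[Record => Boolean])

  // Hypothetical split scan: "parquet" splits evaluate the pushed predicate themselves,
  // while other splits cannot, so they return every row and hand back the whole predicate.
  def scanSplit(format: String, data: Seq[Record], pushed: Record => Boolean): SplitResult =
    format match {
      case "parquet" => SplitResult(format, data.filter(pushed), residual = None)
      case _         => SplitResult(format, data, residual = Some(pushed))
    }

  // Engine side: one "ResidualFilter" step whose expression depends on the split.
  def applyResidual(split: SplitResult): Seq[Record] =
    split.residual match {
      case Some(pred) => split.rows.filter(pred)
      case None       => split.rows
    }
}
```

Both kinds of split end up with the same rows after the residual step. Only the Avro split pays for the filter in the engine.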
[GitHub] spark issue #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to ReadTask...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21145

@cloud-fan and @henryr, do you have an opinion about naming here?
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

@mswit-databricks, I wouldn't worry about that. We've limited the length of binary and string fields. In the next version of Parquet, we're planning on releasing page indexes, which store lower and upper bounds instead of min and max values. That gives us more flexibility to shorten values and avoid the case that you're worried about.
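Why bounds allow shorter values than exact min/max can be shown with a small Scala sketch of truncated bounds. This is not Parquet's implementation. Parquet compares binary values as unsigned bytes, whereas this sketch uses Java `String` ordering (UTF-16 code units). `TruncatedBounds` and its methods are hypothetical. A prefix is always a valid lower bound. A valid upper bound comes from truncating and then incrementing the last character that can be incremented.

```scala
object TruncatedBounds {
  // Any prefix of s sorts at or before s, so truncation gives a valid lower bound.
  def lowerBound(s: String, maxLen: Int): String =
    if (s.length <= maxLen) s else s.substring(0, maxLen)

  // Truncate, then increment the last character that is not Char.MaxValue and drop
  // everything after it. The result sorts strictly after every string with that prefix.
  // Returns None when no shorter upper bound exists (all kept characters are maximal).
  def upperBound(s: String, maxLen: Int): Option[String] =
    if (s.length <= maxLen) Some(s)
    else {
      val prefix = s.substring(0, maxLen)
      var i = prefix.length - 1
      while (i >= 0 && prefix.charAt(i) == Char.MaxValue) i -= 1
      if (i < 0) None
      else Some(prefix.substring(0, i) + (prefix.charAt(i) + 1).toChar)
    }
}
```

For example, `"apple"` truncated to three characters gives the bounds `"app"` and `"apq"`, and the real value still falls between them.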
[GitHub] spark issue #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to be an in...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21122

Thanks for pointing this out, @henryr. This looks like a good change to support multiple catalogs. It looks fine to me, other than exposing `unwrapped` to get the Hive client. I think we're probably abusing `unwrapped` instead of passing the client, so it makes sense to pass the client correctly instead.
[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21122#discussion_r185138677

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala ---
    @@ -31,10 +30,16 @@ import org.apache.spark.util.ListenerBus
      *
      * Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.
      */
    -abstract class ExternalCatalog
    -  extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
    +trait ExternalCatalog {
       import CatalogTypes.TablePartitionSpec
    +  // Returns the underlying catalog class (e.g., HiveExternalCatalog).
    +  def unwrapped: ExternalCatalog = this
    --- End diff --

Is there a better way to pass the Hive client? It looks like the uses of `unwrapped` actually just get the Hive client from the HiveExternalCatalog. If we can pass that through, it would prevent the need for this. I think that would be cleaner, unless there is a problem with that I'm missing.
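The two options can be contrasted in a small Scala sketch. Every type in it (`FakeHiveClient`, `CatalogLike`, `WrappedCatalog`, `SharedStateLike`) is a hypothetical stand-in, not a Spark class. In the first approach, a wrapper hides the concrete catalog type, so callers must unwrap and cast to reach the client. In the second, the client is kept as its own typed reference next to the catalog.

```scala
object ExplicitClientSketch {
  // Hypothetical stand-ins for illustration only; none of these are Spark classes.
  final class FakeHiveClient {
    val executed = scala.collection.mutable.ArrayBuffer.empty[String]
    def runSqlHive(sql: String): Unit = executed += sql
  }

  trait CatalogLike { def tableExists(db: String, table: String): Boolean }

  // A Hive-backed catalog that happens to own a client.
  final class HiveCatalogLike(val client: FakeHiveClient) extends CatalogLike {
    def tableExists(db: String, table: String): Boolean = table == "t"
  }

  // A wrapper (for example, one that posts events) hides the concrete type.
  final class WrappedCatalog(val delegate: CatalogLike) extends CatalogLike {
    def tableExists(db: String, table: String): Boolean = delegate.tableExists(db, table)
  }

  // Approach 1: reach the client through the catalog, which needs an unwrap step plus a cast.
  def clientViaCast(catalog: CatalogLike): FakeHiveClient = catalog match {
    case w: WrappedCatalog => w.delegate.asInstanceOf[HiveCatalogLike].client
    case other             => other.asInstanceOf[HiveCatalogLike].client
  }

  // Approach 2: keep the catalog and the client as two separate, typed references.
  final case class SharedStateLike(catalog: CatalogLike, hiveClient: FakeHiveClient)
}
```

With the second approach, callers that need the client never depend on how the catalog is wrapped. The cast, and its failure mode for non-Hive catalogs, goes away.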
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21123#discussion_r185055954

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
    @@ -213,6 +215,26 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         }
       }
    +/**
    + * Replace the V2 data source of table in [[InsertIntoTable]] to V1 [[FileFormat]].
    + * E.g, with temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails
    + * since there is no correspoding physical plan.
    + * This is a temporary hack for making current data source V2 work.
    --- End diff --

I don't like the idea of "temporary hack" rules. What is the long-term plan for getting rid of this? Or is this something we will end up supporting forever?
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

Is it necessary to block this commit on benchmarks? We know from the Parquet benchmarks and the TPC-DS run that it doesn't degrade performance. What do you think needs to be done before this can move forward?
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

Yes, it is safe to use push-down for string columns in data written with this version.
[GitHub] spark issue #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to ReadTask...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21145

I think `ReadTask` is fine. That name does not imply that you can use the object itself to read, but it does correctly show that it is one task in a larger operation. The name implies that it represents something to be read, which is correct, and it is reasonable to look at the API for that object to see how to read it. The API can make that clear, so I don't think we need a different name.
[GitHub] spark issue #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to ReadTask...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21145

@arunmahadevan, the problem is that the current naming is misleading. This is not a factory (it only produces one specific reader) and it does not have the same lifecycle as the write-side factory. Using parallel naming for the two implies an equivalence that doesn't exist.
[GitHub] spark issue #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21143

Thanks for working on this, @cloud-fan! I was thinking about needing this just recently, so that data sources can delegate to Spark when needed.

I'll have a thorough look at it tomorrow, but one quick high-level question: should we make these residuals based on the input split instead?

Input splits might have different residual filters that need to be applied. For example, if you have a time range query, `ts > X`, and are storing data by day, then you know that when `day(ts) > day(X)`, `ts > X` *must* be true, but when `day(ts) = day(X)`, `ts > X` only *might* be true. So only the splits that cover the boundary day need to run the original filter; the other splits don't.

Another use case for a per-split residual is when splits might be different file formats. Parquet allows pushing down filters, but Avro doesn't. In a table with mixed formats, it would be great for Avro splits to return the entire expression as a residual, while Parquet splits do the filtering.

What do you think?
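The time-range example translates into a small Scala sketch. It assumes that each split holds data for exactly one day, identified by its epoch-day number, and that `ts` is in epoch milliseconds. `DayPartitionResidual` and its `Residual` cases are hypothetical names. The sketch also covers the day-before case, where no row can match and the split could be skipped entirely.

```scala
object DayPartitionResidual {
  // Assumed model: timestamps are epoch milliseconds, splits are keyed by epoch day.
  val MillisPerDay: Long = 24L * 60 * 60 * 1000

  def day(tsMillis: Long): Long = Math.floorDiv(tsMillis, MillisPerDay)

  sealed trait Residual
  case object AlwaysTrue extends Residual                  // every row matches: no filter needed
  case object AlwaysFalse extends Residual                 // no row can match: split can be skipped
  final case class Filter(minExclusive: Long) extends Residual // must still evaluate ts > X

  // Residual of `ts > x` for a split that only contains timestamps from `splitDay`.
  def residualFor(splitDay: Long, x: Long): Residual =
    if (splitDay > day(x)) AlwaysTrue        // the whole day starts after x
    else if (splitDay < day(x)) AlwaysFalse  // the whole day ends before x's day begins
    else Filter(x)                           // boundary day: run the original filter
}
```

Only the boundary-day split gets a real filter. All other splits either pass every row through or can be skipped.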
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

Thank you @maropu! What resources does the run require? Is it something we could create a Jenkins job to run?
[GitHub] spark pull request #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to R...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21145#discussion_r184235329

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -299,13 +299,13 @@ private[kafka010] class KafkaMicroBatchReader(
         }
       }
    -/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
    +/** A [[ReadTask]] for reading Kafka data in a micro-batch streaming query. */
     private[kafka010] case class KafkaMicroBatchDataReaderFactory(
    --- End diff --

This fixes the API, not implementations, and it already touches 30+ files. I'd rather not fix the downstream classes, for two reasons. First, it keeps this change from becoming really large. Second, we need to be able to evolve these APIs without requiring changes to all implementations. The API is still evolving, and if we need to update 20+ implementations to make simple changes, then I think that's a problem.
[GitHub] spark pull request #21118: SPARK-23325: Use InternalRow when reading with Da...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21118#discussion_r184216025

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
    @@ -86,7 +87,7 @@ class KafkaContinuousReader(
           KafkaSourceOffset(JsonUtils.partitionOffsets(json))
       }
    -  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
    +  override def createReadTasks(): ju.List[ReadTask[InternalRow]] = {
    --- End diff --

I've moved this to #21145. I'll rebase this PR on that one, so let's try to get that in first.
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

The fix for PARQUET-686 was to suppress min/max stats. It is safe to push filters, but those filters can't be used without the stats. 1.10.0 has the correct stats and can use those filters.
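The difference between "safe to push" and "useful to push" can be sketched in Scala. The sketch assumes row-group stats are simply present or absent. `StatsPruning` and its types are hypothetical, not parquet-mr's API. With stats, an equality filter can drop a row group whose range excludes the value. Without stats, the only safe answer is to keep the row group, so pushing the filter is correct but prunes nothing.

```scala
object StatsPruning {
  // Hypothetical row-group statistics; None models stats that were suppressed.
  final case class Stats(min: Long, max: Long)
  final case class RowGroup(id: Int, stats: Option[Stats])

  // Can the row group be skipped for the predicate `col == value`?
  def canDropEq(rg: RowGroup, value: Long): Boolean =
    rg.stats match {
      case Some(Stats(min, max)) => value < min || value > max
      case None                  => false // no stats: must read the row group to be correct
    }
}
```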
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r184156731

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
    @@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
           case plan: InMemoryRelation => plan
         }.head
         // InMemoryRelation's stats is file size before the underlying RDD is materialized
    -    assert(inMemoryRelation.computeStats().sizeInBytes === 740)
    +    assert(inMemoryRelation.computeStats().sizeInBytes === 800)
    --- End diff --

This is data-dependent, so it is hard to estimate. We write the old stats for older readers when the type uses a signed sort order, so this is limited mostly to primitive types and won't be written for byte arrays or UTF-8 data. That limits the size to 16 bytes plus Thrift overhead per page, and you might have about 100 pages per row group. So that's roughly 1.5 KB per 128 MB row group, which is about 0.001%.
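The estimate is easy to reproduce. This sketch (hypothetical `StatsOverhead` object) plugs in the figures above: 16 bytes of stats per page, about 100 pages, and a 128 MB row group. Thrift overhead is ignored. For contrast, it also computes the relative change of the tiny test file in the diff, which grows from 740 to 800 bytes.

```scala
object StatsOverhead {
  // Inputs taken from the estimate above; Thrift overhead is ignored here.
  val statsBytesPerPage: Long = 16L
  val pagesPerRowGroup: Long = 100L
  val rowGroupBytes: Long = 128L * 1024 * 1024

  val overheadBytes: Long = statsBytesPerPage * pagesPerRowGroup      // 1,600 bytes
  val overheadPercent: Double = 100.0 * overheadBytes / rowGroupBytes  // about 0.0012%

  // The test fixture in the diff: a tiny file grows from 740 to 800 bytes.
  val smallFileIncreasePercent: Double = 100.0 * (800 - 740) / 740     // about 8.1%
}
```

The same kind of fixed per-file cost is negligible for a full row group but shows up as roughly 8% on a file that is only a few hundred bytes.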
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

There are two main reasons to update. First, the problem behind SPARK-17213 is fixed, hence the new min and max fields. Second, this updates the internal byte array management, which is needed for page skipping in the next few versions.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r184139736

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
    @@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
           case plan: InMemoryRelation => plan
         }.head
         // InMemoryRelation's stats is file size before the underlying RDD is materialized
    -    assert(inMemoryRelation.computeStats().sizeInBytes === 740)
    +    assert(inMemoryRelation.computeStats().sizeInBytes === 800)
    --- End diff --

Parquet fixed a problem with value ordering in statistics, which required adding new min and max metadata fields. For older readers, Parquet also writes the old values when it makes sense to. This is a slight increase in overhead, which is more noticeable when files contain just a few records. Don't be alarmed at the percentage difference here; it is just a small file. Parquet isn't increasing file sizes by 8%, which would be silly.
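To illustrate the kind of ordering problem involved, here is a Scala sketch that compares UTF-8 bytes as signed versus unsigned values. The older stats were computed with a signed ordering, which is why they are only still written for types with a signed sort order. `ByteOrdering` is a hypothetical helper. Under the signed ordering, `"é"` (U+00E9, encoded as `0xC3 0xA9`) sorts before `"z"`, because `0xC3` is negative as a signed byte. Under the unsigned ordering, it correctly sorts after `"z"`.

```scala
import java.nio.charset.StandardCharsets

object ByteOrdering {
  // Lexicographic comparison treating each byte as signed (-128..127).
  def compareSigned(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val c = java.lang.Byte.compare(a(i), b(i))
      if (c != 0) return c
      i += 1
    }
    Integer.compare(a.length, b.length)
  }

  // Same comparison treating each byte as unsigned (0..255), which matches code point order for UTF-8.
  def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val c = Integer.compare(a(i) & 0xff, b(i) & 0xff)
      if (c != 0) return c
      i += 1
    }
    Integer.compare(a.length, b.length)
  }

  def utf8(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
}
```

A min/max pair computed with the signed ordering can exclude values that are actually present. That is why such stats cannot safely be used to skip data for strings.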
[GitHub] spark pull request #21145: SPARK-24073: Rename DataReaderFactory to ReadTask...
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/21145

SPARK-24073: Rename DataReaderFactory to ReadTask.

## What changes were proposed in this pull request?

This reverses the changes in SPARK-23219, which renamed ReadTask to DataReaderFactory. The intent of that change was to make the read and write API match (write side uses DataWriterFactory), but the underlying problem is that the two classes are not equivalent.

ReadTask/DataReader function as Iterable/Iterator. One ReadTask is a specific read task for a partition of the data to be read, in contrast to DataWriterFactory where the same factory instance is used in all write tasks. ReadTask's purpose is to manage the lifecycle of DataReader with an explicit create operation to mirror the close operation. This is no longer clear from the API, where DataReaderFactory appears to be more generic than it is and it isn't clear why a set of them is produced for a read.

## How was this patch tested?

Existing tests, which have been updated to use the new name.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark SPARK-24073-revert-data-reader-factory-rename

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21145.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #21145

commit c364c05d3141bbe0ed29a2b02cecfa541d9c8212
Author: Ryan Blue <blue@...>
Date: 2018-04-24T19:55:25Z

SPARK-24073: Rename DataReaderFactory to ReadTask.
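The Iterable/Iterator relationship can be sketched in Scala. The traits below are simplified, hypothetical shapes loosely modelled on the read API discussed in this thread. They are not Spark's actual interfaces. Each partition gets its own task. A task creates a fresh reader, the way an Iterable creates an Iterator, and that explicit create step pairs with the reader's `close`.

```scala
object ReadTaskSketch {
  // Simplified, hypothetical shapes; not Spark's actual interfaces.
  trait DataReader[T] extends AutoCloseable {
    def next(): Boolean // advance; false when the partition is exhausted
    def get(): T        // current record
  }

  // One ReadTask per partition: like an Iterable, it describes one specific piece of data
  // and creates a fresh reader (like an Iterator) each time it is asked.
  trait ReadTask[T] extends Serializable {
    def createDataReader(): DataReader[T]
  }

  final case class RangeReadTask(start: Int, end: Int) extends ReadTask[Int] {
    def createDataReader(): DataReader[Int] = new DataReader[Int] {
      private var current = start - 1
      def next(): Boolean = { current += 1; current < end }
      def get(): Int = current
      def close(): Unit = ()
    }
  }

  // Driver side: plan one task per partition of the data to be read.
  def planTasks(total: Int, partitions: Int): Seq[ReadTask[Int]] = {
    val step = math.ceil(total.toDouble / partitions).toInt
    (0 until total by step).map(s => RangeReadTask(s, math.min(s + step, total)))
  }

  // Executor side: create, drain, and close the reader for one task.
  def readAll[T](task: ReadTask[T]): Vector[T] = {
    val reader = task.createDataReader()
    try {
      val out = Vector.newBuilder[T]
      while (reader.next()) out += reader.get()
      out.result()
    } finally reader.close()
  }
}
```

Each `RangeReadTask` only ever produces readers for its own range. That differs from a write-side factory instance that is shared by every write task.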
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

> Based on the previous upgrade (e.g., #13280 (comment)), we hit the performance regressions when we upgrade Parquet and we did the revert at the end.

I should point out that the regression wasn't reproducible, so we aren't sure what the cause was. We also didn't have performance numbers on the Parquet side or a case of anyone running it in production (we have been for a couple of months). But I can understand wanting to be thorough.
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

Okay, I don't have the time to set up and run benchmarks without a stronger case for this causing a regression (given the Parquet testing), but other people should feel free to pick this up.
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070

@cloud-fan, is there a Jenkins job to run it?
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21118

Yeah, we should probably add a projection. It's probably only working because the InternalRows that are produced are all UnsafeRow.
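Why the current code works only by accident can be sketched in Scala with a hypothetical row hierarchy standing in for `InternalRow` and `UnsafeRow`. A consumer that casts without checking, like the `collect` path mentioned earlier in the thread, succeeds only when every row already happens to be unsafe. A projection step that converts generic rows, and passes already-unsafe rows through untouched, removes that assumption. `EnsureUnsafeSketch` and its types are illustrative only, not Spark classes.

```scala
object EnsureUnsafeSketch {
  // Hypothetical row hierarchy standing in for InternalRow / UnsafeRow.
  sealed trait RowLike { def values: Vector[Any] }
  final case class GenericRowLike(values: Vector[Any]) extends RowLike
  final case class UnsafeRowLike(buffer: Array[Byte], values: Vector[Any]) extends RowLike

  // A consumer that assumes unsafe rows and casts without checking.
  def sizeOfUnsafe(row: RowLike): Int = row.asInstanceOf[UnsafeRowLike].buffer.length

  // Projection to the unsafe form: reuse rows that already are unsafe, convert the rest.
  def toUnsafe(row: RowLike): UnsafeRowLike = row match {
    case u: UnsafeRowLike => u
    case other            => UnsafeRowLike(encode(other.values), other.values)
  }

  // Toy serialization used only to give the converted row a buffer.
  private def encode(values: Vector[Any]): Array[Byte] = values.mkString(",").getBytes("UTF-8")
}
```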
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21118

@jose-torres, @cloud-fan, can you take a look at this? It updates the v2 API to use InternalRow by default.
[GitHub] spark pull request #21118: SPARK-23325: Use InternalRow when reading with Da...
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/21118 SPARK-23325: Use InternalRow when reading with DataSourceV2. ## What changes were proposed in this pull request? This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins. Because the API is changing significantly in the same places, this also renames ReaderFactory back to ReadTask. Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow. Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow. ## How was this patch tested? This uses existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark SPARK-23325-datasource-v2-internal-row Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21118.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21118 commit eddd049ed78970dda0796a37462e52f53bfeacc4 Author: Ryan Blue <blue@...> Date: 2018-04-20T20:15:58Z SPARK-23325: Use InternalRow when reading with DataSourceV2. This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins. Because the API is changing significantly in the same places, this also renames ReaderFactory back to ReadTask. Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow. Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow. 
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070 Retest this please.
[GitHub] spark issue #21111: [SPARK-23877][SQL][followup] use PhysicalOperation to si...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/2 Thanks for doing this as a follow-up. I had one minor comment, but otherwise I'm +1. I see what you mean about using `PhysicalOperation` now. It is slightly cleaner and I guess `PhysicalOperation` is the right way to accumulate the filters and projection from a sub-tree.
[GitHub] spark pull request #21111: [SPARK-23877][SQL][followup] use PhysicalOperatio...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/2#discussion_r183101013 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -114,11 +119,8 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic relation match { case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) => val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) -val partitionData = fsRelation.location.listFiles(relFilters, Nil) -// partition data may be a stream, which can cause serialization to hit stack level too -// deep exceptions because it is a recursive structure in memory. converting to array -// avoids the problem. --- End diff -- Yes, that does fix it, but in a non-obvious way. What isn't clear is what guarantees that the rows used to construct the LocalRelation will never need to be serialized. Would it be reasonable for a future commit to remove the `@transient` modifier and reintroduce the problem? I would rather this return the data in a non-recursive structure, but it's a minor point.
[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20988#discussion_r182858087 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic /** * A pattern that finds the partitioned table relation node inside the given plan, and returns a - * pair of the partition attributes and the table relation node. + * pair of the partition attributes, partition filters, and the table relation node. * * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with * deterministic expressions, and returns result after reaching the partitioned table relation * node. */ - object PartitionedRelation { - -def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match { - case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) -if fsRelation.partitionSchema.nonEmpty => -val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) -Some((AttributeSet(partAttrs), l)) - - case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty => -val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) -Some((AttributeSet(partAttrs), relation)) - - case p @ Project(projectList, child) if projectList.forall(_.deterministic) => -unapply(child).flatMap { case (partAttrs, relation) => - if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None -} + object PartitionedRelation extends PredicateHelper { + +def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = { + plan match { +case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) + if fsRelation.partitionSchema.nonEmpty => + val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) + Some((AttributeSet(partAttrs), Nil, l)) + +case relation: HiveTableRelation if 
relation.tableMeta.partitionColumnNames.nonEmpty => + val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) + Some((AttributeSet(partAttrs), Nil, relation)) + +case p @ Project(projectList, child) if projectList.forall(_.deterministic) => + unapply(child).flatMap { case (partAttrs, filters, relation) => +if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None + } - case f @ Filter(condition, child) if condition.deterministic => -unapply(child).flatMap { case (partAttrs, relation) => - if (f.references.subsetOf(partAttrs)) Some((partAttrs, relation)) else None -} +case f @ Filter(condition, child) if condition.deterministic => + unapply(child).flatMap { case (partAttrs, filters, relation) => +if (f.references.subsetOf(partAttrs)) { + Some((partAttrs, splitConjunctivePredicates(condition) ++ filters, relation)) --- End diff -- Good catch. I've added a test case and updated the `PartitionedRelation` code to keep track of both the original partition attributes, which the filter needs, and the top-most node's output, which the rule uses. As for using `PhysicalOperation` instead of `PartitionedRelation`, I don't see a compelling reason for such an invasive change. This currently adds a couple of results to unapply and keeps mostly the same logic. `PhysicalOperation` would lose the check that the references are a subset of the partition attributes and would be a much larger change. If you think this should be refactored, let's talk about that separately to understand the motivation for the change.
[GitHub] spark pull request #21029: [SPARK-23952] remove type parameter in DataReader...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21029#discussion_r182596471 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -146,7 +146,7 @@ private[kafka010] class KafkaMicroBatchReader( new KafkaMicroBatchDataReaderFactory( range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer) } -factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava +factories.map(_.asInstanceOf[DataReaderFactory]).asJava --- End diff -- Why is this cast necessary?
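One plausible reason for a cast like this is that `java.util.List`, which `asJava` produces, is invariant: a `List<Sub>` is not a `List<Base>`, even though a Scala `Seq` is covariant. If that is the cause, the element type can be widened without any cast. The Java sketch below shows two standard ways to do it, using hypothetical `Base` and `Sub` types in place of the factory classes: copying through the `ArrayList(Collection<? extends E>)` constructor, or wrapping with `Collections.unmodifiableList`, whose parameter type is `List<? extends T>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class WideningSketch {
  // Hypothetical stand-ins for DataReaderFactory and a concrete factory class.
  interface Base {
    String name();
  }

  static final class Sub implements Base {
    private final String name;
    Sub(String name) { this.name = name; }
    @Override public String name() { return name; }
  }

  // List<Sub> is not a List<Base> (Java generics are invariant), but the ArrayList copy
  // constructor accepts Collection<? extends E>, so this compiles without a cast.
  static List<Base> widenByCopy(List<Sub> subs) {
    return new ArrayList<>(subs);
  }

  // unmodifiableList(List<? extends T>) returns a read-only List<T> view, also cast-free.
  static List<Base> widenByView(List<Sub> subs) {
    return Collections.unmodifiableList(subs);
  }

  public static void main(String[] args) {
    List<Sub> subs = Arrays.asList(new Sub("a"), new Sub("b"));
    System.out.println(widenByCopy(subs).get(0).name()); // prints a
    System.out.println(widenByView(subs).size());        // prints 2
  }
}
```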
[GitHub] spark pull request #21029: [SPARK-23952] remove type parameter in DataReader...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21029#discussion_r182596392 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala --- @@ -17,52 +17,85 @@ package org.apache.spark.sql.execution.datasources.v2 -import scala.collection.JavaConverters._ -import scala.reflect.ClassTag - import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.sources.v2.reader.DataReaderFactory +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.DataFormat +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.types.StructType -class DataSourceRDDPartition[T : ClassTag](val index: Int, val readerFactory: DataReaderFactory[T]) +class DataSourceRDDPartition(val index: Int, val factory: DataReaderFactory) extends Partition with Serializable -class DataSourceRDD[T: ClassTag]( +class DataSourceRDD( sc: SparkContext, -@transient private val readerFactories: Seq[DataReaderFactory[T]]) - extends RDD[T](sc, Nil) { +@transient private val readerFactories: Seq[DataReaderFactory], +schema: StructType) + extends RDD[InternalRow](sc, Nil) { override protected def getPartitions: Array[Partition] = { readerFactories.zipWithIndex.map { case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) }.toArray } - override def compute(split: Partition, context: TaskContext): Iterator[T] = { -val reader = split.asInstanceOf[DataSourceRDDPartition[T]].readerFactory.createDataReader() -context.addTaskCompletionListener(_ => reader.close()) -val iter = new Iterator[T] { - private[this] var valuePrepared = false - - override def hasNext: Boolean = { -if 
(!valuePrepared) { - valuePrepared = reader.next() -} -valuePrepared - } - - override def next(): T = { -if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") -} -valuePrepared = false -reader.get() - } + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +val factory = split.asInstanceOf[DataSourceRDDPartition].factory +val iter: DataReaderIterator[UnsafeRow] = factory.dataFormat() match { + case DataFormat.ROW => +val reader = new RowToUnsafeDataReader( + factory.createRowDataReader(), RowEncoder.apply(schema).resolveAndBind()) +new DataReaderIterator(reader) + + case DataFormat.UNSAFE_ROW => +new DataReaderIterator(factory.createUnsafeRowDataReader()) + + case DataFormat.COLUMNAR_BATCH => +new DataReaderIterator(factory.createColumnarBatchDataReader()) + // TODO: remove this type erase hack. + .asInstanceOf[DataReaderIterator[UnsafeRow]] --- End diff -- Isn't this change intended to avoid these casts?
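The `compute` body removed in this diff wraps a `next()`/`get()` reader in a Scala `Iterator`. A rough Java equivalent of that adapter is sketched below; `Reader`, `ListReader`, and `ReaderIterator` are hypothetical names, and the `valuePrepared` flag mirrors the removed code so that repeated `hasNext()` calls do not skip records.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ReaderIteratorSketch {
  // Hypothetical pull-style reader: next() advances and reports whether a record is available,
  // get() returns the current record.
  interface Reader<T> {
    boolean next();
    T get();
  }

  // Hypothetical in-memory reader used to drive the example.
  static final class ListReader<T> implements Reader<T> {
    private final List<T> values;
    private int index = -1;

    ListReader(List<T> values) { this.values = values; }

    @Override public boolean next() {
      if (index + 1 < values.size()) {
        index++;
        return true;
      }
      return false;
    }

    @Override public T get() { return values.get(index); }
  }

  // Adapts Reader to java.util.Iterator. The valuePrepared flag ensures reader.next() is called
  // once per record, no matter how many times hasNext() is called in between.
  static final class ReaderIterator<T> implements Iterator<T> {
    private final Reader<T> reader;
    private boolean valuePrepared = false;

    ReaderIterator(Reader<T> reader) { this.reader = reader; }

    @Override public boolean hasNext() {
      if (!valuePrepared) {
        valuePrepared = reader.next();
      }
      return valuePrepared;
    }

    @Override public T next() {
      if (!hasNext()) {
        throw new NoSuchElementException("End of stream");
      }
      valuePrepared = false;
      return reader.get();
    }
  }

  public static void main(String[] args) {
    Iterator<String> it = new ReaderIterator<>(new ListReader<>(Arrays.asList("x", "y")));
    while (it.hasNext()) {
      System.out.println(it.next()); // prints x, then y
    }
  }
}
```

Because the iterator is generic in the record type, one adapter can serve every reader format; the casts questioned in this comment come from choosing the record type at runtime, not from the adapter itself.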
[GitHub] spark pull request #21029: [SPARK-23952] remove type parameter in DataReader...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21029#discussion_r182596274 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java --- @@ -17,12 +17,12 @@ package org.apache.spark.sql.sources.v2.reader.streaming; +import java.util.Optional; --- End diff -- Nit: this is a cosmetic change that should be reverted before committing.
[GitHub] spark pull request #21029: [SPARK-23952] remove type parameter in DataReader...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21029#discussion_r182596153 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java --- @@ -52,10 +56,46 @@ } /** - * Returns a data reader to do the actual reading work. + * The output data format of this factory's data reader. Spark will invoke the corresponding + * create data reader method w.r.t. the return value of this method: + * + * {@link DataFormat#ROW}: {@link #createRowDataReader()} + * {@link DataFormat#UNSAFE_ROW}: {@link #createUnsafeRowDataReader()} + * @{@link DataFormat#COLUMNAR_BATCH}: {@link #createColumnarBatchDataReader()} + * + */ + DataFormat dataFormat(); --- End diff -- If the data format is determined when the factory is created, then I don't see why it is necessary to change the API. This just makes it more confusing.
[GitHub] spark issue #20988: [SPARK-23877][SQL]: Use filter predicates to prune parti...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20988 @cloud-fan, I've added the test. Thanks for letting me know about HiveCatalogMetrics; that's exactly what I needed.
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070 I backported the Hadoop zstd codec to 2.7.3 without much trouble. But either way, I think that's a separate concern. This is about getting Parquet updated, not about worrying whether users can easily add compression implementations to their classpath.
[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20988#discussion_r182579387 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic /** * A pattern that finds the partitioned table relation node inside the given plan, and returns a - * pair of the partition attributes and the table relation node. + * pair of the partition attributes, partition filters, and the table relation node. * * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with * deterministic expressions, and returns result after reaching the partitioned table relation * node. */ - object PartitionedRelation { - -def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match { - case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) -if fsRelation.partitionSchema.nonEmpty => -val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) -Some((AttributeSet(partAttrs), l)) - - case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty => -val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) -Some((AttributeSet(partAttrs), relation)) - - case p @ Project(projectList, child) if projectList.forall(_.deterministic) => -unapply(child).flatMap { case (partAttrs, relation) => - if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None -} + object PartitionedRelation extends PredicateHelper { + +def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = { + plan match { +case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) + if fsRelation.partitionSchema.nonEmpty => + val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) + Some((AttributeSet(partAttrs), Nil, l)) + +case relation: HiveTableRelation if 
relation.tableMeta.partitionColumnNames.nonEmpty => + val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) + Some((AttributeSet(partAttrs), Nil, relation)) + +case p @ Project(projectList, child) if projectList.forall(_.deterministic) => + unapply(child).flatMap { case (partAttrs, filters, relation) => +if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None --- End diff -- @cloud-fan, that is basically how this works already. Each matched node calls `unapply(child)` to get the child node's result, then adds the current node's conditions to that result. Using `unapply` instead of `getPartitionedRelation` makes this work in the matching rule:

```scala
case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, filters, relation)) =>
```
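The recursion described in this comment can be modeled outside Catalyst. The Java sketch below uses a toy plan tree with hypothetical `Relation`, `Project`, and `Filter` classes and single-column predicates; it illustrates the matching logic under those assumptions and is not Spark's code. Each case first matches its child, then either narrows the visible attributes (`Project`) or prepends its split conjuncts to the child's filters (`Filter`), and gives up when a node references anything outside the child's attributes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class PartitionedRelationSketch {
  // Toy expressions: a predicate on one column, or an AND of two expressions.
  interface Expr { }
  static final class Pred implements Expr {
    final String column;
    final String text;
    Pred(String column, String text) { this.column = column; this.text = text; }
    @Override public String toString() { return text; }
  }
  static final class And implements Expr {
    final Expr left;
    final Expr right;
    And(Expr left, Expr right) { this.left = left; this.right = right; }
  }

  // Toy plan nodes (no aliasing and no determinism checks).
  interface Plan { }
  static final class Relation implements Plan {
    final Set<String> partitionColumns;
    Relation(Set<String> partitionColumns) { this.partitionColumns = partitionColumns; }
  }
  static final class Project implements Plan {
    final List<String> columns;
    final Plan child;
    Project(List<String> columns, Plan child) { this.columns = columns; this.child = child; }
  }
  static final class Filter implements Plan {
    final Expr condition;
    final Plan child;
    Filter(Expr condition, Plan child) { this.condition = condition; this.child = child; }
  }

  // Result triple: (attributes visible at this node, accumulated filters, partitioned relation).
  static final class Match {
    final Set<String> attrs;
    final List<Expr> filters;
    final Relation relation;
    Match(Set<String> a, List<Expr> f, Relation r) { attrs = a; filters = f; relation = r; }
  }

  // Flattens nested ANDs into a list of conjuncts.
  static List<Expr> splitConjuncts(Expr e) {
    List<Expr> out = new ArrayList<>();
    if (e instanceof And) {
      out.addAll(splitConjuncts(((And) e).left));
      out.addAll(splitConjuncts(((And) e).right));
    } else {
      out.add(e);
    }
    return out;
  }

  static Set<String> references(Expr e) {
    Set<String> refs = new HashSet<>();
    for (Expr c : splitConjuncts(e)) {
      refs.add(((Pred) c).column);
    }
    return refs;
  }

  // Each case matches its child first, then adds its own contribution to the child's result.
  static Optional<Match> unapply(Plan plan) {
    if (plan instanceof Relation) {
      Relation r = (Relation) plan;
      return r.partitionColumns.isEmpty()
          ? Optional.empty()
          : Optional.of(new Match(r.partitionColumns, new ArrayList<>(), r));
    } else if (plan instanceof Project) {
      Project p = (Project) plan;
      return unapply(p.child)
          .filter(m -> m.attrs.containsAll(p.columns))
          .map(m -> new Match(new HashSet<>(p.columns), m.filters, m.relation));
    } else if (plan instanceof Filter) {
      Filter f = (Filter) plan;
      return unapply(f.child)
          .filter(m -> m.attrs.containsAll(references(f.condition)))
          .map(m -> {
            List<Expr> filters = new ArrayList<>(splitConjuncts(f.condition));
            filters.addAll(m.filters);
            return new Match(m.attrs, filters, m.relation);
          });
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Relation table = new Relation(new HashSet<>(Arrays.asList("dt", "hr")));
    Plan plan = new Filter(new And(new Pred("dt", "dt = '2018-04-01'"), new Pred("hr", "hr > 10")),
        new Project(Arrays.asList("dt", "hr"), table));
    System.out.println(unapply(plan).map(m -> m.filters).orElse(null)); // [dt = '2018-04-01', hr > 10]
    System.out.println(unapply(new Filter(new Pred("id", "id = 7"), table)).isPresent()); // false
  }
}
```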
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070 @scottcarey, Parquet will use the compressors if they are available. You can add them from an external jar and it will work. LZ4 should also work out of the box because it is included in Hadoop 2.7. I agree that it would be nice if Parquet didn't rely on Hadoop for compression libraries, but that's how it is at the moment. Feel free to fix it.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r182563063 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +59,159 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putIntsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putInt(rowId + i, buffer.getInt()); + } +} } @Override public final void readLongs(int total, WritableColumnVector c, int rowId) { -c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 8 * total; +int requiredBytes = total * 8; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putLongsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putLong(rowId + i, buffer.getLong()); + } +} } @Override public final void readFloats(int total, WritableColumnVector c, int rowId) { -c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + 
buffer.position(); + c.putFloats(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putFloat(rowId + i, buffer.getFloat()); + } +} } @Override public final void readDoubles(int total, WritableColumnVector c, int rowId) { -c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 8 * total; +int requiredBytes = total * 8; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putDoubles(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putDouble(rowId + i, buffer.getDouble()); + } +} + } + + private byte getByte() { +try { + return (byte) in.read(); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read a byte", e); +} } @Override public final void readBytes(int total, WritableColumnVector c, int rowId) { -for (int i = 0; i < total; i++) { - // Bytes are stored as a 4-byte little endian int. Just read the first byte. - // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride. - c.putByte(rowId + i, Platform.getByte(buffer, offset)); - offset += 4; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { + c.putByte(rowId + i, buffer.get()); + // skip the next 3 bytes + buffer.position(buffer.position() + 3); } } @Override public final boolean readBoolean() { -byte b = Platform.getByte(buffer, offset); -boolean v = (b & (1 << bitOffset)) != 0; +// TODO: vectorize decoding and keep boolean[] instead of currentByte +if (bitOffset == 0) { + currentByte = getByte(); +} + +boolean v = (currentByte & (1 << bitOffset)) != 0; bitOffset += 1; if (bitOffset == 8) { bitOffset = 0; - offset++; } return v; } @Override public final
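The decoding pattern in this diff, reading directly from the backing array when the `ByteBuffer` has one and falling back to per-value reads otherwise, together with the bit-at-a-time boolean decoding, can be sketched in plain Java. This is a minimal stand-in, assuming decoding into an `int[]` instead of Spark's `WritableColumnVector`; the class and method names are hypothetical. Note the detail the diff also handles: the start index must include `arrayOffset()`, because a sliced heap buffer can share its array at a non-zero offset.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class PlainDecodingSketch {
  // Reads `total` little-endian ints from `buffer` into dst[offset .. offset + total).
  static void readInts(ByteBuffer buffer, int total, int[] dst, int offset) {
    if (buffer.hasArray()) {
      // Heap buffer: decode straight from the backing array, starting at arrayOffset() + position().
      byte[] a = buffer.array();
      int base = buffer.arrayOffset() + buffer.position();
      for (int i = 0; i < total; i++) {
        int p = base + 4 * i;
        dst[offset + i] = (a[p] & 0xff)
            | ((a[p + 1] & 0xff) << 8)
            | ((a[p + 2] & 0xff) << 16)
            | ((a[p + 3] & 0xff) << 24);
      }
      buffer.position(buffer.position() + 4 * total); // consume the bytes, as getInt() would
    } else {
      // Direct or read-only buffer: no accessible array, so read one value at a time.
      // Note: this sets the caller's buffer to little-endian order.
      buffer.order(ByteOrder.LITTLE_ENDIAN);
      for (int i = 0; i < total; i++) {
        dst[offset + i] = buffer.getInt();
      }
    }
  }

  // Bit-packed booleans, least-significant bit first: a new byte is loaded every 8 values.
  static final class BooleanReader {
    private final ByteBuffer buffer;
    private int bitOffset = 0;
    private byte currentByte;

    BooleanReader(ByteBuffer buffer) { this.buffer = buffer; }

    boolean readBoolean() {
      if (bitOffset == 0) {
        currentByte = buffer.get();
      }
      boolean v = (currentByte & (1 << bitOffset)) != 0;
      bitOffset += 1;
      if (bitOffset == 8) {
        bitOffset = 0;
      }
      return v;
    }
  }

  public static void main(String[] args) {
    // A slice starting at index 2 of its backing array: arrayOffset() is 2, position() is 0.
    ByteBuffer whole = ByteBuffer.wrap(new byte[] {9, 9, 1, 0, 0, 0, 2, 0, 0, 0});
    whole.position(2);
    ByteBuffer heap = whole.slice();
    int[] fromHeap = new int[2];
    readInts(heap, 2, fromHeap, 0);

    ByteBuffer direct = ByteBuffer.allocateDirect(8).order(ByteOrder.LITTLE_ENDIAN);
    direct.putInt(7).putInt(-1);
    direct.flip();
    int[] fromDirect = new int[2];
    readInts(direct, 2, fromDirect, 0);

    BooleanReader bools = new BooleanReader(ByteBuffer.wrap(new byte[] {0b00000101}));
    System.out.println(fromHeap[0] + " " + fromHeap[1] + " " + fromDirect[0] + " " + fromDirect[1]
        + " " + bools.readBoolean() + " " + bools.readBoolean() + " " + bools.readBoolean());
    // prints "1 2 7 -1 true false true"
  }
}
```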
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181899509 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- Updated.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181883674 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- Yeah, exactly. We can detect that the buffer is backed by an array and use the other call. If we think this is worth it as a short-term fix, I'll update this PR.
[GitHub] spark issue #21071: [SPARK-21962][CORE] Distributed Tracing in Spark
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21071 It would be great to have some metrics to convince ourselves that using the null scope has no performance impact.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181864492 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- I moved the loop out because I didn't think that using the byte[] API would actually be better. Parquet doesn't guarantee that these byte buffers are on the heap and backed by byte arrays, so we would need to copy the bytes out of the buffer into an array only to copy them again in the column vector call. Two copies (and possibly an allocation) seemed worse than moving the loop. We could catch the case where the buffers are on-heap and make the bulk call. The drawbacks are that this would be temporary, to be removed once ColumnVector supports ByteBuffer, and that it only works, or matters, when Parquet uses on-heap buffers and Spark uses off-heap buffers. Is that worth the change to this PR? I can take a shot at it if you think it is, though I'd rather update ColumnVector and then follow up.
[GitHub] spark issue #21071: [SPARK-21962][CORE] Distributed Tracing in Spark
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21071 @devaraj-kavali, do you have any measurements to quantify how this impacts overall performance? We would want to know before releasing this, because using HTrace means having it on all the time in order to analyze slow-downs. As @steveloughran suggests, it should also be optional (unless there is no measurable cost to having it on all the time).
[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21060 I agree with what @srowen said:
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181826671 --- Diff: pom.xml --- @@ -129,7 +129,7 @@ 1.2.1 10.12.1.1 -1.8.2 +1.10.0 --- End diff -- I excluded the commons-pool dependency from parquet-hadoop to avoid this. I also tested the latest Parquet release with commons-pool 1.5.4 and everything passes. I don't think it actually requires 1.6.
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181535144 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- It looks that way, but it actually replaces a similar loop: https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java#L283-L291 The main problem is that ByteBuffer isn't supported in the column vectors. That seems beyond the scope of this PR.
[GitHub] spark issue #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070 Upstream benchmarks for the buffer management changes are here: https://github.com/apache/parquet-mr/pull/390#issuecomment-338505426 Because of the heap size used, those benchmarks don't show the GC benefit of smaller buffer allocations; they only show that the changes do no harm.
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/21070

SPARK-23972: Update Parquet to 1.10.0.

## What changes were proposed in this pull request?

This updates Parquet to 1.10.0 and updates the vectorized path for buffer management changes. Parquet 1.10.0 uses ByteBufferInputStream instead of byte arrays in encoders. This allows Parquet to break allocations into smaller chunks that are better for garbage collection.

## How was this patch tested?

Existing Parquet tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark SPARK-23972-update-parquet-to-1.10.0

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21070.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #21070

commit 4df17a6e9726cb22e499d479a9ab48f5db18a538
Author: Ryan Blue <blue@...>
Date: 2017-12-01T01:25:53Z

SPARK-23972: Update Parquet to 1.10.0.

This updates the vectorized path for changes in Parquet 1.10.0, which uses ByteBufferInputStream instead of byte arrays in encoders. This allows Parquet to break allocations into smaller chunks that are better for garbage collection.
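The idea behind decoding from a stream of buffers is that data can live in several smaller allocations while a reader asks for exactly the bytes it needs. The Java sketch below is a hypothetical `ChunkedInput`, not Parquet's `ByteBufferInputStream`: its `slice(length)` returns a zero-copy view when the requested range lies inside one chunk and copies only when the range spans chunks. Either way, the caller gets a buffer of exactly `length` bytes, which is the contract the `in.slice(length)` calls in the vectorized reader diffs depend on.

```java
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class ChunkedInputSketch {
  // Hypothetical input over several small buffers rather than one large byte[].
  static final class ChunkedInput {
    private final Deque<ByteBuffer> chunks = new ArrayDeque<>();

    ChunkedInput(List<ByteBuffer> buffers) {
      for (ByteBuffer b : buffers) {
        if (b.hasRemaining()) {
          chunks.add(b.duplicate()); // independent position/limit, shared content
        }
      }
    }

    // Returns the next `length` bytes. A range inside the current chunk is returned as a
    // zero-copy view; a range that spans chunks is copied into a new buffer.
    ByteBuffer slice(int length) throws EOFException {
      ByteBuffer head = chunks.peekFirst();
      if (head != null && head.remaining() >= length) {
        ByteBuffer view = head.slice();
        view.limit(length);
        advance(head, length);
        return view;
      }
      ByteBuffer copy = ByteBuffer.allocate(length);
      while (copy.hasRemaining()) {
        ByteBuffer chunk = chunks.peekFirst();
        if (chunk == null) {
          throw new EOFException("Needed " + length + " bytes but the input ran out");
        }
        int n = Math.min(chunk.remaining(), copy.remaining());
        ByteBuffer part = chunk.slice();
        part.limit(n);
        copy.put(part);
        advance(chunk, n);
      }
      copy.flip();
      return copy;
    }

    // Moves past n bytes of the head chunk and drops it once it is exhausted.
    private void advance(ByteBuffer chunk, int n) {
      chunk.position(chunk.position() + n);
      if (!chunk.hasRemaining()) {
        chunks.pollFirst();
      }
    }
  }

  public static void main(String[] args) throws EOFException {
    byte[] first = {1, 2, 3};
    ChunkedInput in = new ChunkedInput(
        Arrays.asList(ByteBuffer.wrap(first), ByteBuffer.wrap(new byte[] {4, 5})));
    ByteBuffer a = in.slice(2); // inside the first chunk: a view, no copy
    ByteBuffer b = in.slice(2); // spans both chunks: copied
    first[0] = 42;              // visible through the view because it shares memory
    System.out.println(a.get(0) + " " + a.get(1) + " " + b.get(0) + " " + b.get(1)); // prints "42 2 3 4"
  }
}
```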
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r181509305 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -368,8 +368,7 @@ case class FileSourceScanExec( val bucketed = selectedPartitions.flatMap { p => p.files.map { f => - val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen) --- End diff -- Yeah, I think the commit itself would be a self-contained reorganization. The motivation is to refactor for this PR, which is okay.
[GitHub] spark issue #20988: [SPARK-23877][SQL]: Use filter predicates to prune parti...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20988 @cloud-fan or @gatorsmile, could you review this?
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r179811255 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -368,8 +368,7 @@ case class FileSourceScanExec( val bucketed = selectedPartitions.flatMap { p => p.files.map { f => - val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen) --- End diff -- The reason is better organization to support other changes like this one. @jose-torres was right to point out that these changes are self-contained enough to go in a separate PR, and @gengliangwang and I both agreed. Why make this commit larger than necessary?
[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/20988 [SPARK-23877][SQL]: Use filter predicates to prune partitions in metadata-only queries ## What changes were proposed in this pull request? This updates the OptimizeMetadataOnlyQuery rule to use filter expressions when listing partitions, if there are filter nodes in the logical plan. This avoids listing all partitions for large tables on the driver. This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack level too deep error. This is caused by serializing a stream to executors, where the stream is a recursive structure. If the stream is too long, the serialization stack reaches the maximum level of depth. The fix is to create a LocalRelation using an Array instead of the incoming Seq. ## How was this patch tested? Existing tests for metadata-only queries. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark SPARK-23877-metadata-only-push-filters Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20988.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20988 commit 552efaee05b64f9ed4d5496b3b1d11b57b985f85 Author: Ryan Blue <blue@...> Date: 2018-03-14T21:50:11Z Support filter conditions in metadata-only queries. commit 2345896288828aefe14ebcb370d374b348400cf4 Author: Ryan Blue <blue@...> Date: 2018-03-14T22:43:56Z Ensure partition data is an Array. The LocalRelation created for partition data for metadata-only queries may be a stream produced by listing directories. If the stream is large, serializing the LocalRelation to executors results in a stack overflow because the stream is a recursive structure of (head, rest-of-stream). 
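The serialization fix described in this PR can be shown with a minimal sketch. It assumes only that the listed partition values arrive as a `Seq` that may be a lazily built `Stream`. `FlatPartitionValues`, `materialize`, and `serializedSize` are hypothetical names, not Spark code. After the values are copied into an `Array`, Java serialization writes the elements in a loop rather than recursing once per stream cell.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.reflect.ClassTag

// Hypothetical helpers (not Spark code): copy possibly-lazy, recursively
// structured partition values into a flat Array before anything that will be
// Java-serialized captures them.
object FlatPartitionValues {
  def materialize[T: ClassTag](values: Seq[T]): Array[T] = values.toArray

  // Java-serialize a value and return the number of bytes written.
  def serializedSize(value: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.size()
  }
}
```

In the PR itself, the equivalent step is building the `LocalRelation` from an `Array` instead of the incoming `Seq`.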
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r179521697 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1185,6 +1185,13 @@ object SQLConf { .stringConf .createWithDefault("") + val DISABLED_V2_DATA_SOURCE_READERS = buildConf("spark.sql.disabledV2DataSourceReaders") --- End diff -- Why is this necessary?
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r179521502 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala --- @@ -213,6 +215,26 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast } } +/** + * Replaces [[OrcDataSourceV2]] with [[DataSource]] if parent node is [[InsertIntoTable]]. + * This is because [[OrcDataSourceV2]] doesn't support writing data yet. --- End diff -- I agree with @jose-torres. If there is a general problem when writes aren't supported, then shouldn't this be a generic rule that provides a good error message?
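A generic rule of the kind suggested here might look like the following sketch. The plan nodes, the `supportsWrite` flag, `SimpleSource`, and `CheckWriteSupport` are hypothetical simplifications, not Spark's analyzer API. A single check covers every source that cannot accept writes and names the source that failed, instead of rewriting one format back to v1.

```scala
// Hypothetical, simplified plan nodes; not Spark's analyzer API.
trait Source {
  def name: String
  def supportsWrite: Boolean
}
final case class SimpleSource(name: String, supportsWrite: Boolean) extends Source

sealed trait Plan
final case class Relation(source: Source) extends Plan
final case class InsertInto(target: Relation, query: Plan) extends Plan

// One generic check for every source, instead of a per-format rewrite rule.
object CheckWriteSupport {
  def apply(plan: Plan): Plan = plan match {
    case InsertInto(Relation(source), _) if !source.supportsWrite =>
      throw new UnsupportedOperationException(
        s"Cannot write to data source '${source.name}': it does not support writes")
    case other => other
  }
}
```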
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r179521100 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartitionUtil.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources + +import java.io.{FileNotFoundException, IOException} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.TaskContext +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.InputFileBlockHolder +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.NextIterator + +object FilePartitionUtil extends Logging { + + def getFilePartitions( + sparkSession: SparkSession, + partitionedFiles: Seq[PartitionedFile], + maxSplitBytes: Long): Seq[FilePartition] = { +val partitions = new ArrayBuffer[FilePartition] +val currentFiles = new ArrayBuffer[PartitionedFile] +var currentSize = 0L + +/** Close the current partition and move to the next. 
*/ +def closePartition(): Unit = { + if (currentFiles.nonEmpty) { +val newPartition = + FilePartition( +partitions.size, +currentFiles.toArray.toSeq) // Copy to a new Array. +partitions += newPartition + } + currentFiles.clear() + currentSize = 0 +} + +val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes +// Assign files to partitions using "Next Fit Decreasing" +partitionedFiles.foreach { file => + if (currentSize + file.length > maxSplitBytes) { +closePartition() + } + // Add the given file to the current partition. + currentSize += file.length + openCostInBytes + currentFiles += file +} +closePartition() +partitions + } + + def compute( --- End diff -- Why is this named "compute" and not "open" or something more specific?
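The "Next Fit Decreasing" packing in the quoted `getFilePartitions` can be reproduced on its own. This is a sketch under stated assumptions. `FileSlice` and `Bin` are hypothetical stand-ins for `PartitionedFile` and `FilePartition`. The quoted loop does not sort its input, so the sketch sorts by length itself to stand alone.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for PartitionedFile and FilePartition.
final case class FileSlice(path: String, length: Long)
final case class Bin(index: Int, files: List[FileSlice])

object NextFitDecreasing {
  def pack(files: Seq[FileSlice], maxSplitBytes: Long, openCostInBytes: Long): List[Bin] = {
    val bins = ArrayBuffer.empty[Bin]
    val current = ArrayBuffer.empty[FileSlice]
    var currentSize = 0L

    // Close the current bin (if it holds anything) and start a new one.
    def closeBin(): Unit = {
      if (current.nonEmpty) bins += Bin(bins.size, current.toList)
      current.clear()
      currentSize = 0L
    }

    // "Decreasing": largest files first. "Next fit": only the open bin is considered.
    files.sortBy(f => -f.length).foreach { f =>
      if (currentSize + f.length > maxSplitBytes) closeBin()
      // Each file also pays a fixed open cost, as in the quoted code.
      currentSize += f.length + openCostInBytes
      current += f
    }
    closeBin()
    bins.toList
  }
}
```

Take `maxSplitBytes = 100` and `openCostInBytes = 4`. Files `a`, `b`, `c`, and `d` of 60, 50, 30, and 10 bytes pack into two bins: `[a]` and `[b, c, d]`. File `b` starts a new bin because 64 (60 bytes for `a` plus its open cost) + 50 exceeds 100.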
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r179520643 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2.orc + +import java.net.URI +import java.util.Locale + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.orc.{OrcConf, OrcFile} +import org.apache.orc.mapred.OrcStruct +import org.apache.orc.mapreduce.OrcInputFormat + +import org.apache.spark.TaskContext +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, JoinedRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcUtils} +import org.apache.spark.sql.execution.datasources.v2.ColumnarBatchFileSourceReader +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{AtomicType, StructType} +import org.apache.spark.util.SerializableConfiguration + +class OrcDataSourceV2 extends DataSourceV2 with ReadSupport with ReadSupportWithSchema { + override def createReader(options: DataSourceOptions): DataSourceReader = { +new OrcDataSourceReader(options, None) + } + + override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = { +new OrcDataSourceReader(options, Some(schema)) + } +} + +case class OrcDataSourceReader(options: DataSourceOptions, userSpecifiedSchema: Option[StructType]) + extends ColumnarBatchFileSourceReader + with SupportsPushDownCatalystFilters { + + override def inferSchema(files: 
Seq[FileStatus]): Option[StructType] = { +OrcUtils.readSchema(sparkSession, files) + } + + private var pushedFiltersArray: Array[Expression] = Array.empty + + override def readFunction: PartitionedFile => Iterator[InternalRow] = { --- End diff -- I think it is a bad idea to continue using `PartitionedFile => Iterator[InternalRow]` in v2. I understand not wanting to change much about how this works, just to get the code behind the v2 API. But this pattern is broken and causes resource problems that the v2 API nudges implementers to fix. What resource problems? This doesn't implement close properly, forcing close to happen at task end by calling functions registered when files are opened. We've gone back through and replaced the iterators with closeable versions so that we release resources more quickly, because the callback-based close does not scale. I would like to see this problem fixed instead of copying it into v2.
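The difference between callback-based cleanup and closeable iterators can be shown with a small sketch. `CloseableIterator.wrap` is a hypothetical helper, not a Spark or Scala library API. It releases the underlying resource as soon as the rows are exhausted or `close()` is called, instead of holding every opened file until a listener runs at task end.

```scala
import java.io.Closeable

// Hypothetical sketch: an iterator that owns a resource (e.g. an open file)
// and releases it as soon as it is exhausted or explicitly closed, rather than
// waiting for a callback that runs when the whole task ends.
trait CloseableIterator[T] extends Iterator[T] with Closeable

object CloseableIterator {
  def wrap[T](rows: Iterator[T], release: () => Unit): CloseableIterator[T] =
    new CloseableIterator[T] {
      private var closed = false

      override def hasNext: Boolean = {
        val more = !closed && rows.hasNext
        if (!more) close() // release eagerly at end of input
        more
      }

      override def next(): T = {
        if (!hasNext) throw new NoSuchElementException("no more rows")
        rows.next()
      }

      // Idempotent: the resource is released at most once.
      override def close(): Unit =
        if (!closed) {
          closed = true
          release()
        }
    }
}
```

A reader built this way can close each file before opening the next, which keeps the number of open files bounded within a task.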
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r179516352 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileReaderFactory.scala --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.{FilePartition, FilePartitionUtil, PartitionedFile} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} + +case class FileReaderFactory[T]( --- End diff -- Why is this class public? Isn't this internal to HadoopFsRelation's v2 implementation?
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r179514000 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -368,8 +368,7 @@ case class FileSourceScanExec( val bucketed = selectedPartitions.flatMap { p => p.files.map { f => - val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen) --- End diff -- Why does this only make sense for this PR? It looks like this is a reasonable refactor that could be stand-alone.
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r179513582 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -187,6 +189,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { "read files of Hive data source directly.") } +// SPARK-23817 Since datasource V2 didn't support reading multiple files yet, +// ORC V2 is only used when loading single file path. +val allPaths = CaseInsensitiveMap(extraOptions.toMap).get("path") ++ paths +val orcV2 = OrcDataSourceV2.satisfy(sparkSession, source, allPaths.toSeq) +if (orcV2.isDefined) { + option("path", allPaths.head) + source = orcV2.get +} --- End diff -- How does v2 not support reading multiple files?
[GitHub] spark issue #20397: [SPARK-23219][SQL]Rename ReadTask to DataReaderFactory i...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20397 @cloud-fan, can we revert this?
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20579 I agree; we should probably add a check for storing a DataFrame with no columns for now. This is normally caught by the pre-insert rules, but since the table is getting "created" in this case, there is nothing to check. In the future, I think that the create and insert should be logically separate so that the create will fail and complain that you can't create a table without at least one column. I think it will be cleaner to separate concerns like that.
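A check like the one proposed could be sketched as follows. The schema model (`Struct`, `Field`, `IntField`, `StringField`) and `EmptySchemaCheck` are hypothetical and deliberately avoid Spark's `org.apache.spark.sql.types`. The sketch also looks inside nested structs, which is an assumption: the issue is about writing an empty struct, not only a schema with no top-level columns.

```scala
// Hypothetical, minimal schema model; Spark's real types are not used here.
sealed trait FieldType
case object IntField extends FieldType
case object StringField extends FieldType
final case class Field(name: String, dataType: FieldType)
final case class Struct(fields: List[Field]) extends FieldType

object EmptySchemaCheck {
  // Returns the dotted paths of structs that have no columns; "" is the top level.
  def emptyStructs(struct: Struct, path: String = ""): List[String] = {
    val here = if (struct.fields.isEmpty) List(path) else Nil
    here ++ struct.fields.flatMap {
      case Field(name, nested: Struct) =>
        emptyStructs(nested, if (path.isEmpty) name else s"$path.$name")
      case _ => Nil
    }
  }

  // Fail before any data is written if the schema cannot be stored.
  def validateForWrite(schema: Struct): Unit = {
    val empty = emptyStructs(schema)
    if (empty.nonEmpty) {
      val where = empty.map(p => if (p.isEmpty) "<top level>" else p).mkString(", ")
      throw new IllegalArgumentException(s"Cannot write a schema with no columns at: $where")
    }
  }
}
```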
[GitHub] spark issue #20752: [SPARK-23559][SS] Create StreamingDataWriterFactory for ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20752 > Right now, we're still at the point where we aren't quite sure what a streaming API needs to look like. We're starting from basically ground zero; the V1 streaming API just throws a DataFrame at the sink and tells it to catch. So we need to iterate towards something that works at all before a meaningful design discussion is possible. Thanks for the context. This aligns with the impression I've gotten, and it makes sense. My push for separation between the batch and streaming sides comes from wanting to keep that evolution from making too many changes to the batch side, which is better understood. I also think that streaming is different enough that we might be heading in the wrong direction by trying to combine the interfaces too early on.
[GitHub] spark issue #20752: [SPARK-23559][SS] Create StreamingDataWriterFactory for ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20752 Thanks for the clear summary, @cloud-fan. I think we want to make it easy to support batch, and then easy to reuse those internals to support streaming by adding new mix-in interfaces. Streaming is more complicated for implementers, and I'd like to help people conceptually ramp up instead of requiring a lot of understanding to get the simple cases working. I think we may also want to put a design for the streaming side on the dev list. If the batch side warranted a design discussion, then I think the streaming side does as well. Changing the batch side for streaming changes as they become necessary doesn't seem like a good way to arrive at a solid design to me.
[GitHub] spark pull request #20752: [SPARK-23559][SS] Create StreamingDataWriterFacto...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20752#discussion_r172621472 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.writer.DataWriter; +import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; + +@InterfaceStability.Evolving +public interface StreamingDataWriterFactory extends DataWriterFactory { + /** + * Returns a data writer to do the actual writing work. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. + * + * @param partitionId A unique id of the RDD partition that the returned writer will process. + *Usually Spark processes many RDD partitions at the same time, + *implementations should use the partition id to distinguish writers for + *different partitions. + * @param attemptNumber Spark may launch multiple tasks with the same task id. 
For example, a task + * failed, Spark launches a new task wth the same task id but different + * attempt number. Or a task is too slow, Spark launches new tasks wth the + * same task id but different attempt number, which means there are multiple + * tasks with the same task id running at the same time. Implementations can + * use this attempt number to distinguish writers of different task attempts. + * @param epochId A monotonically increasing id for streaming queries that are split in to + *discrete periods of execution. For non-streaming queries, + *this ID will always be 0. + */ + DataWriter createDataWriter(int partitionId, int attemptNumber, long epochId); + + @Override default DataWriter createDataWriter(int partitionId, int attemptNumber) { +throw new IllegalStateException("Streaming data writer factory cannot create data writers without epoch."); --- End diff -- Can you point me to the code where this would need to change? I don't see it here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
[GitHub] spark pull request #20752: [SPARK-23559][SS] Create StreamingDataWriterFacto...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20752#discussion_r172620679 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java --- @@ -27,6 +28,9 @@ * * Streaming queries are divided into intervals of data called epochs, with a monotonically * increasing numeric ID. This writer handles commits and aborts for each successive epoch. + * + * Note that StreamWriter implementations should provide instances of + * {@link StreamingDataWriterFactory}. --- End diff -- What do you think about removing `SupportsWriteInternalRow` and always using `InternalRow`? For the read side, I think using `Row` and `UnsafeRow` is a problem (see https://issues.apache.org/jira/browse/SPARK-23325). I don't see the value of using `Row` instead of `InternalRow` for readers, so maybe we should just simplify both the read and write paths.
[GitHub] spark pull request #20752: [SPARK-23559][SS] Create StreamingDataWriterFacto...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20752#discussion_r172616831 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -54,7 +55,14 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e override protected def doExecute(): RDD[InternalRow] = { val writeTask = writer match { case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory() - case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) + case w: MicroBatchWriter => +new StreamingInternalRowDataWriterFactory(w.createWriterFactory(), query.schema) + case w: StreamWriter => +new StreamingInternalRowDataWriterFactory( + w.createWriterFactory().asInstanceOf[StreamingDataWriterFactory[Row]], --- End diff -- This will cause a cast exception, right? I think it is better to use a separate create method.
[GitHub] spark pull request #20752: [SPARK-23559][SS] Create StreamingDataWriterFacto...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20752#discussion_r172616601 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java --- @@ -27,6 +28,9 @@ * * Streaming queries are divided into intervals of data called epochs, with a monotonically * increasing numeric ID. This writer handles commits and aborts for each successive epoch. + * + * Note that StreamWriter implementations should provide instances of + * {@link StreamingDataWriterFactory}. --- End diff -- What about adding `createStreamWriterFactory` that returns the streaming interface? That would make it easier for implementations and prevent throwing cast exceptions because a `StreamingDataWriterFactory` is expected.
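The suggested `createStreamWriterFactory` method can be illustrated with simplified, hypothetical interfaces. These are not Spark's actual v2 classes, and `ExampleStreamWriter` and `WriteExec` are illustrative names. The streaming writer declares a method whose return type is the streaming factory. The execution node can then pattern-match on the writer and get a statically typed factory, with no `asInstanceOf` that could fail at runtime.

```scala
// Hypothetical, simplified stand-ins for the v2 writer interfaces; not Spark's API.
trait WriterFactory { def label: String }
trait StreamingWriterFactory extends WriterFactory {
  def createWriter(partitionId: Int, epochId: Long): String
}

trait DataSourceWriter { def createWriterFactory(): WriterFactory }

// The streaming writer declares a method typed to return the streaming
// factory, so callers never need asInstanceOf.
trait StreamWriter extends DataSourceWriter {
  def createStreamWriterFactory(): StreamingWriterFactory
  override def createWriterFactory(): WriterFactory = createStreamWriterFactory()
}

object WriteExec {
  // Pattern matching on the writer type yields a statically typed factory.
  def streamingFactory(writer: DataSourceWriter): Option[StreamingWriterFactory] =
    writer match {
      case s: StreamWriter => Some(s.createStreamWriterFactory())
      case _ => None
    }
}

final class ExampleStreamWriter extends StreamWriter {
  override def createStreamWriterFactory(): StreamingWriterFactory =
    new StreamingWriterFactory {
      override val label = "example"
      override def createWriter(partitionId: Int, epochId: Long): String =
        s"$label-p$partitionId-e$epochId"
    }
}
```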
[GitHub] spark pull request #20752: [SPARK-23559][SS] Create StreamingDataWriterFacto...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20752#discussion_r172613993 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.writer.DataWriter; +import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; + +@InterfaceStability.Evolving +public interface StreamingDataWriterFactory extends DataWriterFactory { + /** + * Returns a data writer to do the actual writing work. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. + * + * @param partitionId A unique id of the RDD partition that the returned writer will process. + *Usually Spark processes many RDD partitions at the same time, + *implementations should use the partition id to distinguish writers for + *different partitions. + * @param attemptNumber Spark may launch multiple tasks with the same task id. 
For example, a task + * failed, Spark launches a new task wth the same task id but different + * attempt number. Or a task is too slow, Spark launches new tasks wth the + * same task id but different attempt number, which means there are multiple + * tasks with the same task id running at the same time. Implementations can + * use this attempt number to distinguish writers of different task attempts. + * @param epochId A monotonically increasing id for streaming queries that are split in to + *discrete periods of execution. For non-streaming queries, + *this ID will always be 0. + */ + DataWriter createDataWriter(int partitionId, int attemptNumber, long epochId); + + @Override default DataWriter createDataWriter(int partitionId, int attemptNumber) { +throw new IllegalStateException("Streaming data writer factory cannot create data writers without epoch."); --- End diff -- Why extend `DataWriterFactory` if this method is going to throw an exception? Why not make them independent interfaces?
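Making the two factories independent could look like this sketch. All names (`BatchDataWriterFactory`, `StreamingDataWriterFactory`, `BufferingWriter`, `SharedSinkFactory`) are hypothetical simplifications of the interfaces under review, not Spark's API. Neither factory inherits a method it cannot honor, and a sink that wants to share logic can still implement both.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-ins; not Spark's actual interfaces.
trait DataWriter[T] {
  def write(record: T): Unit
  def commit(): String
}

// Batch factory: no epoch parameter at all.
trait BatchDataWriterFactory[T] extends Serializable {
  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[T]
}

// Streaming factory: independent of the batch one, so there is no inherited
// two-argument method that has to throw.
trait StreamingDataWriterFactory[T] extends Serializable {
  def createDataWriter(partitionId: Int, attemptNumber: Int, epochId: Long): DataWriter[T]
}

final class BufferingWriter(tag: String) extends DataWriter[String] {
  private val buffer = ArrayBuffer.empty[String]
  override def write(record: String): Unit = buffer += record
  override def commit(): String = s"$tag:${buffer.mkString(",")}"
}

// A sink that wants to share logic can still implement both interfaces.
object SharedSinkFactory
    extends BatchDataWriterFactory[String]
    with StreamingDataWriterFactory[String] {
  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[String] =
    new BufferingWriter(s"p$partitionId-a$attemptNumber")

  override def createDataWriter(
      partitionId: Int, attemptNumber: Int, epochId: Long): DataWriter[String] =
    new BufferingWriter(s"p$partitionId-a$attemptNumber-e$epochId")
}
```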
[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20710 Epoch ID is not a valid part of the logical place in a query for batch. I think we should separate batch and streaming, as they are already coming from different interfaces. There's no need to pass useless information to a batch writer or committer. Implementations can choose to use the same logic if they want, but we should keep the API focused on what is needed, to keep it reasonable for implementers.
[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20710 > Data source writers need to be able to reason about what progress they've made, which is impossible in the streaming case if each epoch is its own disconnected query. I don't think the writers necessarily need to reason about progress. Are you saying that there are guarantees the writers need to make, like ordering how data appears? I'm thinking of an implementation that creates a file for each task commit and the driver's commit operation makes those available. That doesn't require any progress tracking on tasks. As far as a writer knowing that different epochs are part of the same query: why? Is there something the writer needs to do? If so, then I think that is more of an argument for a separate streaming interface, or else batch implementations that ignore the epoch might do the wrong thing.
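The file-per-task-commit design mentioned here can be sketched with plain `java.nio.file` calls. `TaskCommitMessage` and `FilePerTaskCommit` are hypothetical names, not Spark's commit protocol classes. The sketch shows only that publishing work at job commit needs no epoch or progress tracking on the task side.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical protocol sketch (not Spark's API): each task writes one staged
// file and reports it; only the driver's job commit makes the files visible.
final case class TaskCommitMessage(staged: Path)

object FilePerTaskCommit {
  // Task side: write this attempt's rows to a uniquely named staging file.
  def commitTask(stagingDir: Path, partitionId: Int, attemptNumber: Int,
                 rows: Seq[String]): TaskCommitMessage = {
    val file = stagingDir.resolve(s"part-$partitionId-attempt-$attemptNumber.txt")
    Files.write(file, rows.mkString("\n").getBytes(StandardCharsets.UTF_8))
    TaskCommitMessage(file)
  }

  // Driver side: publish each committed task's file by moving it into the
  // output directory. A real sink would want an atomic rename here.
  def commitJob(outputDir: Path, messages: Seq[TaskCommitMessage]): Unit =
    messages.foreach { m =>
      Files.move(m.staged, outputDir.resolve(m.staged.getFileName))
    }
}
```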
[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20710 My question is: why can't we use a batch interface for batch and micro-batch (which behaves like batch) and add a separate streaming interface for continuous streaming? I see no reason to have epoch ID for batch, and it seems janky to add options that implementers should know to ignore. > Spark may need to distinguish between different segments of the data when talking to the remote sink. For which case, continuous or micro-batch?
[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20710 Could the non-continuous streaming mode just use the batch interface, since each write is basically separate?
[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20710 @jose-torres, can you explain that more for me? Why would callers only use one interface but not the other? Wouldn't streaming use one and batch the other? Why would batch need to know about streaming and vice versa? The simplification is for implementers. It seems odd for implementations to deal with parameters that are for something else (e.g., don't worry about this for batch).
[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20710 @tdas, thanks for letting us know. I'm really wondering if we should be using the same interfaces between batch and streaming. The epoch id strikes me as strange for data sources that won't support streaming. What do you think?
[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20726#discussion_r172324207 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -46,34 +48,46 @@ case class DataSourceV2ScanExec( new DataSourcePartitioning( s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name))) +case _ if readerFactories.size == 1 => SinglePartition + case _ => super.outputPartitioning } - private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match { -case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories() + private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match { --- End diff -- I think it is better to have a lazy val throw an exception if it is called at the wrong time (if it is private) than to add the cast, because the exception at least validates that assumptions are met and can throw a reasonable error message. The cast might hide the problem, particularly over time as this code evolves. It would be reasonable to add another path that returns `Seq[DataReaderFactory[_]]` because that's the method contract, even though there is an assumption in the callers about how it will behave. As much of the contract between methods as possible should be expressed in types.
[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20726#discussion_r172320487
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
       new DataSourcePartitioning(
         s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
+    case _ if readerFactories.size == 1 => SinglePartition
+
     case _ => super.outputPartitioning
   }
-  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
+  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
--- End diff --
Why not separate the method into `batchReaderFactories` and `readerFactories`?
[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20710#discussion_r172319605
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
@@ -48,6 +48,9 @@
    *     same task id but different attempt number, which means there are multiple
    *     tasks with the same task id running at the same time. Implementations can
    *     use this attempt number to distinguish writers of different task attempts.
+   * @param epochId A monotonically increasing id for streaming queries that are split in to
+   *     discrete periods of execution. For non-streaming queries,
+   *     this ID will always be 0.
    */
-  DataWriter createDataWriter(int partitionId, int attemptNumber);
+  DataWriter createDataWriter(int partitionId, int attemptNumber, long epochId);
--- End diff --
Why are we using the same interface for streaming and batch here? Is there a compelling reason to do so instead of adding `StreamingWriterFactory`? Are the guarantees for an epoch identical to those of a single batch job?
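If the interfaces were split as the comment suggests, the shape might look like this minimal Scala sketch. Only the `DataWriterFactory` and `StreamingWriterFactory` names come from the discussion; the simplified signatures, `DataWriter`, `BufferWriter`, and the two example factories are hypothetical. Batch-only sources never see an `epochId`, and streaming sinks opt in to it explicitly.

```scala
// Hypothetical, simplified interfaces; NOT the Spark DataSourceV2 API.
trait DataWriter {
  def write(record: String): Unit
  def commit(): String
}

// Batch writes keep the original signature: there is no epoch concept.
trait DataWriterFactory {
  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter
}

// Streaming writes get their own factory, so only streaming sinks see epochId.
trait StreamingWriterFactory {
  def createDataWriter(partitionId: Int, attemptNumber: Int, epochId: Long): DataWriter
}

// A toy writer that buffers records and reports them on commit.
final class BufferWriter(tag: String) extends DataWriter {
  private val buf = scala.collection.mutable.ArrayBuffer.empty[String]
  def write(record: String): Unit = buf += record
  def commit(): String = s"$tag:${buf.mkString(",")}"
}

// A batch-only source implements just the batch factory...
object BatchOnlyFactory extends DataWriterFactory {
  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter =
    new BufferWriter(s"p$partitionId-a$attemptNumber")
}

// ...while a streaming sink opts in to epochs explicitly.
object StreamingFactory extends StreamingWriterFactory {
  def createDataWriter(partitionId: Int, attemptNumber: Int, epochId: Long): DataWriter =
    new BufferWriter(s"p$partitionId-a$attemptNumber-e$epochId")
}
```

With this split, the "always 0 for non-streaming queries" convention from the Javadoc is no longer needed, because batch writers have no epoch parameter at all.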
[GitHub] spark issue #20726: [SPARK-23574][CORE] Report SinglePartition in DataSource...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20726 Looks good to me other than a minor point on the private `readerFactories` val.
[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20726#discussion_r172317377
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
       new DataSourcePartitioning(
         s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
+    case _ if readerFactories.size == 1 => SinglePartition
+
     case _ => super.outputPartitioning
   }
-  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
+  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
--- End diff --
Why not separate the cases for columnar batch and unsafe rows? That would avoid needing to cast this later to `Seq[DataReaderFactory[UnsafeRow]]` and `Seq[DataReaderFactory[ColumnarBatch]]`, which isn't a very clean solution.
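Here is a rough sketch of separating the two cases. It assumes a hypothetical `Reader` ADT with one case per element type; `RowReader`, `BatchReader`, `rowFactories`, `batchFactories`, and `factoryCount` are illustrative names only, not Spark's. Each accessor has a precise type, so callers need no cast. Code that only needs the number of factories, such as the single-partition check in the diff, can stay untyped.

```scala
// Hypothetical stand-ins; NOT Spark's reader interfaces.
final case class RowValue(v: Int)
final case class BatchValue(rows: Seq[Int])
trait ReaderFactory[T] { def create(): T }

sealed trait Reader
final case class RowReader(factories: Seq[ReaderFactory[RowValue]]) extends Reader
final case class BatchReader(factories: Seq[ReaderFactory[BatchValue]]) extends Reader

// One accessor per element type: each returns a precise type, so callers never cast.
def rowFactories(r: Reader): Seq[ReaderFactory[RowValue]] = r match {
  case RowReader(fs) => fs
  case other => throw new IllegalStateException(s"Not a row reader: $other")
}

def batchFactories(r: Reader): Seq[ReaderFactory[BatchValue]] = r match {
  case BatchReader(fs) => fs
  case other => throw new IllegalStateException(s"Not a batch reader: $other")
}

// Code that only needs the count (e.g. to report a single partition) can ignore the type.
def factoryCount(r: Reader): Int = r match {
  case RowReader(fs) => fs.size
  case BatchReader(fs) => fs.size
}
```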
[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20692 Good point on nested types. I don't think heavy nesting is the usual case, but we can definitely improve the explain result in the long term by separating it out. Maybe just using a high-level type (e.g. struct<...>) for now would work?
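Here is one way to read the "high-level type" suggestion, sketched in Scala with a hypothetical mini type model. `DType`, `fullString`, and `shortString` are illustrative names, not Spark's `DataType` API. Nested types are rendered in full only when asked; otherwise just the outer type is printed as `struct<...>` or `array<...>`.

```scala
// Hypothetical, minimal model of SQL types; not Spark's DataType hierarchy.
sealed trait DType
case object BigIntT extends DType
case object StringT extends DType
final case class ArrayT(element: DType) extends DType
final case class StructT(fields: Seq[(String, DType)]) extends DType

// Full rendering, similar in spirit to a simpleString of a nested type.
def fullString(t: DType): String = t match {
  case BigIntT => "bigint"
  case StringT => "string"
  case ArrayT(e) => s"array<${fullString(e)}>"
  case StructT(fs) =>
    fs.map { case (n, ft) => s"$n:${fullString(ft)}" }.mkString("struct<", ",", ">")
}

// High-level rendering: keep the top-level type, elide nested contents.
def shortString(t: DType): String = t match {
  case ArrayT(_) => "array<...>"
  case StructT(_) => "struct<...>"
  case other => fullString(other)
}
```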
[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20692 My intent is just to advocate for clear feedback on the content of PRs. It's good to hear your confidence in @mgaido91, and if he wants to work on a better explain, that's great too.
[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20692
> I think you can add the patch to your fork.
My primary concern isn't the feature, although I do think it is useful. My concern is how we work with contributors. My worry here is that the objections are not about the merits of this PR. Telling a contributor to either take on something much larger or come back in a few months is discouraging and stifles contributions. This patch is here now, and the focus should be either on why the idea is not a good one (i.e., *why* types should not go in plans) or on what should be improved in the implementation. That could also mean proposing a better alternative, but without an alternate proposal, asking for something different without constructive criticism of the PR leaves me, at least, simply confused.
[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20692
> this is not urgent based on our release schedule.
Marco is contributing this right now. It is a bad idea to ask contributors to come back in 4 months if we don't have a better option by then. And in my experience, it's really hard to show up right before a release and get anything in, because the focus is on the release.
> I do not like adding the types into the plans that are already very complex.
I see this as a critical piece of information that is missing from Spark plans. I've had to resort to a debugger several times to see what Spark thinks the types are, and I think it is worth the extra size. I'm all for debating the value of adding these types, but we should focus on that question. Implementations can be fixed. And when have we ever turned away a contribution because it was too early?
[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20692 I don't think it is a good idea to block a fix like this on a redesign of explain plans. If that redesign were currently being worked on and this made it more difficult, blocking might be reasonable. But this can be a small interim fix, and it doesn't really affect larger plans. If this patch needs changes to be cleaner, then let's get that feedback to Marco. He has been really responsive on this contribution.
[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20692 I don't see a compelling reason to block this work, which is well-defined and a reasonably small patch. What is here would help users with debugging. @gatorsmile, if you have an alternative approach, could you write it up and propose it on the dev list?
[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20692 @mgaido91, I'm happy without adding another option. Let's add it in the future if we find that we need it, rather than assuming that we will. This change should give most of the information needed to debug types.
[GitHub] spark pull request #20692: [SPARK-23531][SQL] Show attribute type in explain
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20692#discussion_r171626542
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -94,7 +94,7 @@ case class CreateHiveTableAsSelectCommand(
     Seq.empty[Row]
   }
-  override def argString: String = {
+  override def argString(isLeaf: Boolean): String = {
--- End diff --
What is the value of adding `isLeaf`? Don't leaf nodes already know that they are leaves? Why not just have leaf nodes call `outputStringWithTypes` or something similar and avoid all of the API changes?
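Here is a minimal sketch of the alternative the comment proposes. It uses hypothetical simplified node types: `PlanNode`, `LeafPlanNode`, `Scan`, `Project`, and `Attr` are illustrative, not Spark's `TreeNode` hierarchy, and `outputStringWithTypes` is the placeholder name from the comment. Leaf nodes override the default rendering themselves, so no `isLeaf` parameter has to be threaded through `argString`.

```scala
// Hypothetical simplified plan nodes; not Spark's TreeNode/QueryPlan.
final case class Attr(name: String, dataType: String)

trait PlanNode {
  def nodeName: String
  def output: Seq[Attr]
  def children: Seq[PlanNode]
  // Default rendering: names only, and no parameter saying whether this is a leaf.
  def argString: String = output.map(_.name).mkString("[", ", ", "]")
}

// Leaf nodes already know they are leaves, so they override the default.
trait LeafPlanNode extends PlanNode {
  final def children: Seq[PlanNode] = Nil
  def outputStringWithTypes: String =
    output.map(a => s"${a.name} ${a.dataType}").mkString("[", ", ", "]")
  override def argString: String = outputStringWithTypes
}

final case class Scan(output: Seq[Attr]) extends LeafPlanNode {
  def nodeName: String = "Scan"
}

final case class Project(output: Seq[Attr], child: PlanNode) extends PlanNode {
  def nodeName: String = "Project"
  def children: Seq[PlanNode] = Seq(child)
}
```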
[GitHub] spark pull request #20692: [SPARK-23531][SQL] Show attribute type in explain
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20692#discussion_r171624644
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala ---
@@ -106,7 +106,7 @@ abstract class Attribute extends LeafExpression with NamedExpression with NullIn
   override def toAttribute: Attribute = this
   def newInstance(): Attribute
-
+  def stringWithType: String = s"$toString: ${dataType.simpleString}"
--- End diff --
Nit: the `:` isn't needed and will add up to quite a few extra characters in plans. SQL uses `(id bigint, data string)` without `:`, so I think we should mirror that.
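To make the size argument concrete, here is a small, hypothetical Scala comparison of the two formats. `TypedAttr`, `stringWithColon`, and `render` are illustrative names; only `stringWithType` comes from the diff. The colon version costs one extra character per attribute, which adds up across wide schemas and deep plans.

```scala
// Hypothetical attribute model; not Spark's Attribute class.
final case class TypedAttr(name: String, exprId: Long, dataType: String) {
  // Roughly mimics the "name#id" style used for attributes in plans.
  override def toString: String = s"$name#$exprId"
  // The format proposed in the diff, with a colon.
  def stringWithColon: String = s"$toString: $dataType"
  // SQL-like format, as in "(id bigint, data string)".
  def stringWithType: String = s"$toString $dataType"
}

// Render a list of attributes with the given formatter.
def render(attrs: Seq[TypedAttr], f: TypedAttr => String): String =
  attrs.map(f).mkString("[", ", ", "]")
```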
[GitHub] spark issue #20692: [SPARK-23531][SQL] Show attribute type in explain
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20692 I agree with @cloud-fan that the type information doesn't need to be in every node of the plan, just in the scans and generators. We want enough information to tell what types are there, but we also want to keep the output small if possible, so there is a balance to strike. I'm in favor of an option for extended types if it isn't too much trouble, but I don't think it is a very important feature.
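Here is a rough sketch of "types only in scans and generators". It assumes a hypothetical three-node plan ADT: `Scan`, `Generate`, `Filter`, `Col`, `cols`, `label`, and `treeString` are illustrative, not Spark classes. Nodes that introduce columns print `name type`, and every other node prints names only, so each type appears near where its column enters the plan.

```scala
// Hypothetical plan ADT; not Spark's classes.
final case class Col(name: String, dataType: String)

sealed trait Node {
  def output: Seq[Col]
  def children: Seq[Node]
}
final case class Scan(table: String, output: Seq[Col]) extends Node {
  def children: Seq[Node] = Nil
}
final case class Generate(generator: String, output: Seq[Col], child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
}
final case class Filter(condition: String, output: Seq[Col], child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
}

// Only nodes that introduce columns (scans, generators) print types;
// other nodes print names only, keeping the explain output compact.
def cols(n: Node): String = n match {
  case _: Scan | _: Generate =>
    n.output.map(c => s"${c.name} ${c.dataType}").mkString("[", ", ", "]")
  case _ => n.output.map(_.name).mkString("[", ", ", "]")
}

def label(n: Node): String = n match {
  case Scan(t, _) => s"Scan $t ${cols(n)}"
  case Generate(g, _, _) => s"Generate $g ${cols(n)}"
  case Filter(c, _, _) => s"Filter $c ${cols(n)}"
}

// Indented tree rendering, one node per line.
def treeString(n: Node, depth: Int = 0): String =
  ("  " * depth) + label(n) + "\n" + n.children.map(treeString(_, depth + 1)).mkString
```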
[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20647 Looks like there's an unnecessary redact call in there and I found a couple of nits, but I think this is ready to go. +1
[GitHub] spark pull request #20647: [SPARK-23303][SQL] improve the explain result for...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20647#discussion_r171618852
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala ---
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.util.Utils
+
+/**
+ * A trait that can be used by data source v2 related query plans(both logical and physical), to
+ * provide a string format of the data source information for explain.
+ */
+trait DataSourceV2StringFormat {
+
+  /**
+   * The instance of this data source implementation. Note that we only consider its class in
+   * equals/hashCode, not the instance itself.
+   */
+  def source: DataSourceV2
+
+  /**
+   * The output of the data source reader, w.r.t. column pruning.
+   */
+  def output: Seq[Attribute]
+
+  /**
+   * The options for this data source reader.
+   */
+  def options: Map[String, String]
+
+  /**
+   * The created data source reader. Here we use it to get the filters that has been pushed down
+   * so far, itself doesn't take part in the equals/hashCode.
+   */
+  def reader: DataSourceReader
+
+  private lazy val filters = reader match {
+    case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
+    case s: SupportsPushDownFilters => s.pushedFilters().toSet
+    case _ => Set.empty
+  }
+
+  private def sourceName: String = source match {
+    case registered: DataSourceRegister => registered.shortName()
+    case _ => source.getClass.getSimpleName.stripSuffix("$")
+  }
+
+  def metadataString: String = {
+    val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
+
+    if (filters.nonEmpty) {
+      entries += "Pushed Filters" -> filters.mkString("[", ", ", "]")
--- End diff --
Nit: Does this need "Pushed"?
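The quoted `metadataString` is cut off after the filters entry, so the following is only a hypothetical Scala sketch of how such entries could be rendered. The "Options" entry, the `(key: value, ...)` layout, and the helper names `displayName` and `metadataString(filters, options)` are assumptions. The key is written as plain "Filters" to illustrate the nit. The sketch also shows why `stripSuffix("$")` is applied to class names: the runtime class name of a Scala `object` ends with `$`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: strips the trailing "$" that Scala objects have in their class names.
def displayName(simpleClassName: String): String = simpleClassName.stripSuffix("$")

// Hypothetical rendering of explain metadata; the layout and the "Options" entry are assumptions.
def metadataString(filters: Set[String], options: Map[String, String]): String = {
  val entries = ArrayBuffer.empty[(String, String)]
  // Sort so the output is deterministic; Set iteration order is not guaranteed.
  if (filters.nonEmpty) {
    entries += "Filters" -> filters.toSeq.sorted.mkString("[", ", ", "]")
  }
  if (options.nonEmpty) {
    entries += "Options" ->
      options.toSeq.sorted.map { case (k, v) => s"$k=$v" }.mkString("[", ", ", "]")
  }
  entries.map { case (k, v) => s"$k: $v" }.mkString("(", ", ", ")")
}
```

Sorting is a design choice made here for stable explain output; it is not confirmed by the diff.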