[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464284#comment-16464284 ] Apache Spark commented on SPARK-23325: -- User 'rdblue' has created a pull request for this issue: https://github.com/apache/spark/pull/21237 > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16446313#comment-16446313 ] Apache Spark commented on SPARK-23325: -- User 'rdblue' has created a pull request for this issue: https://github.com/apache/spark/pull/21118 > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395752#comment-16395752 ] Ryan Blue commented on SPARK-23325: --- I created SPARK-23657 to track this in parallel. Feel free to comment on what needs to be done there. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395655#comment-16395655 ] Ryan Blue commented on SPARK-23325: --- I agree that the binary format is more work and probably out of scope – that's more reason to document `InternalRow`. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392048#comment-16392048 ] Wenchen Fan commented on SPARK-23325: - It's hard to stabilize the binary format like `UnsafeRow` and `UnsafeArrayData`. We can skip them as most data sources won't use them because they are hard to write. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392046#comment-16392046 ] Wenchen Fan commented on SPARK-23325: - I think it's mostly document work. We need to add document for all the data classes, like `UTF8String`, `CalendarInterval`, `ArrayData`, `MapData`, etc. Also we need to put the data definition of all the data types in `InternalRow`, like timestamp type is a long representing microseconds from Unix epoch. We should create an umbrella Jira and parallel these work. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391861#comment-16391861 ] Michael Armbrust commented on SPARK-23325: -- It does seem like it would be that hard to stabilize at least the generic form of InternalRow or am I missing something? > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391768#comment-16391768 ] Ryan Blue commented on SPARK-23325: --- By exposing an interface that uses UnsafeRow, don't we already have this problem? The only difference is that UnsafeRow is harder to produce. We also have a write interface in v2 that exposes InternalRow. I think now is the time to start documenting these so we can officially support InternalRow instead of effectively supporting InternalRow. UnsafeRow would benefit from more documentation, too. To find out how to use the write interfaces, I ended up using EXPLAIN CODEGEN on a bunch of different queries and looking at the results, then inspecting the writers to find out the in-memory representation. As for the columnar format, I see that as a nice-to-have. The v2 API is based on rows for a good reason, and we need to document and support that row format. Unless we are going to change v2 to a columnar API, stabilizing and documenting the columnar format doesn't help much. What work needs to be done here to make InternalRow viable? If it is to document the values used to internally represent different types, I can help out with that. I already have matching representations documented in the Iceberg spec anyway. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389135#comment-16389135 ] Reynold Xin commented on SPARK-23325: - Yes perhaps we should do that. It is a lot more work than what you guys think though, because as Wenchen said we need to properly define the semantics of all the data, similar to all of Hadoop IO (Text, etc) but more, because we have more data types. I'd probably prefer us defining the columnar format first, since if one is going after high performance, one'd probably prefer using that one... > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388520#comment-16388520 ] Wenchen Fan commented on SPARK-23325: - Making `InternalRow` stable is not only about stabilizing the interfaces, but also the semantics of data types and their data structure. e.g. timestamp type is microseconds from Unix epoch in Spark, string type is UTF8 encoded string via the `UTF8String` class, map type is a combination of 2 arrays, etc. cc [~rxin] and [~marmbrus] for broader discussions. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388411#comment-16388411 ] Ryan Blue commented on SPARK-23325: --- I agree that we should declare \{{InternalRow}} stable. It is effectively stable, as [~joseph.torres] argues. And by _far_ the easiest way to produce {{UnsafeRow}} is to produce {{InternalRow}} first and use Spark to convert to unsafe. If we're already relying on it there, we may as well have Spark handle the unsafe projection! > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388399#comment-16388399 ] Jose Torres commented on SPARK-23325: - How hard would it be to just declare that InternalRow is stable? The file has been touched about once per year for the past 3 years, and I doubt we'd be able to change it to any significant degree without risking serious regressions. >From my perspective, and I think (but correct me if I'm wrong) the perspective >of the SPIP, a stable interface which can match the performance of Spark's >internal data sources is one of the core goals of DataSourceV2. If >high-performance sources must implement InternalRow reads and writes, then >DataSourceV2 isn't stable until InternalRow is stable anyway. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388322#comment-16388322 ] Wenchen Fan commented on SPARK-23325: - The problem is that, `Row` is a stable class Spark promises it won't change over versions, `InternalRow` is not. I agree it's hard to output either `Row` or `UnsafeRow`, we should allow users to produce `InternalRow` directly. I missed this as I was only considering performance at that time. But I think we should keep the interface producing `Row` before we can make `InternalRow` stable. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351043#comment-16351043 ] Ryan Blue commented on SPARK-23325: --- [~cloud_fan], FYI. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org