[jira] [Commented] (SPARK-29358) Make unionByName optionally fill missing columns with nulls

2020-04-02 Thread Michael Armbrust (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073951#comment-17073951
 ] 

Michael Armbrust commented on SPARK-29358:
--

Sure, but it is very easy to make this not a behavior change.  Add an optional 
boolean parameter, {{allowMissingColumns}} (or something) that defaults to 
{{false}}.
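
For illustration, here is a rough sketch of how such an overload might look in use; the parameter name below follows the suggestion above and is not final:

{code}
// Assumes spark-shell / spark.implicits._ for toDF.
val df1 = Seq(1, 2, 3).toDF("x")
val df2 = Seq("a", "b", "c").toDF("y")

// Existing behavior is unchanged: this still throws AnalysisException.
df1.unionByName(df2)

// Hypothetical opt-in: columns missing on either side are filled with nulls.
df1.unionByName(df2, allowMissingColumns = true)
{code}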

> Make unionByName optionally fill missing columns with nulls
> ---
>
> Key: SPARK-29358
> URL: https://issues.apache.org/jira/browse/SPARK-29358
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Mukul Murthy
>Priority: Major
>
> Currently, unionByName requires two DataFrames to have the same set of 
> columns (even though the order can be different). It would be good to add 
> either an option to unionByName or a new type of union which fills in missing 
> columns with nulls. 
> {code:java}
> val df1 = Seq(1, 2, 3).toDF("x")
> val df2 = Seq("a", "b", "c").toDF("y")
> df1.unionByName(df2){code}
> This currently throws 
> {code:java}
> org.apache.spark.sql.AnalysisException: Cannot resolve column name "x" among 
> (y);
> {code}
> Ideally, there would be a way to make this return a DataFrame containing:
> {code:java}
> +----+----+
> |   x|   y|
> +----+----+
> |   1|null|
> |   2|null|
> |   3|null|
> |null|   a|
> |null|   b|
> |null|   c|
> +----+----+
> {code}
> Currently the workaround is to add the missing columns to each DataFrame by hand 
> before calling unionByName, but this is clunky:
> {code:java}
> df1.withColumn("y", lit(null)).unionByName(df2.withColumn("x", lit(null)))
> {code}






[jira] [Commented] (SPARK-29358) Make unionByName optionally fill missing columns with nulls

2020-03-31 Thread Michael Armbrust (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17071968#comment-17071968
 ] 

Michael Armbrust commented on SPARK-29358:
--

I think we should reconsider closing this as won't fix:
 - I think the semantics of this operation make sense. We already get this 
behavior by writing JSON or parquet data and reading it back; that is just a 
really inefficient way to accomplish the end goal.
 - I don't think it is a problem to move "away from SQL union". This is a 
clearly named, different operation. IMO this one makes *more* sense than SQL 
union. It is much more likely that columns with the same name are semantically 
equivalent than columns at the same ordinal with different names.
 - We are not breaking the behavior of unionByName. Currently it throws an 
exception in these cases. We are making more data transformations possible, but 
anything that was working before will continue to work. You could add a boolean 
flag if you were really concerned, but I think I would skip that.
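
For reference, a rough sketch of the null-filling semantics under discussion, expressed on top of the existing API (the helper name is made up; this is not the actual patch):

{code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// Pad each side with the other side's missing columns as typed nulls,
// then fall back to the existing unionByName.
def unionByNameFillNulls(left: DataFrame, right: DataFrame): DataFrame = {
  val leftCols  = left.columns.toSet
  val rightCols = right.columns.toSet
  val paddedLeft = rightCols.diff(leftCols).foldLeft(left) { (df, c) =>
    df.withColumn(c, lit(null).cast(right.schema(c).dataType))
  }
  val paddedRight = leftCols.diff(rightCols).foldLeft(right) { (df, c) =>
    df.withColumn(c, lit(null).cast(left.schema(c).dataType))
  }
  paddedLeft.unionByName(paddedRight)
}
{code}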

> Make unionByName optionally fill missing columns with nulls
> ---
>
> Key: SPARK-29358
> URL: https://issues.apache.org/jira/browse/SPARK-29358
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mukul Murthy
>Priority: Major
>
> Currently, unionByName requires two DataFrames to have the same set of 
> columns (even though the order can be different). It would be good to add 
> either an option to unionByName or a new type of union which fills in missing 
> columns with nulls. 
> {code:java}
> val df1 = Seq(1, 2, 3).toDF("x")
> val df2 = Seq("a", "b", "c").toDF("y")
> df1.unionByName(df2){code}
> This currently throws 
> {code:java}
> org.apache.spark.sql.AnalysisException: Cannot resolve column name "x" among 
> (y);
> {code}
> Ideally, there would be a way to make this return a DataFrame containing:
> {code:java}
> +----+----+
> |   x|   y|
> +----+----+
> |   1|null|
> |   2|null|
> |   3|null|
> |null|   a|
> |null|   b|
> |null|   c|
> +----+----+
> {code}
> Currently the workaround is to add the missing columns to each DataFrame by hand 
> before calling unionByName, but this is clunky:
> {code:java}
> df1.withColumn("y", lit(null)).unionByName(df2.withColumn("x", lit(null)))
> {code}






[jira] [Commented] (SPARK-31136) Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax

2020-03-12 Thread Michael Armbrust (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17058287#comment-17058287
 ] 

Michael Armbrust commented on SPARK-31136:
--

How hard would it be to add support for "LOAD DATA INPATH" rather than revert 
this? Is that the only workflow that breaks?

> Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax
> -
>
> Key: SPARK-31136
> URL: https://issues.apache.org/jira/browse/SPARK-31136
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Dongjoon Hyun
>Priority: Blocker
>
> We need to consider the behavior change of SPARK-30098 .
> This is a placeholder to keep the discussion and the final decision.
> `CREATE TABLE` syntax changes its behavior silently.
> The following is one example of the breaking the existing user data pipelines.
> *Apache Spark 2.4.5*
> {code}
> spark-sql> CREATE TABLE t(a STRING);
> Time taken: 3.061 seconds
> spark-sql> LOAD DATA INPATH '/usr/local/spark/README.md' INTO TABLE t;
> Time taken: 0.383 seconds
> spark-sql> SELECT * FROM t LIMIT 1;
> # Apache Spark
> Time taken: 2.05 seconds, Fetched 1 row(s)
> {code}
> *Apache Spark 3.0.0-preview2*
> {code}
> spark-sql> CREATE TABLE t(a STRING);
> Time taken: 3.969 seconds
> spark-sql> LOAD DATA INPATH '/usr/local/spark/README.md' INTO TABLE t;
> Error in query: LOAD DATA is not supported for datasource tables: 
> `default`.`t`;
> {code}






[jira] [Commented] (SPARK-31136) Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax

2020-03-12 Thread Michael Armbrust (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17058125#comment-17058125
 ] 

Michael Armbrust commented on SPARK-31136:
--

What was the default before, hive sequence files? That is pretty bad. They will 
get orders of magnitude better performance with parquet (I've seen users really 
burned by the performance of the old default here).

What operations are affected by this? What happens when you run an existing 
program without changing anything?
 - For new tables, I assume you just get better performance?
 - Are there any operations where this breaks things? Can you "corrupt" a hive 
table by accidentally writing parquet data into it?
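
As an aside, the behavior change only bites when the provider is left implicit; a small sketch of pinning it explicitly (the second statement assumes Hive support is enabled), shown only to illustrate the two table kinds being compared:

{code}
// Datasource (Parquet) table, regardless of what the default provider is:
spark.sql("CREATE TABLE t_parquet(a STRING) USING parquet")

// Hive table; LOAD DATA INPATH continues to work against this one:
spark.sql("CREATE TABLE t_hive(a STRING) STORED AS textfile")
spark.sql("LOAD DATA INPATH '/usr/local/spark/README.md' INTO TABLE t_hive")
{code}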

> Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax
> -
>
> Key: SPARK-31136
> URL: https://issues.apache.org/jira/browse/SPARK-31136
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Dongjoon Hyun
>Priority: Major
>
> We need to consider the behavior change of SPARK-30098 .
> This is a placeholder to keep the discussion and the final decision.






[jira] [Commented] (SPARK-27911) PySpark Packages should automatically choose correct scala version

2019-07-16 Thread Michael Armbrust (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16886479#comment-16886479
 ] 

Michael Armbrust commented on SPARK-27911:
--

You are right, there is nothing pyspark-specific about this. I just used 
pyspark as an example because those users are more likely to use {{pip}} and 
thus never even see the Scala version they are using.

It would be great if we could make this easier for all Spark users, preferably 
by automatically correcting mismatches.
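
A minimal sketch of the kind of substitution being proposed, assuming a {{$scalaVersion}} placeholder in the package coordinate; the helper and the example coordinate are illustrative only:

{code}
import scala.util.Properties

// Scala binary version on the driver's classpath, e.g. "2.12".
val scalaBinaryVersion =
  Properties.versionNumberString.split('.').take(2).mkString(".")

// Substitute the placeholder when resolving a package coordinate.
def resolveCoordinate(template: String): String =
  template.replace("$scalaVersion", scalaBinaryVersion)

resolveCoordinate("io.delta:delta-core_$scalaVersion:0.6.1")
// => "io.delta:delta-core_2.12:0.6.1" on a Scala 2.12 build
{code}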

> PySpark Packages should automatically choose correct scala version
> --
>
> Key: SPARK-27911
> URL: https://issues.apache.org/jira/browse/SPARK-27911
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 3.0.0
>Reporter: Michael Armbrust
>Priority: Major
>
> Today, users of pyspark (and Scala) need to manually specify the version of 
> Scala that their Spark installation is using when adding a Spark package to 
> their application. This extra configuration is confusing to users who may not 
> even know which version of Scala they are using (for example, if they 
> installed Spark using {{pip}}). The confusion here is exacerbated by releases 
> in Spark that have changed the default from {{2.11}} -> {{2.12}} -> {{2.11}}.
> https://spark.apache.org/releases/spark-release-2-4-2.html
> https://spark.apache.org/releases/spark-release-2-4-3.html
> Since Spark can know which version of Scala it was compiled for, we should 
> give users the option to automatically choose the correct version.  This 
> could be as simple as a substitution for {{$scalaVersion}} or something when 
> resolving a package (similar to SBT's support for automatically handling Scala 
> dependencies).
> Here are some concrete examples of users getting it wrong and getting 
> confused:
> https://github.com/delta-io/delta/issues/6
> https://github.com/delta-io/delta/issues/63






[jira] [Updated] (SPARK-27911) PySpark Packages should automatically choose correct scala version

2019-05-31 Thread Michael Armbrust (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-27911:
-
Description: 
Today, users of pyspark (and Scala) need to manually specify the version of 
Scala that their Spark installation is using when adding a Spark package to 
their application. This extra configuration is confusing to users who may not 
even know which version of Scala they are using (for example, if they installed 
Spark using {{pip}}). The confusion here is exacerbated by releases in Spark 
that have changed the default from {{2.11}} -> {{2.12}} -> {{2.11}}.

https://spark.apache.org/releases/spark-release-2-4-2.html
https://spark.apache.org/releases/spark-release-2-4-3.html

Since Spark can know which version of Scala it was compiled for, we should give 
users the option to automatically choose the correct version.  This could be as 
simple as a substitution for {{$scalaVersion}} or something when resolving a 
package (similar to SBT's support for automatically handling Scala dependencies).

Here are some concrete examples of users getting it wrong and getting confused:
https://github.com/delta-io/delta/issues/6
https://github.com/delta-io/delta/issues/63

  was:
Today, users of pyspark (and Scala) need to manually specify the version of 
Scala that their Spark installation is using when adding a Spark package to 
their application. This extra configuration confusing to users who may not even 
know which version of Scala they are using (for example, if they installed 
Spark using {{pip}}). The confusion here is exacerbated by releases in Spark 
that have changed the default from {{2.11}} -> {{2.12}} -> {{2.11}}.

https://spark.apache.org/releases/spark-release-2-4-2.html
https://spark.apache.org/releases/spark-release-2-4-3.html

Since Spark can know which version of Scala it was compiled for, we should give 
users the option to automatically choose the correct version.  This could be as 
simple as a substitution for {{$scalaVersion}} or something when resolving a 
package (similar to SBTs support for automatically handling scala dependencies).

Here are some concrete examples of users getting it wrong and getting confused:
https://github.com/delta-io/delta/issues/6
https://github.com/delta-io/delta/issues/63


> PySpark Packages should automatically choose correct scala version
> --
>
> Key: SPARK-27911
> URL: https://issues.apache.org/jira/browse/SPARK-27911
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.4.3
>Reporter: Michael Armbrust
>Priority: Major
>
> Today, users of pyspark (and Scala) need to manually specify the version of 
> Scala that their Spark installation is using when adding a Spark package to 
> their application. This extra configuration is confusing to users who may not 
> even know which version of Scala they are using (for example, if they 
> installed Spark using {{pip}}). The confusion here is exacerbated by releases 
> in Spark that have changed the default from {{2.11}} -> {{2.12}} -> {{2.11}}.
> https://spark.apache.org/releases/spark-release-2-4-2.html
> https://spark.apache.org/releases/spark-release-2-4-3.html
> Since Spark can know which version of Scala it was compiled for, we should 
> give users the option to automatically choose the correct version.  This 
> could be as simple as a substitution for {{$scalaVersion}} or something when 
> resolving a package (similar to SBT's support for automatically handling Scala 
> dependencies).
> Here are some concrete examples of users getting it wrong and getting 
> confused:
> https://github.com/delta-io/delta/issues/6
> https://github.com/delta-io/delta/issues/63






[jira] [Created] (SPARK-27911) PySpark Packages should automatically choose correct scala version

2019-05-31 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-27911:


 Summary: PySpark Packages should automatically choose correct 
scala version
 Key: SPARK-27911
 URL: https://issues.apache.org/jira/browse/SPARK-27911
 Project: Spark
  Issue Type: New Feature
  Components: PySpark
Affects Versions: 2.4.3
Reporter: Michael Armbrust


Today, users of pyspark (and Scala) need to manually specify the version of 
Scala that their Spark installation is using when adding a Spark package to 
their application. This extra configuration confusing to users who may not even 
know which version of Scala they are using (for example, if they installed 
Spark using {{pip}}). The confusion here is exacerbated by releases in Spark 
that have changed the default from {{2.11}} -> {{2.12}} -> {{2.11}}.

https://spark.apache.org/releases/spark-release-2-4-2.html
https://spark.apache.org/releases/spark-release-2-4-3.html

Since Spark can know which version of Scala it was compiled for, we should give 
users the option to automatically choose the correct version.  This could be as 
simple as a substitution for {{$scalaVersion}} or something when resolving a 
package (similar to SBTs support for automatically handling scala dependencies).

Here are some concrete examples of users getting it wrong and getting confused:
https://github.com/delta-io/delta/issues/6
https://github.com/delta-io/delta/issues/63






[jira] [Commented] (SPARK-27676) InMemoryFileIndex should hard-fail on missing files instead of logging and continuing

2019-05-10 Thread Michael Armbrust (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837552#comment-16837552
 ] 

Michael Armbrust commented on SPARK-27676:
--

I tend to agree that all cases where we chose to ignore missing files should be 
hidden behind the existing {{spark.sql.files.ignoreMissingFiles}} flag.
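
For reference, that flag already exists as a SQL conf and can be set per session; it defaults to {{false}} (hard-fail):

{code}
// Opt in to the ignore-and-continue behavior discussed above.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
{code}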

> InMemoryFileIndex should hard-fail on missing files instead of logging and 
> continuing
> -
>
> Key: SPARK-27676
> URL: https://issues.apache.org/jira/browse/SPARK-27676
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Josh Rosen
>Priority: Major
>
> Spark's {{InMemoryFileIndex}} contains two places where {{FileNotFound}} 
> exceptions are caught and logged as warnings (during [directory 
> listing|https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L274]
>  and [block location 
> lookup|https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L333]).
>  I think that this is a dangerous default behavior and would prefer that 
> Spark hard-fails by default (with the ignore-and-continue behavior guarded by 
> a SQL session configuration).
> In SPARK-17599 and SPARK-24364, logic was added to ignore missing files. 
> Quoting from the PR for SPARK-17599:
> {quote}The {{ListingFileCatalog}} lists files given a set of resolved paths. 
> If a folder is deleted at any time between the paths were resolved and the 
> file catalog can check for the folder, the Spark job fails. This may abruptly 
> stop long running StructuredStreaming jobs for example.
> Folders may be deleted by users or automatically by retention policies. These 
> cases should not prevent jobs from successfully completing.
> {quote}
> Let's say that I'm *not* expecting to ever delete input files for my job. In 
> that case, this behavior can mask bugs.
> One straightforward masked bug class is accidental file deletion: if I'm 
> never expecting to delete files then I'd prefer to fail my job if Spark sees 
> deleted files.
> A more subtle bug can occur when using a S3 filesystem. Say I'm running a 
> Spark job against a partitioned Parquet dataset which is laid out like this:
> {code:java}
> data/
>   date=1/
> region=west/
>0.parquet
>1.parquet
> region=east/
>0.parquet
>1.parquet{code}
> If I do {{spark.read.parquet("/data/date=1/")}} then Spark needs to perform 
> multiple rounds of file listing, first listing {{/data/date=1}} to discover 
> the partitions for that date, then listing within each partition to discover 
> the leaf files. Due to the eventual consistency of S3 ListObjects, it's 
> possible that the first listing will show the {{region=west}} and 
> {{region=east}} partitions existing and then the next-level listing fails to 
> return any for some of the directories (e.g. {{/data/date=1/}} returns files 
> but {{/data/date=1/region=west/}} throws a {{FileNotFoundException}} in S3A 
> due to ListObjects inconsistency).
> If Spark propagated the {{FileNotFoundException}} and hard-failed in this 
> case then I'd be able to fail the job in this case where we _definitely_ know 
> that the S3 listing is inconsistent (failing here doesn't guard against _all_ 
> potential S3 list inconsistency issues (e.g. back-to-back listings which both 
> return a subset of the true set of objects), but I think it's still an 
> improvement to fail for the subset of cases that we _can_ detect even if 
> that's not a surefire failsafe against the more general problem).
> Finally, I'm unsure if the original patch will have the desired effect: if a 
> file is deleted once a Spark job expects to read it then that can cause 
> problems at multiple layers, both in the driver (multiple rounds of file 
> listing) and in executors (if the deletion occurs after the construction of 
> the catalog but before the scheduling of the read tasks); I think the 
> original patch only resolved the problem for the driver (unless I'm missing 
> similar executor-side code specific to the original streaming use-case).
> Given all of these reasons, I think that the "ignore potentially deleted 
> files during file index listing" behavior should be guarded behind a feature 
> flag which defaults to {{false}}, consistent with the existing 
> {{spark.files.ignoreMissingFiles}} and {{spark.sql.files.ignoreMissingFiles}} 
> flags (which both default to false).




[jira] [Assigned] (SPARK-27453) DataFrameWriter.partitionBy is Silently Dropped by DSV1

2019-04-12 Thread Michael Armbrust (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust reassigned SPARK-27453:


Assignee: Liwen Sun

> DataFrameWriter.partitionBy is Silently Dropped by DSV1
> ---
>
> Key: SPARK-27453
> URL: https://issues.apache.org/jira/browse/SPARK-27453
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.1, 1.5.2, 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.4.1
>Reporter: Michael Armbrust
>Assignee: Liwen Sun
>Priority: Critical
>
> This is a long standing quirk of the interaction between {{DataFrameWriter}} 
> and {{CreatableRelationProvider}} (and the other forms of the DSV1 API).  
> Users can specify columns in {{partitionBy}} and our internal data sources 
> will use this information.  Unfortunately, for external systems, this data is 
> silently dropped with no feedback given to the user.
> In the long run, I think that DataSourceV2 is a better answer. However, I 
> don't think we should wait for that API to stabilize before offering some 
> kind of solution to developers of external data sources. I also do not think 
> we should break binary compatibility of this API, but I do think that a small 
> surgical fix could alleviate the issue.
> I would propose that we could propagate partitioning information (when 
> present) along with the other configuration options passed to the data source 
> in the {{String, String}} map.
> I think it's very unlikely that there are both data sources that validate 
> extra options and users who are using (no-op) partitioning with them, but out 
> of an abundance of caution we should protect the behavior change behind a 
> {{legacy}} flag that can be turned off.






[jira] [Created] (SPARK-27453) DataFrameWriter.partitionBy is Silently Dropped by DSV1

2019-04-12 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-27453:


 Summary: DataFrameWriter.partitionBy is Silently Dropped by DSV1
 Key: SPARK-27453
 URL: https://issues.apache.org/jira/browse/SPARK-27453
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.1, 2.2.3, 2.1.3, 2.0.2, 1.6.3, 1.5.2, 1.4.1
Reporter: Michael Armbrust


This is a long standing quirk of the interaction between {{DataFrameWriter}} 
and {{CreatableRelationProvider}} (and the other forms of the DSV1 API).  Users 
can specify columns in {{partitionBy}} and our internal data sources will use 
this information.  Unfortunately, for external systems, this data is silently 
dropped with no feedback given to the user.

In the long run, I think that DataSourceV2 is a better answer. However, I don't 
think we should wait for that API to stabilize before offering some kind of 
solution to developers of external data sources. I also do not think we should 
break binary compatibility of this API, but I do think that a small surgical fix 
could alleviate the issue.

I would propose that we could propagate partitioning information (when present) 
along with the other configuration options passed to the data source in the 
{{String, String}} map.

I think it's very unlikely that there are both data sources that validate extra 
options and users who are using (no-op) partitioning with them, but out of an 
abundance of caution we should protect the behavior change behind a {{legacy}} 
flag that can be turned off.
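
A rough sketch of what propagating the partition columns through the options map could look like on the DSV1 side; the option key and the provider class below are invented purely for illustration:

{code}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

class ExampleProvider extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    // Hypothetical: partition columns arrive in the String, String map under an
    // agreed-upon key instead of being silently dropped.
    val partitionCols: Seq[String] =
      parameters.get("__partition_columns").toSeq.flatMap(_.split(",")).filter(_.nonEmpty)

    // ... the external source can now honor (or explicitly reject) partitionBy ...
    val ctx = sqlContext
    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType = data.schema
    }
  }
}
{code}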






[jira] [Commented] (SPARK-23831) Add org.apache.derby to IsolatedClientLoader

2018-11-13 Thread Michael Armbrust (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16685629#comment-16685629
 ] 

Michael Armbrust commented on SPARK-23831:
--

Why was it reverted?

> Add org.apache.derby to IsolatedClientLoader
> 
>
> Key: SPARK-23831
> URL: https://issues.apache.org/jira/browse/SPARK-23831
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Yuming Wang
>Priority: Major
>
> Add org.apache.derby to IsolatedClientLoader, otherwise it may throw an 
> exception:
> {noformat}
> [info] Cause: java.sql.SQLException: Failed to start database 'metastore_db' 
> with class loader 
> org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@2439ab23, see 
> the next exception for details.
> [info] at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
> [info] at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
> [info] at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source)
> [info] at org.apache.derby.impl.jdbc.EmbedConnection.bootDatabase(Unknown 
> Source)
> [info] at org.apache.derby.impl.jdbc.EmbedConnection.<init>(Unknown Source)
> [info] at org.apache.derby.jdbc.InternalDriver$1.run(Unknown Source)
> {noformat}
> How to reproduce:
> {noformat}
> sed 's/HiveExternalCatalogSuite/HiveExternalCatalog2Suite/g' 
> sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
>  > 
> sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalog2Suite.scala
> build/sbt -Phive "hive/test-only *.HiveExternalCatalogSuite 
> *.HiveExternalCatalog2Suite"
> {noformat}






[jira] [Commented] (SPARK-6459) Warn when Column API is constructing trivially true equality

2018-07-23 Thread Michael Armbrust (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-6459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553330#comment-16553330
 ] 

Michael Armbrust commented on SPARK-6459:
-

[~tenstriker] this will never happen from a SQL query.  This only happens when 
you take already resolved attributes from different parts of a DataFrame and 
manually construct an equality that can't be differentiated.
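
A small sketch of the self-join case in question, where both sides of the predicate resolve to the very same attribute (data and names made up):

{code}
import spark.implicits._

val people = Seq((1, "a"), (2, "b")).toDF("id", "name")

// Trivially true: both sides are the same resolved column of `people`,
// so the optimizer cannot tell the two join sides apart.
val ambiguous = people.as("l").join(people.as("r"), people("id") === people("id"))

// Unambiguous alternative: reference each side through its alias.
val ok = people.as("l").join(people.as("r"), $"l.id" === $"r.id")
{code}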

> Warn when Column API is constructing trivially true equality
> 
>
> Key: SPARK-6459
> URL: https://issues.apache.org/jira/browse/SPARK-6459
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 1.3.0
>Reporter: Michael Armbrust
>Assignee: Michael Armbrust
>Priority: Critical
> Fix For: 1.3.1, 1.4.0
>
>
> Right now its pretty confusing when a user constructs and equality predicate 
> that is going to be use in a self join, where the optimizer cannot 
> distinguish between the attributes in question (e.g.,  [SPARK-6231]).  Since 
> there is really no good reason to do this, lets print a warning.






[jira] [Resolved] (SPARK-5517) Add input types for Java UDFs

2018-05-09 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-5517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-5517.
-
Resolution: Unresolved

> Add input types for Java UDFs
> -
>
> Key: SPARK-5517
> URL: https://issues.apache.org/jira/browse/SPARK-5517
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 1.3.0
>Reporter: Michael Armbrust
>Priority: Critical
>







[jira] [Commented] (SPARK-18165) Kinesis support in Structured Streaming

2018-05-07 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466410#comment-16466410
 ] 

Michael Armbrust commented on SPARK-18165:
--

This is great!  I'm glad there are more connectors for Structured Streaming!

A few high-level thoughts:
 - The current Source/Sink APIs are internal/unstable.  We are working on 
building public/stable APIs as part of DataSourceV2. Would be great to get 
feedback on those APIs if this is ported to them
 - In general as the Spark project scales, we are trying to move more of the 
connectors out of the core project.  I'd suggest looking at contributing this 
to Apache Bahir and/or Spark Packages.

> Kinesis support in Structured Streaming
> ---
>
> Key: SPARK-18165
> URL: https://issues.apache.org/jira/browse/SPARK-18165
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Lauren Moos
>Priority: Major
>
> Implement Kinesis based sources and sinks for Structured Streaming






[jira] [Updated] (SPARK-18165) Kinesis support in Structured Streaming

2018-05-07 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18165:
-
Component/s: (was: DStreams)
 Structured Streaming

> Kinesis support in Structured Streaming
> ---
>
> Key: SPARK-18165
> URL: https://issues.apache.org/jira/browse/SPARK-18165
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Lauren Moos
>Priority: Major
>
> Implement Kinesis based sources and sinks for Structured Streaming






[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects

2018-04-10 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16433222#comment-16433222
 ] 

Michael Armbrust commented on SPARK-23337:
--

The checkpoint will only grow if you are doing an aggregation, otherwise the 
watermark will not affect computation.

You can set a watermark on the nested column; you just need to project it to a 
top-level column using {{withColumn}} first.
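
A sketch of that workaround in Scala, reusing the reporter's schema helper and input path from the report below:

{code}
import org.apache.spark.sql.functions.col

val jsonRow = spark.readStream
  .schema(getJSONSchema())          // reporter's schema helper
  .json(file)                       // reporter's input path
  .dropDuplicates("_id")
  // Project the nested field to a top-level column first...
  .withColumn("createTime", col("_source.createTime"))
  // ...then declare the watermark on that column.
  .withWatermark("createTime", "10 seconds")
{code}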

> withWatermark raises an exception on struct objects
> ---
>
> Key: SPARK-23337
> URL: https://issues.apache.org/jira/browse/SPARK-23337
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
> Environment: Linux Ubuntu, Spark on standalone mode
>Reporter: Aydin Kocas
>Priority: Major
>
> Hi,
>  
> when using a nested object (I mean an object within a struct, here concrete: 
> _source.createTime) from a json file as the parameter for the 
> withWatermark-method, I get an exception (see below).
> Anything else works flawlessly with the nested object.
>  
> +*{color:#14892c}works:{color}*+ 
> {code:java}
> Dataset jsonRow = 
> spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime",
>  "10 seconds").toDF();{code}
>  
> json structure:
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- myTime: timestamp (nullable = true)
> ..{code}
> +*{color:#d04437}does not work - nested json{color}:*+
> {code:java}
> Dataset jsonRow = 
> spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime",
>  "10 seconds").toDF();{code}
>  
> json structure:
>  
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- _source: struct (nullable = true)
>  | |-- createTime: timestamp (nullable = true)
> ..
>  
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, 
> tree:
> 'EventTimeWatermark '_source.createTime, interval 10 seconds
> +- Deduplicate [_id#0], true
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true),
>  StructField(_index,StringType,true), StructField(_score,LongType,true), 
> StructField(_source,StructType(StructField(additionalData,StringType,true), 
> StructField(client,StringType,true), 
> StructField(clientDomain,BooleanType,true), 
> StructField(clientVersion,StringType,true), 
> StructField(country,StringType,true), 
> StructField(countryName,StringType,true), 
> StructField(createTime,TimestampType,true), 
> StructField(externalIP,StringType,true), 
> StructField(hostname,StringType,true), 
> StructField(internalIP,StringType,true), 
> StructField(location,StringType,true), 
> StructField(locationDestination,StringType,true), 
> StructField(login,StringType,true), 
> StructField(originalRequestString,StringType,true), 
> StructField(password,StringType,true), 
> StructField(peerIdent,StringType,true), 
> StructField(peerType,StringType,true), 
> StructField(recievedTime,TimestampType,true), 
> StructField(sessionEnd,StringType,true), 
> StructField(sessionStart,StringType,true), 
> StructField(sourceEntryAS,StringType,true), 
> StructField(sourceEntryIp,StringType,true), 
> StructField(sourceEntryPort,StringType,true), 
> StructField(targetCountry,StringType,true), 
> StructField(targetCountryName,StringType,true), 
> StructField(targetEntryAS,StringType,true), 
> StructField(targetEntryIp,StringType,true), 
> StructField(targetEntryPort,StringType,true), 
> StructField(targetport,StringType,true), 
> StructField(username,StringType,true), 
> StructField(vulnid,StringType,true)),true), 
> StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), 
> FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796)
>  at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
>  at 
> 

[jira] [Commented] (SPARK-23835) When Dataset.as converts column from nullable to non-nullable type, null Doubles are converted silently to -1

2018-04-02 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423046#comment-16423046
 ] 

Michael Armbrust commented on SPARK-23835:
--

I believe the correct semantics are to throw a {{NullPointerException}} if the 
user tries to deserialize a null value into a type that doesn't support nulls. 
 See [this test case for an 
example|https://github.com/apache/spark/blob/b2edc30db1dcc6102687d20c158a2700965fdf51/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala#L1428-L1443].
 We must just be missing the assertion in this case.
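
As a side note, a small sketch (reusing {{df}} and the implicits from the reproduction below): deserializing the nullable column into an {{Option}} preserves the null instead of silently converting it:

{code}
val safe = df.as[(Option[Double], Double)]
safe.collect().foreach(println)  // prints (None,6.0) for the null row, not (-1.0,6.0)
{code}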

> When Dataset.as converts column from nullable to non-nullable type, null 
> Doubles are converted silently to -1
> -
>
> Key: SPARK-23835
> URL: https://issues.apache.org/jira/browse/SPARK-23835
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Joseph K. Bradley
>Priority: Major
>
> I constructed a DataFrame with a nullable java.lang.Double column (and an 
> extra Double column).  I then converted it to a Dataset using ```as[(Double, 
> Double)]```.  When the Dataset is shown, it has a null.  When it is collected 
> and printed, the null is silently converted to a -1.
> Code snippet to reproduce this:
> {code}
> val localSpark = spark
> import localSpark.implicits._
> val df = Seq[(java.lang.Double, Double)](
>   (1.0, 2.0),
>   (3.0, 4.0),
>   (Double.NaN, 5.0),
>   (null, 6.0)
> ).toDF("a", "b")
> df.show()  // OUTPUT 1: has null
> df.printSchema()
> val data = df.as[(Double, Double)]
> data.show()  // OUTPUT 2: has null
> data.collect().foreach(println)  // OUTPUT 3: has -1
> {code}
> OUTPUT 1 and 2:
> {code}
> +----+---+
> |   a|  b|
> +----+---+
> | 1.0|2.0|
> | 3.0|4.0|
> | NaN|5.0|
> |null|6.0|
> +----+---+
> {code}
> OUTPUT 3:
> {code}
> (1.0,2.0)
> (3.0,4.0)
> (NaN,5.0)
> (-1.0,6.0)
> {code}






[jira] [Commented] (SPARK-23835) When Dataset.as converts column from nullable to non-nullable type, null Doubles are converted silently to -1

2018-03-30 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420809#comment-16420809
 ] 

Michael Armbrust commented on SPARK-23835:
--

/cc [~cloud_fan]

> When Dataset.as converts column from nullable to non-nullable type, null 
> Doubles are converted silently to -1
> -
>
> Key: SPARK-23835
> URL: https://issues.apache.org/jira/browse/SPARK-23835
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Joseph K. Bradley
>Priority: Major
>
> I constructed a DataFrame with a nullable java.lang.Double column (and an 
> extra Double column).  I then converted it to a Dataset using ```as[(Double, 
> Double)]```.  When the Dataset is shown, it has a null.  When it is collected 
> and printed, the null is silently converted to a -1.
> Code snippet to reproduce this:
> {code}
> val localSpark = spark
> import localSpark.implicits._
> val df = Seq[(java.lang.Double, Double)](
>   (1.0, 2.0),
>   (3.0, 4.0),
>   (Double.NaN, 5.0),
>   (null, 6.0)
> ).toDF("a", "b")
> df.show()  // OUTPUT 1: has null
> df.printSchema()
> val data = df.as[(Double, Double)]
> data.show()  // OUTPUT 2: has null
> data.collect().foreach(println)  // OUTPUT 3: has -1
> {code}
> OUTPUT 1 and 2:
> {code}
> +----+---+
> |   a|  b|
> +----+---+
> | 1.0|2.0|
> | 3.0|4.0|
> | NaN|5.0|
> |null|6.0|
> +----+---+
> {code}
> OUTPUT 3:
> {code}
> (1.0,2.0)
> (3.0,4.0)
> (NaN,5.0)
> (-1.0,6.0)
> {code}






[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.

2018-03-08 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391861#comment-16391861
 ] 

Michael Armbrust commented on SPARK-23325:
--

It doesn't seem like it would be that hard to stabilize at least the generic form 
of InternalRow, or am I missing something?

> DataSourceV2 readers should always produce InternalRow.
> ---
>
> Key: SPARK-23325
> URL: https://issues.apache.org/jira/browse/SPARK-23325
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ryan Blue
>Priority: Major
>
> DataSourceV2 row-oriented implementations are limited to producing either 
> {{Row}} instances or {{UnsafeRow}} instances by implementing 
> {{SupportsScanUnsafeRow}}. Instead, I think that implementations should 
> always produce {{InternalRow}}.
> The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither 
> one is appropriate for implementers.
> File formats don't produce {{Row}} instances or the data values used by 
> {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation 
> that uses {{Row}} instances must produce data that is immediately translated 
> from the representation that was just produced by Spark. In my experience, it 
> made little sense to translate a timestamp in microseconds to a 
> (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass 
> that instance to Spark for immediate translation back.
> On the other hand, {{UnsafeRow}} is very difficult to produce unless data is 
> already held in memory. Even the Parquet support built into Spark 
> deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce 
> unsafe rows. When I went to build an implementation that deserializes Parquet 
> or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be 
> done without first deserializing into memory because the size of an array 
> must be known before any values are written.
> I ended up deciding to deserialize to {{InternalRow}} and use 
> {{UnsafeProjection}} to convert to unsafe. There are two problems with this: 
> first, this is Scala and was difficult to call from Java (it required 
> reflection), and second, this causes double projection in the physical plan 
> (a copy for unsafe to unsafe) if there is a projection that wasn't fully 
> pushed to the data source.
> I think the solution is to have a single interface for readers that expects 
> {{InternalRow}}. Then, a projection should be added in the Spark plan to 
> convert to unsafe and avoid projection in the plan and in the data source. If 
> the data source already produces unsafe rows by deserializing directly, this 
> still minimizes the number of copies because the unsafe projection will check 
> whether the incoming data is already {{UnsafeRow}}.
> Using {{InternalRow}} would also match the interface on the write side.






[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2018-02-22 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373611#comment-16373611
 ] 

Michael Armbrust commented on SPARK-18057:
--

My only concern is that it is stable and backwards compatible.  I'm fine with 
skipping / waiting.

Regarding the timeline, we can put this in master whenever, but we typically 
don't change dependencies in point releases so this will need to be targeted at 
Spark 2.4.

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html






[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2018-02-21 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372138#comment-16372138
 ] 

Michael Armbrust commented on SPARK-18057:
--

We generally tend towards "don't break things that are working for people" 
rather than "clean".  See the RDD API for an example :).

I'm increasingly pro just keeping the name and upgrading the client.  If they 
ever break compatibility again we can have yet another artifact name, but I 
hope it doesn't come to that.

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html






[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2018-02-20 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370780#comment-16370780
 ] 

Michael Armbrust commented on SPARK-18057:
--

+1 to upgrading and it would also be great to add support for any new features 
(e.g., starting a query based on the time index rather than a specific offset).

I personally don't think that fixing KAFKA-4897 is mandatory, but keeping our 
stress tests running without hanging or losing coverage is.

Regarding naming, I'd probably just stop changing the name and say that 
"kafka-0-10-sql" works with any broker that is 0.10.0+.  We could also get rid 
of it, but that seems like an unnecessary change to me that just causes 
unnecessary pain to existing users.

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html






[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects

2018-02-16 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16367933#comment-16367933
 ] 

Michael Armbrust commented on SPARK-23337:
--

This is essentially the same issue as SPARK-18084. We are taking a column name 
here, not an expression.  As such you can only reference top level columns.  I 
agree this is an annoying aspect of the API, but changing it might have to 
happen at a major release since it would be a change in behavior.

> withWatermark raises an exception on struct objects
> ---
>
> Key: SPARK-23337
> URL: https://issues.apache.org/jira/browse/SPARK-23337
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
> Environment: Linux Ubuntu, Spark on standalone mode
>Reporter: Aydin Kocas
>Priority: Major
>
> Hi,
>  
> when using a nested object (I mean an object within a struct, here concrete: 
> _source.createTime) from a json file as the parameter for the 
> withWatermark-method, I get an exception (see below).
> Anything else works flawlessly with the nested object.
>  
> +*{color:#14892c}works:{color}*+ 
> {code:java}
> Dataset jsonRow = 
> spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime",
>  "10 seconds").toDF();{code}
>  
> json structure:
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- myTime: timestamp (nullable = true)
> ..{code}
> +*{color:#d04437}does not work - nested json{color}:*+
> {code:java}
> Dataset jsonRow = 
> spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime",
>  "10 seconds").toDF();{code}
>  
> json structure:
>  
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- _source: struct (nullable = true)
>  | |-- createTime: timestamp (nullable = true)
> ..
>  
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, 
> tree:
> 'EventTimeWatermark '_source.createTime, interval 10 seconds
> +- Deduplicate [_id#0], true
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true),
>  StructField(_index,StringType,true), StructField(_score,LongType,true), 
> StructField(_source,StructType(StructField(additionalData,StringType,true), 
> StructField(client,StringType,true), 
> StructField(clientDomain,BooleanType,true), 
> StructField(clientVersion,StringType,true), 
> StructField(country,StringType,true), 
> StructField(countryName,StringType,true), 
> StructField(createTime,TimestampType,true), 
> StructField(externalIP,StringType,true), 
> StructField(hostname,StringType,true), 
> StructField(internalIP,StringType,true), 
> StructField(location,StringType,true), 
> StructField(locationDestination,StringType,true), 
> StructField(login,StringType,true), 
> StructField(originalRequestString,StringType,true), 
> StructField(password,StringType,true), 
> StructField(peerIdent,StringType,true), 
> StructField(peerType,StringType,true), 
> StructField(recievedTime,TimestampType,true), 
> StructField(sessionEnd,StringType,true), 
> StructField(sessionStart,StringType,true), 
> StructField(sourceEntryAS,StringType,true), 
> StructField(sourceEntryIp,StringType,true), 
> StructField(sourceEntryPort,StringType,true), 
> StructField(targetCountry,StringType,true), 
> StructField(targetCountryName,StringType,true), 
> StructField(targetEntryAS,StringType,true), 
> StructField(targetEntryIp,StringType,true), 
> StructField(targetEntryPort,StringType,true), 
> StructField(targetport,StringType,true), 
> StructField(username,StringType,true), 
> StructField(vulnid,StringType,true)),true), 
> StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), 
> FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796)
>  at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
>  at 
> 

[jira] [Updated] (SPARK-23173) from_json can produce nulls for fields which are marked as non-nullable

2018-02-15 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-23173:
-
Labels: release-notes  (was: )

> from_json can produce nulls for fields which are marked as non-nullable
> ---
>
> Key: SPARK-23173
> URL: https://issues.apache.org/jira/browse/SPARK-23173
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Herman van Hovell
>Priority: Major
>  Labels: release-notes
>
> The {{from_json}} function uses a schema to convert a string into a Spark SQL 
> struct. This schema can contain non-nullable fields. The underlying 
> {{JsonToStructs}} expression does not check if a resulting struct respects 
> the nullability of the schema. This leads to very weird problems in consuming 
> expressions. In our case parquet writing would produce an illegal parquet 
> file.
> There are roughly two solutions here:
>  # Assume that each field in schema passed to {{from_json}} is nullable, and 
> ignore the nullability information set in the passed schema.
>  # Validate the object during runtime, and fail execution if the data is null 
> where we are not expecting this.
> I currently am slightly in favor of option 1, since this is the more 
> performant option and a lot easier to do.
> WDYT? cc [~rxin] [~marmbrus] [~hyukjin.kwon] [~brkyvz]
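
For concreteness, a small sketch of the reported behavior on the affected 2.2.x line: the schema passed to {{from_json}} marks the field non-nullable, yet a missing field still surfaces as a null inside the struct:

{code}
import spark.implicits._
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = new StructType().add("a", IntegerType, nullable = false)

val parsed = Seq("""{"b": 1}""").toDF("json")
  .select(from_json($"json", schema).as("parsed"))

parsed.show()  // the struct's `a` field comes back null despite nullable = false
{code}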






[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming

2018-01-18 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331141#comment-16331141
 ] 

Michael Armbrust commented on SPARK-20928:
--

There is more work to do so I might leave the umbrella open, but we are going 
to have an initial version that supports reading and writing from/to Kafka with 
very low latency in Spark 2.3!  Stay tuned for updates to docs and blog posts.

> SPIP: Continuous Processing Mode for Structured Streaming
> -
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>Priority: Major
>  Labels: SPIP
> Attachments: Continuous Processing in Structured Streaming Design 
> Sketch.pdf
>
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).






[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.

2018-01-12 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323883#comment-16323883
 ] 

Michael Armbrust commented on SPARK-23050:
--

[~zsxwing] is correct.  While it is possible for files to get written out more 
than once, the whole point of the log is to make sure that readers will only 
see exactly one copy. It is possible we will come up with a new streaming 
commit protocol as part of the migration to the V2 data source, but for now I 
would call this working as intended.

> Structured Streaming with S3 file source duplicates data because of eventual 
> consistency.
> -
>
> Key: SPARK-23050
> URL: https://issues.apache.org/jira/browse/SPARK-23050
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Yash Sharma
>
> Spark Structured streaming with S3 file source duplicates data because of 
> eventual consistency.
> Reproducing the scenario -
> - Structured streaming reading from S3 source. Writing back to S3.
> - Spark tries to commitTask on completion of a task, by verifying if all the 
> files have been written to Filesystem. 
> {{ManifestFileCommitProtocol.commitTask}}.
> - [Eventual consistency issue] Spark finds that the file is not present and 
> fails the task. {{org.apache.spark.SparkException: Task failed while writing 
> rows. No such file or directory 
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'}}
> - By this time, the file has eventually become visible on S3.
> - Spark reruns the task and completes it, but writes out a new file name this 
> time. {{ManifestFileCommitProtocol.newTaskTempFile. 
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet.}}
> - Data is duplicated in the results: the same data is processed twice and 
> written to S3.
> - There is no data duplication if Spark is able to list all of the committed 
> files and all tasks succeed.
> Code:
> {code}
> query = selected_df.writeStream \
> .format("parquet") \
> .option("compression", "snappy") \
> .option("path", "s3://path/data/") \
> .option("checkpointLocation", "s3://path/checkpoint/") \
> .start()
> {code}
> Same sized duplicate S3 Files:
> {code}
> $ aws s3 ls s3://path/data/ | grep part-00256
> 2018-01-11 03:37:00  17070 
> part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet
> 2018-01-11 03:37:10  17070 
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet
> {code}
> Exception on S3 listing and task failure:
> {code}
> [Stage 5:>(277 + 100) / 
> 597]18/01/11 03:36:59 WARN TaskSetManager: Lost task 256.0 in stage 5.0 (TID  
> org.apache.spark.SparkException: Task failed while writing rows
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.io.FileNotFoundException: No such file or directory 
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'
>   at 
> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:816)
>   at 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:509)
>   at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>   at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 

[jira] [Commented] (SPARK-22947) SPIP: as-of join in Spark SQL

2018-01-03 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16310423#comment-16310423
 ] 

Michael Armbrust commented on SPARK-22947:
--

+1 to [~rxin]'s question.  This seems like it might just be sugar on top of 
[SPARK-8682].
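To make that concrete, here is a rough sketch (not part of the SPIP; column names and the one-day tolerance follow the examples in the description, and {{time}} is assumed to be a proper timestamp column) of expressing the keyed as-of join as a range join plus a "latest match wins" step:

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("asof-sketch").getOrCreate()
import spark.implicits._

// Toy inputs mirroring the example in the description.
val dfA = Seq(("2016-01-01", 1, 100), ("2016-01-02", 1, -50))
  .toDF("time", "id", "quantity").withColumn("time", to_timestamp($"time"))
val dfB = Seq(("2015-12-31", 1, 100.0), ("2016-01-02", 1, 105.0))
  .toDF("time", "id", "price").withColumn("time", to_timestamp($"time"))

// Range join: all dfB rows within the one-day lookback window of each dfA row.
val joined = dfA.as("a").join(
  dfB.as("b"),
  $"a.id" === $"b.id" &&
    $"b.time" <= $"a.time" &&
    $"b.time" >= $"a.time" - expr("INTERVAL 1 DAY"),
  "left_outer")

// As-of semantics: keep only the most recent matching dfB row for each dfA row.
val w = Window.partitionBy($"a.id", $"a.time").orderBy($"b.time".desc)
val asOf = joined
  .withColumn("rn", row_number().over(w))
  .where($"rn" === 1)
  .select($"a.time", $"a.id", $"a.quantity", $"b.price")

asOf.show()
{code}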

> SPIP: as-of join in Spark SQL
> -
>
> Key: SPARK-22947
> URL: https://issues.apache.org/jira/browse/SPARK-22947
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Li Jin
> Attachments: SPIP_ as-of join in Spark SQL (1).pdf
>
>
> h2. Background and Motivation
> Time series analysis is one of the most common analyses of financial data. In 
> time series analysis, as-of join is a very common operation. Supporting as-of 
> join in Spark SQL will allow many use cases of using Spark SQL for time 
> series analysis.
> As-of join is “join on time” with inexact time matching criteria. Various 
> libraries have implemented as-of join or similar functionality:
> Kdb: https://code.kx.com/wiki/Reference/aj
> Pandas: 
> http://pandas.pydata.org/pandas-docs/version/0.19.0/merging.html#merging-merge-asof
> R: This functionality is called “Last Observation Carried Forward”
> https://www.rdocumentation.org/packages/zoo/versions/1.8-0/topics/na.locf
> JuliaDB: http://juliadb.org/latest/api/joins.html#IndexedTables.asofjoin
> Flint: https://github.com/twosigma/flint#temporal-join-functions
> This proposal advocates introducing new API in Spark SQL to support as-of 
> join.
> h2. Target Personas
> Data scientists, data engineers
> h2. Goals
> * New API in Spark SQL that allows as-of join
> * As-of join of multiple tables (>2) should be performant, because it’s very 
> common that users need to join multiple data sources together for further 
> analysis.
> * Define Distribution, Partitioning and shuffle strategy for ordered time 
> series data
> h2. Non-Goals
> These are out of scope for the existing SPIP, should be considered in future 
> SPIP as improvement to Spark’s time series analysis ability:
> * Utilize partition information from data source, i.e, begin/end of each 
> partition to reduce sorting/shuffling
> * Define API for user to implement asof join time spec in business calendar 
> (i.e. lookback one business day, this is very common in financial data 
> analysis because of market calendars)
> * Support broadcast join
> h2. Proposed API Changes
> h3. TimeContext
> TimeContext is an object that defines the time scope of the analysis; it has a 
> begin time (inclusive) and an end time (exclusive). Users should be able to 
> change the time scope of the analysis (i.e., from one month to five years) by 
> just changing the TimeContext. 
> To the Spark engine, TimeContext is a hint that:
> * can be used to repartition data for the join
> * serves as a predicate that can be pushed down to the storage layer
> Time context is similar to filtering time by begin/end; the main difference 
> is that the time context can be expanded based on the operation taken (see the 
> example in as-of join).
> Time context example:
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> {code}
> h3. asofJoin
> h4. Use Case A (join without key)
> Join two DataFrames on time, with one day lookback:
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> dfA = ...
> dfB = ...
> JoinSpec joinSpec = JoinSpec(timeContext).on("time").tolerance("-1day")
> result = dfA.asofJoin(dfB, joinSpec)
> {code}
> Example input/output:
> {code:java}
> dfA:
> time, quantity
> 20160101, 100
> 20160102, 50
> 20160104, -50
> 20160105, 100
> dfB:
> time, price
> 20151231, 100.0
> 20160104, 105.0
> 20160105, 102.0
> output:
> time, quantity, price
> 20160101, 100, 100.0
> 20160102, 50, null
> 20160104, -50, 105.0
> 20160105, 100, 102.0
> {code}
> Note that row (20160101, 100) of dfA is joined with (20151231, 100.0) of dfB. 
> This is an important illustration of the time context: it is able to expand the 
> context to 20151231 on dfB because of the 1-day lookback.
> h4. Use Case B (join with key)
> To join on time and another key (for instance, id), we use “by” to specify 
> the key.
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> dfA = ...
> dfB = ...
> JoinSpec joinSpec = 
> JoinSpec(timeContext).on("time").by("id").tolerance("-1day")
> result = dfA.asofJoin(dfB, joinSpec)
> {code}
> Example input/output:
> {code:java}
> dfA:
> time, id, quantity
> 20160101, 1, 100
> 20160101, 2, 50
> 20160102, 1, -50
> 20160102, 2, 50
> dfB:
> time, id, price
> 20151231, 1, 100.0
> 20160102, 1, 105.0
> 20160102, 2, 195.0
> Output:
> time, id, quantity, price
> 20160101, 1, 100, 100.0
> 20160101, 2, 50, null
> 20160102, 1, -50, 105.0
> 20160102, 2, 50, 195.0
> {code}
> h2. Optional Design Sketch
> h3. Implementation A
> (This is just initial thought of 

[jira] [Commented] (SPARK-22929) Short name for "kafka" doesn't work in pyspark with packages

2017-12-31 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307144#comment-16307144
 ] 

Michael Armbrust commented on SPARK-22929:
--

Haha, thanks [~sowen], you are right. Kafka is a hard word I guess :)

> Short name for "kafka" doesn't work in pyspark with packages
> 
>
> Key: SPARK-22929
> URL: https://issues.apache.org/jira/browse/SPARK-22929
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>Priority: Critical
>
> When I start pyspark using the following command:
> {code}
> bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
> {code}
> The following throws an error:
> {code}
> spark.read.format("kakfa")...
> py4j.protocol.Py4JJavaError: An error occurred while calling o35.load.
> : java.lang.ClassNotFoundException: Failed to find data source: kakfa. Please 
> find packages at http://spark.apache.org/third-party-projects.html
> {code}
> The following does work:
> {code}
> spark.read.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")...
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22929) Short name for "kafka" doesn't work in pyspark with packages

2017-12-30 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-22929:


 Summary: Short name for "kafka" doesn't work in pyspark with 
packages
 Key: SPARK-22929
 URL: https://issues.apache.org/jira/browse/SPARK-22929
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.2.0
Reporter: Michael Armbrust
Priority: Critical


When I start pyspark using the following command:
{code}
bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
{code}

The following throws an error:
{code}
spark.read.format("kakfa")...

py4j.protocol.Py4JJavaError: An error occurred while calling o35.load.
: java.lang.ClassNotFoundException: Failed to find data source: kakfa. Please 
find packages at http://spark.apache.org/third-party-projects.html
{code}

The following does work:
{code}
spark.read.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")...
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22862) Docs on lazy elimination of columns missing from an encoder.

2017-12-21 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-22862:


 Summary: Docs on lazy elimination of columns missing from an 
encoder.
 Key: SPARK-22862
 URL: https://issues.apache.org/jira/browse/SPARK-22862
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.1
Reporter: Michael Armbrust
Assignee: Michael Armbrust






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22739) Additional Expression Support for Objects

2017-12-20 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299232#comment-16299232
 ] 

Michael Armbrust commented on SPARK-22739:
--

Sounds good to me.  I'm happy to provide pointers on the PR; just mention "@marmbrus".

> Additional Expression Support for Objects
> -
>
> Key: SPARK-22739
> URL: https://issues.apache.org/jira/browse/SPARK-22739
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Aleksander Eskilson
>
> Some discussion in Spark-Avro [1] motivates additions and minor changes to 
> the {{Objects}} Expressions API [2]. The proposed changes include
> * a generalized form of {{initializeJavaBean}} taking a sequence of 
> initialization expressions that can be applied to instances of varying objects
> * an object cast that performs a simple Java type cast against a value
> * making {{ExternalMapToCatalyst}} public, for use in outside libraries
> These changes would facilitate the writing of custom encoders for varying 
> objects that cannot already be readily converted to a statically typed 
> dataset by a JavaBean encoder (e.g. Avro).
> [1] -- 
> https://github.com/databricks/spark-avro/pull/217#issuecomment-342599110
> [2] --
>  
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22739) Additional Expression Support for Objects

2017-12-20 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299167#comment-16299167
 ] 

Michael Armbrust commented on SPARK-22739:
--

Any progress on this?  Branch cut is January 1st, and I'd love to include this.

> Additional Expression Support for Objects
> -
>
> Key: SPARK-22739
> URL: https://issues.apache.org/jira/browse/SPARK-22739
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Aleksander Eskilson
>
> Some discussion in Spark-Avro [1] motivates additions and minor changes to 
> the {{Objects}} Expressions API [2]. The proposed changes include
> * a generalized form of {{initializeJavaBean}} taking a sequence of 
> initialization expressions that can be applied to instances of varying objects
> * an object cast that performs a simple Java type cast against a value
> * making {{ExternalMapToCatalyst}} public, for use in outside libraries
> These changes would facilitate the writing of custom encoders for varying 
> objects that cannot already be readily converted to a statically typed 
> dataset by a JavaBean encoder (e.g. Avro).
> [1] -- 
> https://github.com/databricks/spark-avro/pull/217#issuecomment-342599110
> [2] --
>  
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22739) Additional Expression Support for Objects

2017-12-20 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-22739:
-
Target Version/s: 2.3.0

> Additional Expression Support for Objects
> -
>
> Key: SPARK-22739
> URL: https://issues.apache.org/jira/browse/SPARK-22739
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Aleksander Eskilson
>
> Some discussion in Spark-Avro [1] motivates additions and minor changes to 
> the {{Objects}} Expressions API [2]. The proposed changes include
> * a generalized form of {{initializeJavaBean}} taking a sequence of 
> initialization expressions that can be applied to instances of varying objects
> * an object cast that performs a simple Java type cast against a value
> * making {{ExternalMapToCatalyst}} public, for use in outside libraries
> These changes would facilitate the writing of custom encoders for varying 
> objects that cannot already be readily converted to a statically typed 
> dataset by a JavaBean encoder (e.g. Avro).
> [1] -- 
> https://github.com/databricks/spark-avro/pull/217#issuecomment-342599110
> [2] --
>  
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22824) Spark Structured Streaming Source trait breaking change

2017-12-18 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295676#comment-16295676
 ] 

Michael Armbrust commented on SPARK-22824:
--

This is technically an internal API (as is all of 
[org.apache.spark.sql.execution|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala]).
  That said, I think we should do our best to preserve compatibility until 
there is a {{@Stable}} public alternative (which is the goal of the V2 project).

We'll open a PR to try and fix this.

> Spark Structured Streaming Source trait breaking change
> ---
>
> Key: SPARK-22824
> URL: https://issues.apache.org/jira/browse/SPARK-22824
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
> Environment: Azure Databricks 3.4
>Reporter: Matthieu Maitre
>
> org.apache.spark.sql.execution.streaming.Offset was moved to 
> org.apache.spark.sql.sources.v2.reader.Offset on the Source trait:
> https://github.com/apache/spark/commit/f8c7c1f21aa9d1fd38b584ca8c4adf397966e9f7#diff-56453fdb4dc2d7666c7ab237cb102189
> This broke our custom sources that are called in Spark jobs running on Azure 
> Databricks 3.4 since the Maven package does not match the hosted Spark bits. 
> We use the following Maven version to be able to deploy on Azure Databricks:
> {code:xml}
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.11</artifactId>
>   <version>2.3.0-SNAPSHOT</version>
> </dependency>
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22824) Spark Structured Streaming Source trait breaking change

2017-12-18 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust reassigned SPARK-22824:


Assignee: Jose Torres

> Spark Structured Streaming Source trait breaking change
> ---
>
> Key: SPARK-22824
> URL: https://issues.apache.org/jira/browse/SPARK-22824
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
> Environment: Azure Databricks 3.4
>Reporter: Matthieu Maitre
>Assignee: Jose Torres
>
> org.apache.spark.sql.execution.streaming.Offset was moved to 
> org.apache.spark.sql.sources.v2.reader.Offset on the Source trait:
> https://github.com/apache/spark/commit/f8c7c1f21aa9d1fd38b584ca8c4adf397966e9f7#diff-56453fdb4dc2d7666c7ab237cb102189
> This broke our custom sources that are called in Spark jobs running on Azure 
> Databricks 3.4 since the Maven package does not match the hosted Spark bits. 
> We use the following Maven version to be able to deploy on Azure Databricks:
> {code:xml}
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.11</artifactId>
>   <version>2.3.0-SNAPSHOT</version>
> </dependency>
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming

2017-12-12 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16288391#comment-16288391
 ] 

Michael Armbrust commented on SPARK-20928:
--

An update on this.  We've started to create subtasks to break down the process of 
adding this new execution mode, and we are targeting an alpha version in 2.3.  
The basics of the new API for Sources and Sinks (for both microbatch and 
continuous mode) have been up for a few days if people want to see more details. 
 We'll follow with PRs to add a continuous execution engine and an 
implementation of a continuous Kafka connector.

Regarding some of the questions:
 - The version we are targeting for 2.3 will only support map operations and 
thus will not support shuffles / aggregating by windows (although the window() 
operator is just a projection so will work for window assignment).
 - I think the API is designed in such a way that we can build a streaming 
shuffle that aligns on epochIds in the future, allowing us to easily extend the 
continuous engine to handle stateful operations as well.

> SPIP: Continuous Processing Mode for Structured Streaming
> -
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>  Labels: SPIP
> Attachments: Continuous Processing in Structured Streaming Design 
> Sketch.pdf
>
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-08-30 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16148227#comment-16148227
 ] 

Michael Armbrust edited comment on SPARK-20928 at 8/30/17 11:52 PM:


Hey everyone, thanks for your interest in this feature!  I'm still targeting 
Spark 2.3, but unfortunately have been busy with other things since the summit. 
Will post more details on the design as soon as we have them! The Spark summit 
demo just showed a hacked-together prototype, but we need to do more to figure 
out how to best integrate it into Spark.


was (Author: marmbrus):
Hey everyone, thanks for your interest in this feature!  I'm still targeting 
Spark 2.3, but unfortunately have been busy with other things since the summit. 
Will post more details on the design as soon as we have them!

> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-08-30 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16148227#comment-16148227
 ] 

Michael Armbrust commented on SPARK-20928:
--

Hey everyone, thanks for your interest in this feature!  I'm still targeting 
Spark 2.3, but unfortunately have been busy with other things since the summit. 
Will post more details on the design as soon as we have them!

> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20441) Within the same streaming query, one StreamingRelation should only be transformed to one StreamingExecutionRelation

2017-07-07 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-20441:
-
Affects Version/s: (was: 2.2.0)

> Within the same streaming query, one StreamingRelation should only be 
> transformed to one StreamingExecutionRelation
> ---
>
> Key: SPARK-20441
> URL: https://issues.apache.org/jira/browse/SPARK-20441
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.1.1, 2.1.2
>Reporter: Liwei Lin
> Fix For: 2.2.0
>
>
> Within the same streaming query, when one StreamingRelation is referenced 
> multiple times -- e.g. df.union(df) -- we should transform it only to one 
> StreamingExecutionRelation, instead of two or more different  
> StreamingExecutionRelations.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20441) Within the same streaming query, one StreamingRelation should only be transformed to one StreamingExecutionRelation

2017-07-07 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-20441:
-
Fix Version/s: 2.2.0

> Within the same streaming query, one StreamingRelation should only be 
> transformed to one StreamingExecutionRelation
> ---
>
> Key: SPARK-20441
> URL: https://issues.apache.org/jira/browse/SPARK-20441
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.1.1, 2.1.2
>Reporter: Liwei Lin
> Fix For: 2.2.0
>
>
> Within the same streaming query, when one StreamingRelation is referenced 
> multiple times -- e.g. df.union(df) -- we should transform it only to one 
> StreamingExecutionRelation, instead of two or more different  
> StreamingExecutionRelations.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21267) Improvements to the Structured Streaming programming guide

2017-07-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-21267:
-
Target Version/s:   (was: 2.2.0)

> Improvements to the Structured Streaming programming guide
> --
>
> Key: SPARK-21267
> URL: https://issues.apache.org/jira/browse/SPARK-21267
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Structured Streaming
>Affects Versions: 2.1.1
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>
> - Add information for Ganglia
> - Add Kafka Sink to the main docs
> - Move Structured Streaming above Spark Streaming



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2017-06-30 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16070525#comment-16070525
 ] 

Michael Armbrust commented on SPARK-18057:
--

We should upgrade.  Now that Kafka has a good protocol versioning story, I also 
wonder if we should get rid of the version in our artifacts entirely.  When we 
upgrade it would also be good if we can add the new headers to the row that we 
output.

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15533) Deprecate Dataset.explode

2017-06-30 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16070308#comment-16070308
 ] 

Michael Armbrust commented on SPARK-15533:
--

Just include the other columns too: {{df.select($"id", explode($"val"))}}
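For example, a tiny sketch with made-up data showing the replacement pattern:

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

val spark = SparkSession.builder.appName("explode-sketch").getOrCreate()
import spark.implicits._

val df = Seq((1, Seq("a", "b")), (2, Seq("c"))).toDF("id", "val")

// Instead of the deprecated Dataset.explode, list the columns to keep alongside
// functions.explode on the array column.
df.select($"id", explode($"val").as("item")).show()
// +---+----+
// | id|item|
// +---+----+
// |  1|   a|
// |  1|   b|
// |  2|   c|
// +---+----+
{code}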

> Deprecate Dataset.explode
> -
>
> Key: SPARK-15533
> URL: https://issues.apache.org/jira/browse/SPARK-15533
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Sameer Agarwal
> Fix For: 2.0.0
>
>
> See discussion on the mailing list: 
> http://mail-archives.apache.org/mod_mbox/spark-user/201605.mbox/browser
> We should deprecate Dataset.explode, and point users to Dataset.flatMap and 
> functions.explode with select.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21253) Cannot fetch big blocks to disk

2017-06-29 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-21253:
-
Target Version/s: 2.2.0

> Cannot fetch big blocks to disk 
> 
>
> Key: SPARK-21253
> URL: https://issues.apache.org/jira/browse/SPARK-21253
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yuming Wang
>Assignee: Shixiong Zhu
> Attachments: ui-thread-dump-jqhadoop221-154.gif
>
>
> This reproduces on a Spark *cluster*, but not in *local* mode:
> 1. Start a spark context with {{spark.reducer.maxReqSizeShuffleToMem=1K}}:
> {code:actionscript}
> $ spark-shell --conf spark.reducer.maxReqSizeShuffleToMem=1K
> {code}
> 2. A shuffle:
> {code:actionscript}
> scala> sc.parallelize(0 until 300, 10).repartition(2001).count()
> {code}
> The error messages:
> {noformat}
> org.apache.spark.shuffle.FetchFailedException: Failed to send request for 
> 1649611690367_2 to yhd-jqhadoop166.int.yihaodian.com/10.17.28.166:7337: 
> java.io.IOException: Connection reset by peer
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:418)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at 
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> at scala.collection.AbstractIterator.to(Iterator.scala:1336)
> at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
> at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
> at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
> at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to send request for 1649611690367_2 to 
> yhd-jqhadoop166.int.yihaodian.com/10.17.28.166:7337: java.io.IOException: 
> Connection reset by peer
> at 
> org.apache.spark.network.client.TransportClient.lambda$stream$1(TransportClient.java:196)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
> at 
> io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:163)
> at 
> io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93)
> at 
> io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28)
> at 
> org.apache.spark.network.client.TransportClient.stream(TransportClient.java:183)
> at 
> 

[jira] [Assigned] (SPARK-21253) Cannot fetch big blocks to disk

2017-06-29 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust reassigned SPARK-21253:


Assignee: Shixiong Zhu

> Cannot fetch big blocks to disk 
> 
>
> Key: SPARK-21253
> URL: https://issues.apache.org/jira/browse/SPARK-21253
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yuming Wang
>Assignee: Shixiong Zhu
> Attachments: ui-thread-dump-jqhadoop221-154.gif
>
>
> This reproduces on a Spark *cluster*, but not in *local* mode:
> 1. Start a spark context with {{spark.reducer.maxReqSizeShuffleToMem=1K}}:
> {code:actionscript}
> $ spark-shell --conf spark.reducer.maxReqSizeShuffleToMem=1K
> {code}
> 2. A shuffle:
> {code:actionscript}
> scala> sc.parallelize(0 until 300, 10).repartition(2001).count()
> {code}
> The error messages:
> {noformat}
> org.apache.spark.shuffle.FetchFailedException: Failed to send request for 
> 1649611690367_2 to yhd-jqhadoop166.int.yihaodian.com/10.17.28.166:7337: 
> java.io.IOException: Connection reset by peer
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:418)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at 
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> at scala.collection.AbstractIterator.to(Iterator.scala:1336)
> at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
> at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
> at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
> at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to send request for 1649611690367_2 to 
> yhd-jqhadoop166.int.yihaodian.com/10.17.28.166:7337: java.io.IOException: 
> Connection reset by peer
> at 
> org.apache.spark.network.client.TransportClient.lambda$stream$1(TransportClient.java:196)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
> at 
> io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:163)
> at 
> io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93)
> at 
> io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28)
> at 
> org.apache.spark.network.client.TransportClient.stream(TransportClient.java:183)
> at 
> 

[jira] [Commented] (SPARK-21110) Structs should be usable in inequality filters

2017-06-23 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061398#comment-16061398
 ] 

Michael Armbrust commented on SPARK-21110:
--

It seems that if you can call {{min}} and {{max}} on structs, you should be able 
to use comparison operations on them as well.
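For instance (a rough sketch, not from the ticket), struct ordering is already exercised by aggregates and sorting today, while the equivalent comparison in a filter is rejected by the analyzer:

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, min, struct}

val spark = SparkSession.builder.appName("struct-ordering-sketch").getOrCreate()
import spark.implicits._

val df = Seq(("Boston", "Bob"), ("San Francisco", "Nick")).toDF("city", "person")

// These rely on struct ordering and work today:
df.select(min(struct($"city", $"person")), max(struct($"city", $"person"))).show()
df.orderBy(struct($"city", $"person")).show()

// ...but the same ordering used directly in a filter currently fails analysis:
// df.where(struct($"city", $"person") < struct($"city", $"person")).show()
{code}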

> Structs should be usable in inequality filters
> --
>
> Key: SPARK-21110
> URL: https://issues.apache.org/jira/browse/SPARK-21110
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Nicholas Chammas
>Priority: Minor
>
> It seems like a missing feature that you can't compare structs in a filter on 
> a DataFrame.
> Here's a simple demonstration of a) where this would be useful and b) how 
> it's different from simply comparing each of the components of the structs.
> {code}
> import pyspark
> from pyspark.sql.functions import col, struct, concat
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> df = spark.createDataFrame(
> [
> ('Boston', 'Bob'),
> ('Boston', 'Nick'),
> ('San Francisco', 'Bob'),
> ('San Francisco', 'Nick'),
> ],
> ['city', 'person']
> )
> pairs = (
> df.select(
> struct('city', 'person').alias('p1')
> )
> .crossJoin(
> df.select(
> struct('city', 'person').alias('p2')
> )
> )
> )
> print("Everything")
> pairs.show()
> print("Comparing parts separately (doesn't give me what I want)")
> (pairs
> .where(col('p1.city') < col('p2.city'))
> .where(col('p1.person') < col('p2.person'))
> .show())
> print("Comparing parts together with concat (gives me what I want but is 
> hacky)")
> (pairs
> .where(concat('p1.city', 'p1.person') < concat('p2.city', 'p2.person'))
> .show())
> print("Comparing parts together with struct (my desired solution but 
> currently yields an error)")
> (pairs
> .where(col('p1') < col('p2'))
> .show())
> {code}
> The last query yields the following error in Spark 2.1.1:
> {code}
> org.apache.spark.sql.AnalysisException: cannot resolve '(`p1` < `p2`)' due to 
> data type mismatch: '(`p1` < `p2`)' requires (boolean or tinyint or smallint 
> or int or bigint or float or double or decimal or timestamp or date or string 
> or binary) type, not struct;;
> 'Filter (p1#5 < p2#8)
> +- Join Cross
>:- Project [named_struct(city, city#0, person, person#1) AS p1#5]
>:  +- LogicalRDD [city#0, person#1]
>+- Project [named_struct(city, city#0, person, person#1) AS p2#8]
>   +- LogicalRDD [city#0, person#1]
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21110) Structs should be usable in inequality filters

2017-06-23 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-21110:
-
Target Version/s: 2.3.0

> Structs should be usable in inequality filters
> --
>
> Key: SPARK-21110
> URL: https://issues.apache.org/jira/browse/SPARK-21110
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Nicholas Chammas
>Priority: Minor
>
> It seems like a missing feature that you can't compare structs in a filter on 
> a DataFrame.
> Here's a simple demonstration of a) where this would be useful and b) how 
> it's different from simply comparing each of the components of the structs.
> {code}
> import pyspark
> from pyspark.sql.functions import col, struct, concat
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> df = spark.createDataFrame(
> [
> ('Boston', 'Bob'),
> ('Boston', 'Nick'),
> ('San Francisco', 'Bob'),
> ('San Francisco', 'Nick'),
> ],
> ['city', 'person']
> )
> pairs = (
> df.select(
> struct('city', 'person').alias('p1')
> )
> .crossJoin(
> df.select(
> struct('city', 'person').alias('p2')
> )
> )
> )
> print("Everything")
> pairs.show()
> print("Comparing parts separately (doesn't give me what I want)")
> (pairs
> .where(col('p1.city') < col('p2.city'))
> .where(col('p1.person') < col('p2.person'))
> .show())
> print("Comparing parts together with concat (gives me what I want but is 
> hacky)")
> (pairs
> .where(concat('p1.city', 'p1.person') < concat('p2.city', 'p2.person'))
> .show())
> print("Comparing parts together with struct (my desired solution but 
> currently yields an error)")
> (pairs
> .where(col('p1') < col('p2'))
> .show())
> {code}
> The last query yields the following error in Spark 2.1.1:
> {code}
> org.apache.spark.sql.AnalysisException: cannot resolve '(`p1` < `p2`)' due to 
> data type mismatch: '(`p1` < `p2`)' requires (boolean or tinyint or smallint 
> or int or bigint or float or double or decimal or timestamp or date or string 
> or binary) type, not struct;;
> 'Filter (p1#5 < p2#8)
> +- Join Cross
>:- Project [named_struct(city, city#0, person, person#1) AS p1#5]
>:  +- LogicalRDD [city#0, person#1]
>+- Project [named_struct(city, city#0, person, person#1) AS p2#8]
>   +- LogicalRDD [city#0, person#1]
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21133) HighlyCompressedMapStatus#writeExternal throws NPE

2017-06-19 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-21133:
-
Target Version/s: 2.2.0
Priority: Blocker  (was: Major)
 Description: 
To reproduce, set {{spark.sql.shuffle.partitions}} to a value greater than 2000 
and run a query with a shuffle. A simple example:

{code:sql}
spark-sql --executor-memory 12g --driver-memory 8g --executor-cores 7   -e "
  set spark.sql.shuffle.partitions=2001;
  drop table if exists spark_hcms_npe;
  create table spark_hcms_npe as select id, count(*) from big_table group by id;
"
{code}

Error logs:
{noformat}
17/06/18 15:00:27 ERROR Utils: Exception encountered
java.lang.NullPointerException
at 
org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171)
at 
org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
at 
org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
at 
org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167)
at 
java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617)
at 
org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
at 
org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at 
org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619)
at 
org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562)
at 
org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/06/18 15:00:27 ERROR MapOutputTrackerMaster: java.lang.NullPointerException
java.io.IOException: java.lang.NullPointerException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1310)
at 
org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167)
at 
java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617)
at 
org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
at 
org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at 
org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619)
at 
org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562)
at 
org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at 
org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171)
at 
org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
at 
org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
... 17 more
17/06/18 

[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-06-19 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16054596#comment-16054596
 ] 

Michael Armbrust commented on SPARK-20928:
--

Hi Cody, I do plan to flesh this out with the other sections of the SIP 
document and will email the dev list at that point.  All that has been done so 
far is some basic prototyping to estimate how much work an alternative 
{{StreamExecution}} would take to build, and some experiments to validate the 
latencies that this arch could achieve.  Do you have specific concerns with the 
proposal as it stands?

> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20980) Rename the option `wholeFile` to `multiLine` for JSON and CSV

2017-06-05 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16037416#comment-16037416
 ] 

Michael Armbrust commented on SPARK-20980:
--

I already cut RC4, so I think we may just need to accept both options moving 
forward.
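
For readers of this thread, a minimal sketch of how the renamed option is used from 
the DataFrame reader, assuming an active {{spark}} session (paths are illustrative; 
per the comment above, the old {{wholeFile}} spelling may also remain accepted):

{code}
// Parse JSON records that span multiple lines within a single file.
val json = spark.read
  .option("multiLine", "true")
  .json("/path/to/records.json")

// The same option name applies to CSV for consistency.
val csv = spark.read
  .option("multiLine", "true")
  .option("header", "true")
  .csv("/path/to/records.csv")
{code}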

> Rename the option `wholeFile` to `multiLine` for JSON and CSV
> -
>
> Key: SPARK-20980
> URL: https://issues.apache.org/jira/browse/SPARK-20980
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Xiao Li
>Assignee: Xiao Li
>Priority: Blocker
>
> The current option name `wholeFile` is misleading for CSV: it does not mean one 
> record per file, since a single file can contain multiple records. Thus, we 
> should rename it; the proposal is `multiLine`.
> To keep things consistent, we need to rename the same option for JSON and fix 
> that issue in another JIRA.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-20737) Mechanism for cleanup hooks, for structured-streaming sinks on executor shutdown.

2017-06-02 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust closed SPARK-20737.

Resolution: Won't Fix

> Mechanism for cleanup hooks, for structured-streaming sinks on executor 
> shutdown.
> -
>
> Key: SPARK-20737
> URL: https://issues.apache.org/jira/browse/SPARK-20737
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Prashant Sharma
>  Labels: Kafka
>
> Add a standard way to perform cleanup during executor shutdown for Structured 
> Streaming sinks in general, and KafkaSink in particular.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20065) Empty output files created for aggregation query in append mode

2017-06-02 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-20065:
-
Target Version/s: 2.3.0

> Empty output files created for aggregation query in append mode
> ---
>
> Key: SPARK-20065
> URL: https://issues.apache.org/jira/browse/SPARK-20065
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Silvio Fiorito
>
> I've got a Kafka topic which I'm querying, running a windowed aggregation, 
> with a 30 second watermark, 10 second trigger, writing out to Parquet with 
> append output mode.
> Every 10-second trigger generates a file, regardless of whether there was any 
> data for that trigger or whether any records were actually finalized by the 
> watermark.
> Is this expected behavior, or should it not write out these empty files?
> {code}
> val df = spark.readStream.format("kafka")
> val query = df
>   .withWatermark("timestamp", "30 seconds")
>   .groupBy(window($"timestamp", "10 seconds"))
>   .count()
>   .select(date_format($"window.start", "HH:mm:ss").as("time"), $"count")
> query
>   .writeStream
>   .format("parquet")
>   .option("checkpointLocation", aggChk)
>   .trigger(ProcessingTime("10 seconds"))
>   .outputMode("append")
>   .start(aggPath)
> {code}
> As the query executes, list the files under "aggPath" and you'll see 339-byte 
> files at a minimum until we arrive at the first watermark and the initial 
> batch is finalized. Even after that, as long as there are empty batches it'll 
> keep generating empty files every trigger.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19903) Watermark metadata is lost when using resolved attributes

2017-06-02 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19903:
-
Target Version/s: 2.3.0

> Watermark metadata is lost when using resolved attributes
> -
>
> Key: SPARK-19903
> URL: https://issues.apache.org/jira/browse/SPARK-19903
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
> Environment: Ubuntu Linux
>Reporter: Piotr Nestorow
>
> PySpark example reads a Kafka stream. There is watermarking set when handling 
> the data window. The defined query uses output Append mode.
> The PySpark engine reports the error:
> 'Append output mode not supported when there are streaming aggregations on 
> streaming DataFrames/DataSets'
> The Python example:
> ---
> {code}
> import sys
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import explode, split, window
> if __name__ == "__main__":
> if len(sys.argv) != 4:
> print("""
> Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
> """, file=sys.stderr)
> exit(-1)
> bootstrapServers = sys.argv[1]
> subscribeType = sys.argv[2]
> topics = sys.argv[3]
> spark = SparkSession\
> .builder\
> .appName("StructuredKafkaWordCount")\
> .getOrCreate()
> # Create DataSet representing the stream of input lines from kafka
> lines = spark\
> .readStream\
> .format("kafka")\
> .option("kafka.bootstrap.servers", bootstrapServers)\
> .option(subscribeType, topics)\
> .load()\
> .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
> # Split the lines into words, retaining timestamps
> # split() splits each line into an array, and explode() turns the array 
> into multiple rows
> words = lines.select(
> explode(split(lines.value, ' ')).alias('word'),
> lines.timestamp
> )
> # Group the data by window and word and compute the count of each group
> windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
> window(words.timestamp, "30 seconds", "30 seconds"), words.word
> ).count()
> # Start running the query that prints the running counts to the console
> query = windowedCounts\
> .writeStream\
> .outputMode('append')\
> .format('console')\
> .option("truncate", "false")\
> .start()
> query.awaitTermination()
> {code}
> The corresponding example in Zeppelin notebook:
> {code}
> %spark.pyspark
> from pyspark.sql.functions import explode, split, window
> # Create DataSet representing the stream of input lines from kafka
> lines = spark\
> .readStream\
> .format("kafka")\
> .option("kafka.bootstrap.servers", "localhost:9092")\
> .option("subscribe", "words")\
> .load()\
> .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
> # Split the lines into words, retaining timestamps
> # split() splits each line into an array, and explode() turns the array into 
> multiple rows
> words = lines.select(
> explode(split(lines.value, ' ')).alias('word'),
> lines.timestamp
> )
> # Group the data by window and word and compute the count of each group
> windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
> window(words.timestamp, "30 seconds", "30 seconds"), words.word
> ).count()
> # Start running the query that prints the running counts to the console
> query = windowedCounts\
> .writeStream\
> .outputMode('append')\
> .format('console')\
> .option("truncate", "false")\
> .start()
> query.awaitTermination()
> --
> Note that the Scala version of the same example in Zeppelin notebook works 
> fine:
> 
> import java.sql.Timestamp
> import org.apache.spark.sql.streaming.ProcessingTime
> import org.apache.spark.sql.functions._
> // Create DataSet representing the stream of input lines from kafka
> val lines = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", "localhost:9092")
> .option("subscribe", "words")
> .load()
> // Split the lines into words, retaining timestamps
> val words = lines
> .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS 
> TIMESTAMP)")
> .as[(String, Timestamp)]
> .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
> .toDF("word", "timestamp")
> // Group the data by window and word and compute the 

[jira] [Updated] (SPARK-19903) Watermark metadata is lost when using resolved attributes

2017-06-02 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19903:
-
Component/s: (was: PySpark)

> Watermark metadata is lost when using resolved attributes
> -
>
> Key: SPARK-19903
> URL: https://issues.apache.org/jira/browse/SPARK-19903
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
> Environment: Ubuntu Linux
>Reporter: Piotr Nestorow
>
> PySpark example reads a Kafka stream. There is watermarking set when handling 
> the data window. The defined query uses output Append mode.
> The PySpark engine reports the error:
> 'Append output mode not supported when there are streaming aggregations on 
> streaming DataFrames/DataSets'
> The Python example:
> ---
> {code}
> import sys
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import explode, split, window
> if __name__ == "__main__":
> if len(sys.argv) != 4:
> print("""
> Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
> """, file=sys.stderr)
> exit(-1)
> bootstrapServers = sys.argv[1]
> subscribeType = sys.argv[2]
> topics = sys.argv[3]
> spark = SparkSession\
> .builder\
> .appName("StructuredKafkaWordCount")\
> .getOrCreate()
> # Create DataSet representing the stream of input lines from kafka
> lines = spark\
> .readStream\
> .format("kafka")\
> .option("kafka.bootstrap.servers", bootstrapServers)\
> .option(subscribeType, topics)\
> .load()\
> .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
> # Split the lines into words, retaining timestamps
> # split() splits each line into an array, and explode() turns the array 
> into multiple rows
> words = lines.select(
> explode(split(lines.value, ' ')).alias('word'),
> lines.timestamp
> )
> # Group the data by window and word and compute the count of each group
> windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
> window(words.timestamp, "30 seconds", "30 seconds"), words.word
> ).count()
> # Start running the query that prints the running counts to the console
> query = windowedCounts\
> .writeStream\
> .outputMode('append')\
> .format('console')\
> .option("truncate", "false")\
> .start()
> query.awaitTermination()
> {code}
> The corresponding example in Zeppelin notebook:
> {code}
> %spark.pyspark
> from pyspark.sql.functions import explode, split, window
> # Create DataSet representing the stream of input lines from kafka
> lines = spark\
> .readStream\
> .format("kafka")\
> .option("kafka.bootstrap.servers", "localhost:9092")\
> .option("subscribe", "words")\
> .load()\
> .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
> # Split the lines into words, retaining timestamps
> # split() splits each line into an array, and explode() turns the array into 
> multiple rows
> words = lines.select(
> explode(split(lines.value, ' ')).alias('word'),
> lines.timestamp
> )
> # Group the data by window and word and compute the count of each group
> windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
> window(words.timestamp, "30 seconds", "30 seconds"), words.word
> ).count()
> # Start running the query that prints the running counts to the console
> query = windowedCounts\
> .writeStream\
> .outputMode('append')\
> .format('console')\
> .option("truncate", "false")\
> .start()
> query.awaitTermination()
> --
> Note that the Scala version of the same example in Zeppelin notebook works 
> fine:
> 
> import java.sql.Timestamp
> import org.apache.spark.sql.streaming.ProcessingTime
> import org.apache.spark.sql.functions._
> // Create DataSet representing the stream of input lines from kafka
> val lines = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", "localhost:9092")
> .option("subscribe", "words")
> .load()
> // Split the lines into words, retaining timestamps
> val words = lines
> .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS 
> TIMESTAMP)")
> .as[(String, Timestamp)]
> .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
> .toDF("word", "timestamp")
> // Group the data by window and word and 

[jira] [Updated] (SPARK-19903) Watermark metadata is lost when using resolved attributes

2017-06-02 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19903:
-
Summary: Watermark metadata is lost when using resolved attributes  (was: 
PySpark Kafka streaming query ouput append mode not possible)

> Watermark metadata is lost when using resolved attributes
> -
>
> Key: SPARK-19903
> URL: https://issues.apache.org/jira/browse/SPARK-19903
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
> Environment: Ubuntu Linux
>Reporter: Piotr Nestorow
>
> PySpark example reads a Kafka stream. There is watermarking set when handling 
> the data window. The defined query uses output Append mode.
> The PySpark engine reports the error:
> 'Append output mode not supported when there are streaming aggregations on 
> streaming DataFrames/DataSets'
> The Python example:
> ---
> {code}
> import sys
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import explode, split, window
> if __name__ == "__main__":
> if len(sys.argv) != 4:
> print("""
> Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
> """, file=sys.stderr)
> exit(-1)
> bootstrapServers = sys.argv[1]
> subscribeType = sys.argv[2]
> topics = sys.argv[3]
> spark = SparkSession\
> .builder\
> .appName("StructuredKafkaWordCount")\
> .getOrCreate()
> # Create DataSet representing the stream of input lines from kafka
> lines = spark\
> .readStream\
> .format("kafka")\
> .option("kafka.bootstrap.servers", bootstrapServers)\
> .option(subscribeType, topics)\
> .load()\
> .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
> # Split the lines into words, retaining timestamps
> # split() splits each line into an array, and explode() turns the array 
> into multiple rows
> words = lines.select(
> explode(split(lines.value, ' ')).alias('word'),
> lines.timestamp
> )
> # Group the data by window and word and compute the count of each group
> windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
> window(words.timestamp, "30 seconds", "30 seconds"), words.word
> ).count()
> # Start running the query that prints the running counts to the console
> query = windowedCounts\
> .writeStream\
> .outputMode('append')\
> .format('console')\
> .option("truncate", "false")\
> .start()
> query.awaitTermination()
> {code}
> The corresponding example in Zeppelin notebook:
> {code}
> %spark.pyspark
> from pyspark.sql.functions import explode, split, window
> # Create DataSet representing the stream of input lines from kafka
> lines = spark\
> .readStream\
> .format("kafka")\
> .option("kafka.bootstrap.servers", "localhost:9092")\
> .option("subscribe", "words")\
> .load()\
> .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
> # Split the lines into words, retaining timestamps
> # split() splits each line into an array, and explode() turns the array into 
> multiple rows
> words = lines.select(
> explode(split(lines.value, ' ')).alias('word'),
> lines.timestamp
> )
> # Group the data by window and word and compute the count of each group
> windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
> window(words.timestamp, "30 seconds", "30 seconds"), words.word
> ).count()
> # Start running the query that prints the running counts to the console
> query = windowedCounts\
> .writeStream\
> .outputMode('append')\
> .format('console')\
> .option("truncate", "false")\
> .start()
> query.awaitTermination()
> --
> Note that the Scala version of the same example in Zeppelin notebook works 
> fine:
> 
> import java.sql.Timestamp
> import org.apache.spark.sql.streaming.ProcessingTime
> import org.apache.spark.sql.functions._
> // Create DataSet representing the stream of input lines from kafka
> val lines = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", "localhost:9092")
> .option("subscribe", "words")
> .load()
> // Split the lines into words, retaining timestamps
> val words = lines
> .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS 
> TIMESTAMP)")
> .as[(String, Timestamp)]
> .flatMap(line => line._1.split(" ").map(word => 

[jira] [Updated] (SPARK-19903) PySpark Kafka streaming query ouput append mode not possible

2017-06-02 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19903:
-
Description: 
PySpark example reads a Kafka stream. There is watermarking set when handling 
the data window. The defined query uses output Append mode.

The PySpark engine reports the error:
'Append output mode not supported when there are streaming aggregations on 
streaming DataFrames/DataSets'

The Python example:
---
{code}
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window

if __name__ == "__main__":
if len(sys.argv) != 4:
print("""
Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
""", file=sys.stderr)
exit(-1)

bootstrapServers = sys.argv[1]
subscribeType = sys.argv[2]
topics = sys.argv[3]

spark = SparkSession\
.builder\
.appName("StructuredKafkaWordCount")\
.getOrCreate()

# Create DataSet representing the stream of input lines from kafka
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", bootstrapServers)\
.option(subscribeType, topics)\
.load()\
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array 
into multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
window(words.timestamp, "30 seconds", "30 seconds"), words.word
).count()

# Start running the query that prints the running counts to the console
query = windowedCounts\
.writeStream\
.outputMode('append')\
.format('console')\
.option("truncate", "false")\
.start()

query.awaitTermination()
{code}

The corresponding example in Zeppelin notebook:
{code}
%spark.pyspark

from pyspark.sql.functions import explode, split, window

# Create DataSet representing the stream of input lines from kafka
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", "words")\
.load()\
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into 
multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
window(words.timestamp, "30 seconds", "30 seconds"), words.word
).count()

# Start running the query that prints the running counts to the console
query = windowedCounts\
.writeStream\
.outputMode('append')\
.format('console')\
.option("truncate", "false")\
.start()

query.awaitTermination()
--

Note that the Scala version of the same example in Zeppelin notebook works fine:

import java.sql.Timestamp
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.functions._

// Create DataSet representing the stream of input lines from kafka
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "words")
.load()

// Split the lines into words, retaining timestamps
val words = lines
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
.as[(String, Timestamp)]
.flatMap(line => line._1.split(" ").map(word => (word, line._2)))
.toDF("word", "timestamp")

// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("timestamp", "30 seconds")
.groupBy(window($"timestamp", "30 seconds", "30 seconds"), $"word")
.count()

// Start running the query that prints the windowed word counts to the console
val query = windowedCounts.writeStream
.outputMode("append")
.format("console")
.trigger(ProcessingTime("35 seconds"))
.option("truncate", "false")
.start()

query.awaitTermination()
{code}


  was:
PySpark example reads a Kafka stream. There is watermarking set when handling 
the data window. 

[jira] [Commented] (SPARK-20002) Add support for unions between streaming and batch datasets

2017-06-02 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035441#comment-16035441
 ] 

Michael Armbrust commented on SPARK-20002:
--

I'm not sure that we will ever support this.  The issue is that for batch 
datasets, we don't track what has been read.  Thus it's unclear what should 
happen when the query is restarted.  Instead, I think you can always achieve 
the same result by just loading both datasets as streams (even if you don't 
plan to change one of them).  Would that work?
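
A rough sketch of that workaround, assuming the "batch" side is a directory of 
Parquet files whose schema is already known (the names, paths, and columns below 
are illustrative):

{code}
// File-based streaming sources require an explicit schema, so pass the known one.
val historical = spark.readStream
  .schema(historicalSchema)            // schema of the existing files (assumed known)
  .parquet("/data/historical")
  .select("value")

val live = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

// Unioning two streaming Datasets is supported, and progress is tracked for both.
val combined = live.union(historical)
{code}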

> Add support for unions between streaming and batch datasets
> ---
>
> Key: SPARK-20002
> URL: https://issues.apache.org/jira/browse/SPARK-20002
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Structured Streaming
>Affects Versions: 2.0.2
>Reporter: Leon Pham
>
> Currently unions between streaming datasets and batch datasets are not 
> supported.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-20147) Cloning SessionState does not clone streaming query listeners

2017-06-02 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-20147.
--
  Resolution: Fixed
Assignee: Kunal Khamar
   Fix Version/s: 2.2.0
Target Version/s: 2.2.0

Fixed by https://github.com/apache/spark/pull/17379

> Cloning SessionState does not clone streaming query listeners
> -
>
> Key: SPARK-20147
> URL: https://issues.apache.org/jira/browse/SPARK-20147
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Kunal Khamar
>Assignee: Kunal Khamar
> Fix For: 2.2.0
>
>
> Cloning session should clone StreamingQueryListeners registered on the 
> StreamingQueryListenerBus.
> Similar to SPARK-20048, https://github.com/apache/spark/pull/17379



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-06-02 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-20928:
-
Description: 
Given the current Source API, the minimum possible latency for any record is 
bounded by the amount of time that it takes to launch a task.  This limitation 
is a result of the fact that {{getBatch}} requires us to know both the starting 
and the ending offset, before any tasks are launched.  In the worst case, the 
end-to-end latency is actually closer to the average batch time + task 
launching time.

For applications where latency is more important than exactly-once output 
however, it would be useful if processing could happen continuously.  This 
would allow us to achieve fully pipelined reading and writing from sources such 
as Kafka.  This kind of architecture would make it possible to process records 
with end-to-end latencies on the order of 1 ms, rather than the 10-100ms that 
is possible today.

One possible architecture here would be to change the Source API to look like 
the following rough sketch:

{code}
  trait Epoch {
def data: DataFrame

/** The exclusive starting position for `data`. */
def startOffset: Offset

/** The inclusive ending position for `data`.  Incrementally updated during 
processing, but not complete until execution of the query plan in `data` is 
finished. */
def endOffset: Offset
  }

  def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], limits: 
Limits): Epoch
{code}

The above would allow us to build an alternative implementation of 
{{StreamExecution}} that processes continuously with much lower latency and 
only stops processing when needing to reconfigure the stream (either due to a 
failure or a user-requested change in parallelism).
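
To make the sketch above slightly more concrete, here is a purely hypothetical 
outline of how a continuous {{StreamExecution}} loop might drive such an API; 
aside from the {{getBatch}}/{{Epoch}} sketch itself, none of the names below 
({{shouldReconfigure}}, {{writeEpoch}}, etc.) exist in Spark today:

{code}
// Hypothetical driver loop over the proposed Epoch-based getBatch API.
var lastCommitted: Option[Offset] = None
while (!shouldReconfigure()) {
  // Start an open-ended epoch: tasks launch before the ending offset is known.
  val epoch = source.getBatch(lastCommitted, None, limits)

  // Hypothetical sink call: long-running tasks read and write continuously
  // while the epoch is open.
  writeEpoch(epoch.data)

  // The final end offset is known only once the epoch's query plan finishes; commit it.
  lastCommitted = Some(epoch.endOffset)
}
{code}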

> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20734) Structured Streaming spark.sql.streaming.schemaInference not handling schema changes

2017-06-02 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-20734:
-
Issue Type: New Feature  (was: Bug)

> Structured Streaming spark.sql.streaming.schemaInference not handling schema 
> changes
> 
>
> Key: SPARK-20734
> URL: https://issues.apache.org/jira/browse/SPARK-20734
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.1
>Reporter: Ram
>
> sparkSession.config("spark.sql.streaming.schemaInference", 
> true).getOrCreate();
> Dataset<Row> dataset = 
> sparkSession.readStream().parquet("file:/files-to-process");
> StreamingQuery streamingQuery =
> dataset.writeStream().option("checkpointLocation", 
> "file:/checkpoint-location")
> .outputMode(Append()).start("file:/save-parquet-files");
> streamingQuery.awaitTermination();
> After the streaming query has started, if there are schema changes in new parquet 
> files under the files-to-process directory, Structured Streaming does not write 
> the new schema. Is it possible to handle these schema changes in Structured 
> Streaming?
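
Until something like this is supported, the usual way to keep the schema stable for 
a streaming file source is to supply it explicitly instead of relying on inference; 
a minimal sketch, with illustrative fields and the reporter's path reused:

{code}
import org.apache.spark.sql.types._

// Declare the superset of columns you expect; Parquet files that lack a column
// will typically surface it as null rather than changing the stream's schema.
val schema = new StructType()
  .add("id", LongType)
  .add("payload", StringType)

val stream = spark.readStream
  .schema(schema)                 // explicit schema: no inference, stable across restarts
  .parquet("file:/files-to-process")
{code}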



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20958) Roll back parquet-mr 1.8.2 to parquet-1.8.1

2017-06-02 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-20958:
-
Labels: release-notes  (was: )

> Roll back parquet-mr 1.8.2 to parquet-1.8.1
> ---
>
> Key: SPARK-20958
> URL: https://issues.apache.org/jira/browse/SPARK-20958
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Cheng Lian
>Assignee: Cheng Lian
>  Labels: release-notes
>
> We recently realized that parquet-mr 1.8.2 used by Spark 2.2.0-rc2 depends on 
> avro 1.8.1, which is incompatible with avro 1.7.6 used by parquet-mr 1.8.1 
> and avro 1.7.7 used by spark-core 2.2.0-rc2.
> Basically, Spark 2.2.0-rc2 introduced two incompatible versions of avro 
> (1.7.7 and 1.8.1). Upgrading avro 1.7.7 to 1.8.1 is not preferable due to the 
> reasons mentioned in [PR 
> #17163|https://github.com/apache/spark/pull/17163#issuecomment-286563131]. 
> Therefore, we don't really have many choices here and have to roll back 
> parquet-mr 1.8.2 to 1.8.1 to resolve this dependency conflict.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-20958) Roll back parquet-mr 1.8.2 to parquet-1.8.1

2017-06-02 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-20958.
--
Resolution: Won't Fix

Thanks everyone.  Sounds like we'll just provide directions in the release 
notes for users of parquet-avro to pin the version to 1.8.1.
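
For anyone who needs to do that pinning from a build, one possible form (assuming 
an sbt build; Maven users would use a {{dependencyManagement}} entry instead):

{code}
// build.sbt: keep parquet-avro on 1.8.1 alongside Spark 2.2.0
dependencyOverrides += "org.apache.parquet" % "parquet-avro" % "1.8.1"
{code}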

> Roll back parquet-mr 1.8.2 to parquet-1.8.1
> ---
>
> Key: SPARK-20958
> URL: https://issues.apache.org/jira/browse/SPARK-20958
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Cheng Lian
>Assignee: Cheng Lian
>  Labels: release-notes
>
> We recently realized that parquet-mr 1.8.2 used by Spark 2.2.0-rc2 depends on 
> avro 1.8.1, which is incompatible with avro 1.7.6 used by parquet-mr 1.8.1 
> and avro 1.7.7 used by spark-core 2.2.0-rc2.
> Basically, Spark 2.2.0-rc2 introduced two incompatible versions of avro 
> (1.7.7 and 1.8.1). Upgrading avro 1.7.7 to 1.8.1 is not preferable due to the 
> reasons mentioned in [PR 
> #17163|https://github.com/apache/spark/pull/17163#issuecomment-286563131]. 
> Therefore, we don't really have many choices here and have to roll back 
> parquet-mr 1.8.2 to 1.8.1 to resolve this dependency conflict.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19104) CompileException with Map and Case Class in Spark 2.1.0

2017-06-02 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035012#comment-16035012
 ] 

Michael Armbrust commented on SPARK-19104:
--

I'm about to cut RC3 of 2.2 and there is no pull request to fix this.  
Unfortunately, that means it's not going to be fixed in 2.2.0.

>  CompileException with Map and Case Class in Spark 2.1.0
> 
>
> Key: SPARK-19104
> URL: https://issues.apache.org/jira/browse/SPARK-19104
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Nils Grabbert
>
> The following code will run with Spark 2.0.2 but not with Spark 2.1.0:
> {code}
> case class InnerData(name: String, value: Int)
> case class Data(id: Int, param: Map[String, InnerData])
> val data = Seq.tabulate(10)(i => Data(1, Map("key" -> InnerData("name", i + 100))))
> val ds   = spark.createDataset(data)
> {code}
> Exception:
> {code}
> Caused by: org.codehaus.commons.compiler.CompileException: File 
> 'generated.java', Line 63, Column 46: Expression 
> "ExternalMapToCatalyst_value_isNull1" is not an rvalue 
>   at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11004) 
>   at 
> org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:6639)
>  
>   at 
> org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:5001) 
>   at org.codehaus.janino.UnitCompiler.access$10500(UnitCompiler.java:206) 
>   at 
> org.codehaus.janino.UnitCompiler$13.visitAmbiguousName(UnitCompiler.java:4984)
>  
>   at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:3633) 
>   at org.codehaus.janino.Java$Lvalue.accept(Java.java:3563) 
>   at 
> org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:4956) 
>   at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4925) 
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3189) 
>   at org.codehaus.janino.UnitCompiler.access$5100(UnitCompiler.java:206) 
>   at 
> org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3143) 
>   at 
> org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3139) 
>   at org.codehaus.janino.Java$Assignment.accept(Java.java:3847) 
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3139) 
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2112) 
>   at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:206) 
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1377)
>  
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1370)
>  
>   at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2558) 
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1370) 
>   at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1450) 
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2811) 
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1262)
>  
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1234)
>  
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:538) 
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890) 
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894) 
>   at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206) 
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
>  
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369)
>  
>   at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1128) 
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369) 
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1209)
>  
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:564) 
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:420) 
>   at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:206) 
>   at 
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:374)
>  
>   at 
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:369)
>  
>   at 
> org.codehaus.janino.Java$AbstractPackageMemberClassDeclaration.accept(Java.java:1309)
>  
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369) 
>   at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:345) 
>   at 
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:396)
>  
>   at 
> org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:311)
>  
>   at 

[jira] [Updated] (SPARK-15693) Write schema definition out for file-based data sources to avoid schema inference

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-15693:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Write schema definition out for file-based data sources to avoid schema 
> inference
> -
>
> Key: SPARK-15693
> URL: https://issues.apache.org/jira/browse/SPARK-15693
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>
> Spark supports reading a variety of data formats, many of which don't have a 
> self-describing schema. For these file formats, Spark can often infer the 
> schema by going through all the data. However, schema inference is expensive 
> and does not always infer the intended schema (for example, with JSON data 
> Spark always infers integer types as long, rather than int).
> It would be great if Spark could write the schema definition out for file-based 
> formats, so that when reading the data in, the schema can be "inferred" directly 
> by reading the schema definition file without going through full schema 
> inference. If the file does not exist, then the good old schema inference 
> should be performed.
> This ticket certainly merits a design doc that should discuss the spec for the 
> schema definition, as well as all the corner cases that this feature needs to 
> handle (e.g. schema merging, schema evolution, partitioning). It would be 
> great if the schema definition used a human-readable format (e.g. JSON).
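
Until such a feature lands, a common hand-rolled version of the same idea is to 
persist the schema as JSON next to the data and load it on read; a rough sketch, 
with illustrative paths and assuming a {{df}} that has already been written out:

{code}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.types.{DataType, StructType}

// Save the schema definition once, in a human-readable form.
Files.write(Paths.get("/data/events/_schema.json"),
  df.schema.json.getBytes(StandardCharsets.UTF_8))

// Later reads skip inference entirely by loading the saved definition.
val saved = new String(
  Files.readAllBytes(Paths.get("/data/events/_schema.json")), StandardCharsets.UTF_8)
val schema = DataType.fromJson(saved).asInstanceOf[StructType]
val events = spark.read.schema(schema).json("/data/events")
{code}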



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-15380) Generate code that stores a float/double value in each column from ColumnarBatch when DataFrame.cache() is used

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-15380:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Generate code that stores a float/double value in each column from 
> ColumnarBatch when DataFrame.cache() is used
> ---
>
> Key: SPARK-15380
> URL: https://issues.apache.org/jira/browse/SPARK-15380
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Kazuaki Ishizaki
>
> When DataFrame.cache() is called, data is stored in column-oriented form in 
> CachedBatch. The current Catalyst generates a Java program that stores a 
> computed value into an InternalRow, and that value is then stored into 
> CachedBatch even when the data is read from a ColumnarBatch by the ParquetReader. 
> This JIRA generates Java code that stores a value into a ColumnarBatch, and 
> stores data from the ColumnarBatch into the CachedBatch. This JIRA handles only 
> float and double values.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19084) conditional function: field

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19084:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> conditional function: field
> ---
>
> Key: SPARK-19084
> URL: https://issues.apache.org/jira/browse/SPARK-19084
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Chenzhao Guo
>
> field(str, str1, str2, ...) is a variable-length (>= 2 arguments) function which 
> returns the index of str in the list (str1, str2, ...), or 0 if it is not found.
> Every parameter is required to be a subtype of AtomicType.
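
For reference, the intended semantics fit in a couple of lines of Scala; this is 
only a model of the behaviour, not the proposed Catalyst implementation:

{code}
// field("b", "a", "b", "c") == 2; field("z", "a", "b", "c") == 0
def field(str: Any, list: Any*): Int = list.indexWhere(_ == str) + 1
{code}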



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-15691) Refactor and improve Hive support

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-15691:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Refactor and improve Hive support
> -
>
> Key: SPARK-15691
> URL: https://issues.apache.org/jira/browse/SPARK-15691
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>
> Hive support is important to Spark SQL, as many Spark users use it to read 
> from Hive. The current architecture is very difficult to maintain, and this 
> ticket tracks progress towards getting us to a sane state.
> A number of things we want to accomplish are:
> - Move the Hive specific catalog logic into HiveExternalCatalog.
>   -- Remove HiveSessionCatalog. All Hive-related stuff should go into 
> HiveExternalCatalog. This would require moving caching either into 
> HiveExternalCatalog, or just into SessionCatalog.
>   -- Move using properties to store data source options into 
> HiveExternalCatalog (So, for a CatalogTable returned by HiveExternalCatalog, 
> we do not need to distinguish tables stored in hive formats and data source 
> tables).
>   -- Potentially more.
> - Remove Hive's specific ScriptTransform implementation and make it more 
> general so we can put it in sql/core.
> - Implement HiveTableScan (and write path) as a data source, so we don't need 
> a special planner rule for HiveTableScan.
> - Remove HiveSharedState and HiveSessionState.
> One thing that is still unclear to me is how to work with Hive UDF support. 
> We might still need a special planner rule there.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-14878) Support Trim characters in the string trim function

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-14878:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Support Trim characters in the string trim function
> ---
>
> Key: SPARK-14878
> URL: https://issues.apache.org/jira/browse/SPARK-14878
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: kevin yu
>
> Spark SQL currently does not support trim characters in the string trim 
> function, which are part of the ANSI SQL:2003 standard. For example, IBM DB2 
> fully supports them, as shown at 
> https://www.ibm.com/support/knowledgecenter/SS6NHC/com.ibm.swg.im.dashdb.sql.ref.doc/doc/r0023198.html.
> We propose to implement this in this JIRA.
> The ANSI SQL:2003 trim syntax:
> {noformat}
> SQL
> <trim function> ::= TRIM <left paren> <trim operands> <right paren>
> <trim operands> ::= [ [ <trim specification> ] [ <trim character> ] FROM ] <trim source>
> <trim source> ::= <character value expression>
> <trim specification> ::=
>   LEADING
> | TRAILING
> | BOTH
> <trim character> ::= <character value expression>
> {noformat}
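
Once implemented, this would allow queries along the lines of the following 
(column and table names are illustrative; this syntax is not supported at the 
time of writing):

{code}
// Proposed usage of trim characters, expressed through spark.sql:
spark.sql(
  "SELECT TRIM(LEADING '0' FROM account_id) AS id, TRIM(BOTH '*' FROM note) AS note FROM t")
{code}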



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-16496) Add wholetext as option for reading text in SQL.

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-16496:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Add wholetext as option for reading text in SQL.
> 
>
> Key: SPARK-16496
> URL: https://issues.apache.org/jira/browse/SPARK-16496
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Prashant Sharma
>
> In many text analysis problems, it is not always desirable for rows to be 
> split by "\n". A whole-text reader (wholeTextFiles) already exists in the RDD 
> API, and this JIRA just adds the same support to the Dataset API.
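
For comparison, the existing RDD-level reader and a sketch of the proposed 
Dataset-level option (the option name below is the one this JIRA proposes, not 
an API that exists at the time of writing; the path is illustrative):

{code}
// Existing RDD API: one (path, content) pair per file.
val perFile = spark.sparkContext.wholeTextFiles("/data/docs")

// Proposed Dataset API: one row per file instead of one row per line.
val perFileDs = spark.read.option("wholetext", "true").text("/data/docs")
{code}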



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19241) remove hive generated table properties if they are not useful in Spark

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19241:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> remove hive generated table properties if they are not useful in Spark
> --
>
> Key: SPARK-19241
> URL: https://issues.apache.org/jira/browse/SPARK-19241
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Wenchen Fan
>
> When we save a table into the Hive metastore, Hive generates some table 
> properties automatically, e.g. transient_lastDdlTime, last_modified_by, 
> rawDataSize, etc. Some of them are useless in Spark SQL, so we should remove 
> them.
> It would be good if we could get the list of Hive-generated table properties via 
> a Hive API, so that we don't need to hardcode them.
> We can take a look at the Hive code to see how it excludes these auto-generated 
> table properties in DESCRIBE TABLE.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-16317) Add file filtering interface for FileFormat

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-16317:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Add file filtering interface for FileFormat
> ---
>
> Key: SPARK-16317
> URL: https://issues.apache.org/jira/browse/SPARK-16317
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Cheng Lian
>Priority: Minor
>
> {{FileFormat}} data sources like Parquet and Avro (provided by spark-avro) 
> have customized file filtering logic. For example, Parquet needs to filter 
> out summary files, while Avro provides a Hadoop configuration option to 
> filter out all files whose names don't end with ".avro".
> It would be nice to have a general file filtering interface in {{FileFormat}} 
> to handle similar requirements.
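
Purely to make the idea concrete, one hypothetical shape for such a hook might be 
a path predicate that each format can override (this is not an existing Spark API):

{code}
import org.apache.hadoop.fs.Path

// Hypothetical extension point for FileFormat implementations.
trait FileFilteringSupport {
  /** Return false for files that should never be read as data. */
  def isDataFile(path: Path): Boolean = true
}

// Example override: skip Parquet summary files.
object ParquetFiltering extends FileFilteringSupport {
  override def isDataFile(path: Path): Boolean = {
    val name = path.getName
    name != "_metadata" && name != "_common_metadata"
  }
}
{code}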



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19027) estimate size of object buffer for object hash aggregate

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19027:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> estimate size of object buffer for object hash aggregate
> 
>
> Key: SPARK-19027
> URL: https://issues.apache.org/jira/browse/SPARK-19027
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19104) CompileException with Map and Case Class in Spark 2.1.0

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19104:
-
Target Version/s: 2.3.0  (was: 2.2.0)

>  CompileException with Map and Case Class in Spark 2.1.0
> 
>
> Key: SPARK-19104
> URL: https://issues.apache.org/jira/browse/SPARK-19104
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Nils Grabbert
>
> The following code will run with Spark 2.0.2 but not with Spark 2.1.0:
> {code}
> case class InnerData(name: String, value: Int)
> case class Data(id: Int, param: Map[String, InnerData])
> val data = Seq.tabulate(10)(i => Data(1, Map("key" -> InnerData("name", i + 100))))
> val ds   = spark.createDataset(data)
> {code}
> Exception:
> {code}
> Caused by: org.codehaus.commons.compiler.CompileException: File 
> 'generated.java', Line 63, Column 46: Expression 
> "ExternalMapToCatalyst_value_isNull1" is not an rvalue 
>   at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11004) 
>   at 
> org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:6639)
>  
>   at 
> org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:5001) 
>   at org.codehaus.janino.UnitCompiler.access$10500(UnitCompiler.java:206) 
>   at 
> org.codehaus.janino.UnitCompiler$13.visitAmbiguousName(UnitCompiler.java:4984)
>  
>   at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:3633) 
>   at org.codehaus.janino.Java$Lvalue.accept(Java.java:3563) 
>   at 
> org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:4956) 
>   at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4925) 
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3189) 
>   at org.codehaus.janino.UnitCompiler.access$5100(UnitCompiler.java:206) 
>   at 
> org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3143) 
>   at 
> org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3139) 
>   at org.codehaus.janino.Java$Assignment.accept(Java.java:3847) 
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3139) 
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2112) 
>   at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:206) 
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1377)
>  
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1370)
>  
>   at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2558) 
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1370) 
>   at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1450) 
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2811) 
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1262)
>  
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1234)
>  
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:538) 
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890) 
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894) 
>   at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206) 
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
>  
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369)
>  
>   at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1128) 
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369) 
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1209)
>  
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:564) 
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:420) 
>   at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:206) 
>   at 
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:374)
>  
>   at 
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:369)
>  
>   at 
> org.codehaus.janino.Java$AbstractPackageMemberClassDeclaration.accept(Java.java:1309)
>  
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369) 
>   at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:345) 
>   at 
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:396)
>  
>   at 
> org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:311)
>  
>   at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:229) 
>   at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:196) 
>   at 

[jira] [Updated] (SPARK-18245) Improving support for bucketed table

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18245:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Improving support for bucketed table
> 
>
> Key: SPARK-18245
> URL: https://issues.apache.org/jira/browse/SPARK-18245
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>
> This is an umbrella ticket for improving various execution planning for 
> bucketed tables.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-14098) Generate Java code to build CachedColumnarBatch and get values from CachedColumnarBatch when DataFrame.cache() is called

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-14098:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Generate Java code to build CachedColumnarBatch and get values from 
> CachedColumnarBatch when DataFrame.cache() is called
> 
>
> Key: SPARK-14098
> URL: https://issues.apache.org/jira/browse/SPARK-14098
> Project: Spark
>  Issue Type: Umbrella
>  Components: SQL
>Reporter: Kazuaki Ishizaki
>
> [Here|https://docs.google.com/document/d/1-2BnW5ibuHIeQzmHEGIGkEcuMUCTk87pmPis2DKRg-Q/edit?usp=sharing]
>  is a design document for this change (***TODO: Update the document***).
> This JIRA implements a new in-memory cache feature used by DataFrame.cache 
> and Dataset.cache. The following is the basic design, based on discussions with 
> Sameer, Weichen, Xiao, Herman, and Nong.
> * Use ColumnarBatch with ColumnVector that are common data representations 
> for columnar storage
> * Use multiple compression schemes (such as RLE, intdelta, and so on) for each 
> ColumnVector in ColumnarBatch, depending on its data type
> * Generate code that is simple and specialized for each in-memory cache to 
> build an in-memory cache
> * Generate code that directly reads data from ColumnVector for the in-memory 
> cache by whole-stage codegen.
> * Enhance ColumnVector to keep UnsafeArrayData
> * Use primitive-type array for primitive uncompressed data type in 
> ColumnVector
> * Use byte[] for UnsafeArrayData and compressed data
> Based on this design, this JIRA generates two kinds of Java code for 
> DataFrame.cache()/Dataset.cache()
> * Generate Java code to build CachedColumnarBatch, which keeps data in 
> ColumnarBatch
> * Generate Java code to get a value of each column from ColumnarBatch
> ** a Get a value directly from ColumnarBatch in code generated by whole-stage 
> codegen (primary path)
> ** b Get a value through an iterator if whole-stage codegen is disabled (e.g. 
> when the number of columns is more than 100; backup path)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19014) support complex aggregate buffer in HashAggregateExec

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19014:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> support complex aggregate buffer in HashAggregateExec
> -
>
> Key: SPARK-19014
> URL: https://issues.apache.org/jira/browse/SPARK-19014
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-16011) SQL metrics include duplicated attempts

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-16011:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> SQL metrics include duplicated attempts
> ---
>
> Key: SPARK-16011
> URL: https://issues.apache.org/jira/browse/SPARK-16011
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core, SQL
>Reporter: Davies Liu
>Assignee: Wenchen Fan
>
> When I ran a simple scan and aggregate query, the number of rows in the scan 
> could differ from run to run. The actual scanned result is correct, but the 
> SQL metrics are wrong (they should not include duplicated attempts). This is a 
> regression since 1.6.
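> A hypothetical reproduction sketch of the kind of query involved (the input 
> path and column name are made up):
> {code}
> // Run a plain scan + aggregate and compare the scan node's "number of output
> // rows" metric in the SQL tab across runs; with retried or speculative task
> // attempts it can exceed the real row count even though the result is correct.
> val df = spark.read.parquet("/tmp/some_table")   // hypothetical input path
> df.groupBy("key").count().collect()
> {code}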



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18388) Running aggregation on many columns throws SOE

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18388:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Running aggregation on many columns throws SOE
> --
>
> Key: SPARK-18388
> URL: https://issues.apache.org/jira/browse/SPARK-18388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2, 1.6.2, 2.0.1
> Environment: PySpark 2.0.1, Jupyter
>Reporter: Raviteja Lokineni
> Attachments: spark-bug.csv, spark-bug-jupyter.py, 
> spark-bug-stacktrace.txt
>
>
> Usecase: I am generating weekly aggregates of every column of data
> {code}
> from pyspark.sql.window import Window
> from pyspark.sql.functions import *
> timeSeries = sqlContext.read.option("header", 
> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
> # Hive timestamp is interpreted as UNIX timestamp in seconds*
> days = lambda i: i * 86400
> w = (Window()
>  .partitionBy("id")
>  .orderBy(col("dt").cast("timestamp").cast("long"))
>  .rangeBetween(-days(6), 0))
> cols = ["id", "dt"]
> skipCols = ["id", "dt"]
> for col in timeSeries.columns:
> if col in skipCols:
> continue
> cols.append(mean(col).over(w).alias("mean_7_"+col))
> cols.append(count(col).over(w).alias("count_7_"+col))
> cols.append(sum(col).over(w).alias("sum_7_"+col))
> cols.append(min(col).over(w).alias("min_7_"+col))
> cols.append(max(col).over(w).alias("max_7_"+col))
> df = timeSeries.select(cols)
> df.orderBy('id', 
> 'dt').write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").save("file:///tmp/spark-bug-out.csv")
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19989) Flaky Test: org.apache.spark.sql.kafka010.KafkaSourceStressSuite

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19989:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Flaky Test: org.apache.spark.sql.kafka010.KafkaSourceStressSuite
> 
>
> Key: SPARK-19989
> URL: https://issues.apache.org/jira/browse/SPARK-19989
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming, Tests
>Affects Versions: 2.2.0
>Reporter: Kay Ousterhout
>Priority: Minor
>  Labels: flaky-test
>
> This test failed recently here: 
> https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74683/testReport/junit/org.apache.spark.sql.kafka010/KafkaSourceStressSuite/stress_test_with_multiple_topics_and_partitions/
> And based on Josh's dashboard 
> (https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaSourceStressSuite_name=stress+test+with+multiple+topics+and+partitions),
>  seems to fail a few times every month.  Here's the full error from the most 
> recent failure:
> Error Message
> {code}
> org.scalatest.exceptions.TestFailedException:  Error adding data: replication 
> factor: 1 larger than available brokers: 0 
> kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)  
> kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)  
> org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:173)
>   
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$16$$anonfun$apply$mcV$sp$17$$anonfun$37.apply(KafkaSourceSuite.scala:903)
>   
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$16$$anonfun$apply$mcV$sp$17$$anonfun$37.apply(KafkaSourceSuite.scala:901)
>   
> org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$1.apply(KafkaSourceSuite.scala:93)
>   
> org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$1.apply(KafkaSourceSuite.scala:92)
>   scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:316)  
> org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData.addData(KafkaSourceSuite.scala:92)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:494)
> {code}
> {code}
> sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 
> Error adding data: replication factor: 1 larger than available brokers: 0
> kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
>   kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
>   
> org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:173)
>   
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$16$$anonfun$apply$mcV$sp$17$$anonfun$37.apply(KafkaSourceSuite.scala:903)
>   
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$16$$anonfun$apply$mcV$sp$17$$anonfun$37.apply(KafkaSourceSuite.scala:901)
>   
> org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$1.apply(KafkaSourceSuite.scala:93)
>   
> org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$1.apply(KafkaSourceSuite.scala:92)
>   scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:316)
>   
> org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData.addData(KafkaSourceSuite.scala:92)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:494)
> == Progress ==
>AssertOnQuery(, )
>CheckAnswer: 
>StopStream
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@5d888be0,Map())
>AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), 
> data = Range(0, 1, 2, 3, 4, 5, 6, 7, 8), message = )
>CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9]
>StopStream
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@1be724ee,Map())
>AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), 
> data = Range(9, 10, 11, 12, 13, 14), message = )
>CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15]
>StopStream
>AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), 
> data = Range(), message = )
> => AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, 
> stress3), data = Range(15), message = Add topic stress7)
>AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, 
> stress3), data = Range(16, 17, 18, 19, 20, 21, 22), message = Add partition)
>AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, 
> stress3), data = Range(23, 24), message = Add partition)
>AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, 
> stress5, stress3), data = Range(), 

[jira] [Updated] (SPARK-17915) Prepare ColumnVector implementation for UnsafeData

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17915:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Prepare ColumnVector implementation for UnsafeData
> --
>
> Key: SPARK-17915
> URL: https://issues.apache.org/jira/browse/SPARK-17915
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Kazuaki Ishizaki
>
> The current implementations of {{ColumnarVector}} are {{OnHeapColumnarVector}} 
> and {{OffHeapColumnarVector}}, which are optimized for reading data from 
> Parquet. If they get an array, a map, or a struct from an {{Unsafe}}-related 
> data structure, it is inefficient.
> This JIRA prepares a new implementation, {{OnHeapUnsafeColumnarVector}}, that 
> is optimized for reading data from an {{Unsafe}}-related data structure.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18134) SQL: MapType in Group BY and Joins not working

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18134:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> SQL: MapType in Group BY and Joins not working
> --
>
> Key: SPARK-18134
> URL: https://issues.apache.org/jira/browse/SPARK-18134
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 2.0.0, 2.0.1, 
> 2.1.0
>Reporter: Christian Zorneck
>
> Since version 1.5 and issue SPARK-9415, MapTypes can no longer be used in 
> GROUP BY and join clauses. This makes Spark incompatible with HiveQL: a Hive 
> feature was removed from Spark, so various HiveQL statements that group or 
> join on map columns no longer work.
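> A spark-shell style sketch of the kind of statement affected (names are made up):
> {code}
> // Grouping on a map-typed column is rejected by the analyzer since 1.5,
> // even though the equivalent HiveQL is accepted by Hive.
> val df = Seq((Map("a" -> 1), 10), (Map("a" -> 1), 20)).toDF("m", "v")
> df.createOrReplaceTempView("t")
> spark.sql("SELECT m, sum(v) FROM t GROUP BY m").show()   // fails analysis on the MapType key
> {code}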



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18455) General support for correlated subquery processing

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18455:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> General support for correlated subquery processing
> --
>
> Key: SPARK-18455
> URL: https://issues.apache.org/jira/browse/SPARK-18455
> Project: Spark
>  Issue Type: Story
>  Components: SQL
>Reporter: Nattavut Sutyanyong
> Attachments: SPARK-18455-scoping-doc.pdf
>
>
> Subquery support has been introduced in Spark 2.0. The initial implementation 
> covers the most common subquery use cases: the ones used in TPC queries, for 
> instance.
> Spark currently supports the following subqueries:
> * Uncorrelated Scalar Subqueries. All cases are supported.
> * Correlated Scalar Subqueries. We only allow subqueries that are aggregated 
> and use equality predicates.
> * Predicate Subqueries. IN or Exists type of queries. We allow most 
> predicates, except when they are pulled from under an Aggregate or Window 
> operator. In that case we only support equality predicates.
> However this does not cover the full range of possible subqueries. This, in 
> part, has to do with the fact that we currently rewrite all correlated 
> subqueries into a (LEFT/LEFT SEMI/LEFT ANTI) join.
> We currently lack support for the following use cases:
> * The use of predicate subqueries in a projection.
> * The use of non-equality predicates below Aggregate and/or Window operators.
> * The use of non-Aggregate subqueries for correlated scalar subqueries.
> This JIRA aims to lift these current limitations in subquery processing.
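> For illustration, a sketch of one of the unsupported shapes listed above, a 
> predicate subquery used in a projection (table and column names are made up):
> {code}
> spark.sql("""
>   SELECT o.id,
>          EXISTS (SELECT 1 FROM returns r WHERE r.order_id = o.id) AS was_returned
>   FROM orders o
> """)
> {code}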



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-15690) Fast single-node (single-process) in-memory shuffle

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-15690:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Fast single-node (single-process) in-memory shuffle
> ---
>
> Key: SPARK-15690
> URL: https://issues.apache.org/jira/browse/SPARK-15690
> Project: Spark
>  Issue Type: New Feature
>  Components: Shuffle, SQL
>Reporter: Reynold Xin
>
> Spark's current shuffle implementation sorts all intermediate data by its 
> partition id, and then writes the data to disk. This is not a big bottleneck 
> because the network throughput on commodity clusters tends to be low. However, 
> an increasing number of Spark users are using the system to process data on a 
> single node. When operating on a single node against intermediate data that 
> fits in memory, the existing shuffle code path can become a big bottleneck.
> The goal of this ticket is to change Spark so it can use an in-memory radix sort 
> to do data shuffling on a single node, and still gracefully fall back to disk 
> if the data does not fit in memory. Given that the number of partitions is 
> usually small (say less than 256), it would require only a single pass to do the 
> radix sort with pretty decent CPU efficiency.
> Note that there have been many in-memory shuffle attempts in the past. This 
> ticket has a smaller scope (single-process), and aims to actually 
> productionize this code.
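> To make the single-pass idea concrete, a toy sketch in plain Scala (not Spark 
> code) of bucketing records by partition id with one radix pass, i.e. a 
> histogram sweep followed by a scatter sweep over the ids:
> {code}
> def bucketByPartition(partIds: Array[Int], numPartitions: Int): Array[Array[Int]] = {
>   val counts = new Array[Int](numPartitions)
>   partIds.foreach(p => counts(p) += 1)                  // histogram of partition ids
>   val out = Array.tabulate(numPartitions)(p => new Array[Int](counts(p)))
>   val cursor = new Array[Int](numPartitions)
>   for (i <- partIds.indices) {                          // scatter record indices into buckets
>     val p = partIds(i)
>     out(p)(cursor(p)) = i
>     cursor(p) += 1
>   }
>   out                                                   // out(p) holds the record indices of partition p
> }
> {code}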



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-15689) Data source API v2

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-15689:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Data source API v2
> --
>
> Key: SPARK-15689
> URL: https://issues.apache.org/jira/browse/SPARK-15689
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>  Labels: releasenotes
>
> This ticket tracks progress in creating the v2 of data source API. This new 
> API should focus on:
> 1. Have a small surface so it is easy to freeze and maintain compatibility 
> for a long time. Ideally, this API should survive architectural rewrites and 
> user-facing API revamps of Spark.
> 2. Have a well-defined column batch interface for high performance. 
> Convenience methods should exist to convert row-oriented formats into column 
> batches for data source developers.
> 3. Still support filter push down, similar to the existing API.
> 4. Nice-to-have: support additional common operators, including limit and 
> sampling.
> Note that both 1 and 2 are problems that the current data source API (v1) 
> suffers from. The current data source API has a wide surface with a dependency on 
> DataFrame/SQLContext, making data source API compatibility dependent on 
> the upper-level API. The current data source API is also row-oriented only 
> and has to go through an expensive conversion from the external data types to 
> the internal data types.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-13184) Support minPartitions parameter for JSON and CSV datasources as options

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-13184:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Support minPartitions parameter for JSON and CSV datasources as options
> ---
>
> Key: SPARK-13184
> URL: https://issues.apache.org/jira/browse/SPARK-13184
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Hyukjin Kwon
>Priority: Minor
>
> After looking through the pull requests below at Spark CSV datasources,
> https://github.com/databricks/spark-csv/pull/256
> https://github.com/databricks/spark-csv/issues/141
> https://github.com/databricks/spark-csv/pull/186
> It looks like Spark might need to be able to set {{minPartitions}}.
> {{repartition()}} or {{coalesce()}} can be alternatives, but it looks like they need 
> to shuffle the data in most cases.
> Although I am still not sure whether this is needed, I will open this ticket just 
> for discussion.
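> A sketch of the difference under discussion (the path is made up, and the 
> read-time option shown is the proposal, not an existing option):
> {code}
> // Today: bump the partition count after the read, which usually shuffles.
> val csv = spark.read.option("header", "true").csv("/data/in.csv").repartition(64)
> // Proposed: hint the minimum split count at read time, e.g. something like
> //   spark.read.option("minPartitions", "64").csv("/data/in.csv")
> {code}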



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-13682) Finalize the public API for FileFormat

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-13682:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Finalize the public API for FileFormat
> --
>
> Key: SPARK-13682
> URL: https://issues.apache.org/jira/browse/SPARK-13682
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Michael Armbrust
>
> The current file format interface needs to be cleaned up before it's 
> acceptable for public consumption:
>  - Have a version that takes Row and does a conversion, hide the internal API.
>  - Remove bucketing
>  - Remove RDD and the broadcastedConf
>  - Remove SQLContext (maybe include SparkSession?)
>  - Pass a better conf object



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-9221) Support IntervalType in Range Frame

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-9221:

Target Version/s: 2.3.0  (was: 2.2.0)

> Support IntervalType in Range Frame
> ---
>
> Key: SPARK-9221
> URL: https://issues.apache.org/jira/browse/SPARK-9221
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Herman van Hovell
>
> Support the IntervalType in window range frames, as mentioned in the 
> conclusion of the Databricks blog 
> [post|https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html]
>  on window functions.
> This actually requires us to support Literals instead of Integer constants in 
> Range Frames. The following things will have to be modified:
> * org.apache.spark.sql.hive.HiveQl
> * org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame
> * org.apache.spark.sql.execution.Window
> * org.apache.spark.sql.expressions.Window
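> To make the target concrete, a sketch of the kind of query this would enable, 
> a time-based range frame expressed with an interval literal (table and column 
> names are made up):
> {code}
> spark.sql("""
>   SELECT id, ts, amount,
>          sum(amount) OVER (PARTITION BY id ORDER BY ts
>                            RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW) AS sum_7d
>   FROM events
> """)
> {code}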



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20319) Already quoted identifiers are getting wrapped with additional quotes

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-20319:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Already quoted identifiers are getting wrapped with additional quotes
> -
>
> Key: SPARK-20319
> URL: https://issues.apache.org/jira/browse/SPARK-20319
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Umesh Chaudhary
>
> The issue was caused by 
> [SPARK-16387|https://issues.apache.org/jira/browse/SPARK-16387] where 
> reserved SQL words are honored by wrapping column names in quotes. 
> In our test we found that when quotes are explicitly wrapped around column names, 
> the Oracle JDBC driver throws: 
> java.sql.BatchUpdateException: ORA-01741: illegal zero-length identifier 
> at 
> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:12296)
>  
> at 
> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:246)
>  
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:597)
>  
> and the Cassandra JDBC driver throws: 
> 17/04/12 19:03:48 ERROR executor.Executor: Exception in task 0.0 in stage 5.0 
> (TID 6)
> java.sql.SQLSyntaxErrorException: [FMWGEN][Cassandra JDBC 
> Driver][Cassandra]syntax error or access rule violation: base table or view 
> not found: 
>   at weblogic.jdbc.cassandrabase.ddcl.b(Unknown Source)
>   at weblogic.jdbc.cassandrabase.ddt.a(Unknown Source)
>   at weblogic.jdbc.cassandrabase.BaseConnection.prepareStatement(Unknown 
> Source)
>   at weblogic.jdbc.cassandrabase.BaseConnection.prepareStatement(Unknown 
> Source)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.insertStatement(JdbcUtils.scala:118)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:571)
> CC: [~rxin] , [~joshrosen]
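> A hypothetical reproduction sketch (the JDBC URL, table, and column names are 
> made up): a column name that already carries quotes ends up wrapped in a second 
> pair of quotes on the write path, which the target database then rejects:
> {code}
> val df = Seq((1, "a")).toDF("id", "\"value\"")
> df.write
>   .mode("append")
>   .jdbc("jdbc:oracle:thin:@//dbhost:1521/service", "TARGET_TABLE", new java.util.Properties())
> {code}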



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-9576) DataFrame API improvement umbrella ticket (in Spark 2.x)

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-9576:

Target Version/s: 2.3.0  (was: 2.2.0)

> DataFrame API improvement umbrella ticket (in Spark 2.x)
> 
>
> Key: SPARK-9576
> URL: https://issues.apache.org/jira/browse/SPARK-9576
> Project: Spark
>  Issue Type: Umbrella
>  Components: SQL
>Reporter: Reynold Xin
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18394:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Executing the same query twice in a row results in CodeGenerator cache misses
> -
>
> Key: SPARK-18394
> URL: https://issues.apache.org/jira/browse/SPARK-18394
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
> Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop
>Reporter: Jonny Serencsa
>
> Executing the query:
> {noformat}
> select
> l_returnflag,
> l_linestatus,
> sum(l_quantity) as sum_qty,
> sum(l_extendedprice) as sum_base_price,
> sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
> sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
> avg(l_quantity) as avg_qty,
> avg(l_extendedprice) as avg_price,
> avg(l_discount) as avg_disc,
> count(*) as count_order
> from
> lineitem_1_row
> where
> l_shipdate <= date_sub('1998-12-01', '90')
> group by
> l_returnflag,
> l_linestatus
> ;
> {noformat}
> twice (in succession) will result in CodeGenerator cache misses in BOTH 
> executions. Since the query is identical, I would expect the same code to be 
> generated. 
> It turns out the generated code is not exactly the same, resulting in cache 
> misses when performing the lookup in the CodeGenerator cache. Yet the code 
> is equivalent. 
> Below is (some portion of the) generated code for two runs of the query:
> run-1
> {noformat}
> import java.nio.ByteBuffer;
> import java.nio.ByteOrder;
> import scala.collection.Iterator;
> import org.apache.spark.sql.types.DataType;
> import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
> import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
> import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
> public SpecificColumnarIterator generate(Object[] references) {
> return new SpecificColumnarIterator();
> }
> class SpecificColumnarIterator extends 
> org.apache.spark.sql.execution.columnar.ColumnarIterator {
> private ByteOrder nativeOrder = null;
> private byte[][] buffers = null;
> private UnsafeRow unsafeRow = new UnsafeRow(7);
> private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
> private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
> private MutableUnsafeRow mutableRow = null;
> private int currentRow = 0;
> private int numRowsInBatch = 0;
> private scala.collection.Iterator input = null;
> private DataType[] columnTypes = null;
> private int[] columnIndexes = null;
> private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
> private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor 
> accessor1;
> private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor 
> accessor2;
> private org.apache.spark.sql.execution.columnar.StringColumnAccessor 
> accessor3;
> private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor 
> accessor4;
> private org.apache.spark.sql.execution.columnar.StringColumnAccessor 
> accessor5;
> private org.apache.spark.sql.execution.columnar.StringColumnAccessor 
> accessor6;
> public SpecificColumnarIterator() {
> this.nativeOrder = ByteOrder.nativeOrder();
> this.buffers = new byte[7][];
> this.mutableRow = new MutableUnsafeRow(rowWriter);
> }
> public void initialize(Iterator input, DataType[] columnTypes, int[] 
> columnIndexes) {
> this.input = input;
> this.columnTypes = columnTypes;
> this.columnIndexes = columnIndexes;
> }
> public boolean hasNext() {
> if (currentRow < numRowsInBatch) {
> return true;
> }
> if (!input.hasNext()) {
> return false;
> }
> org.apache.spark.sql.execution.columnar.CachedBatch batch = 
> (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
> currentRow = 0;
> numRowsInBatch = batch.numRows();
> for (int i = 0; i < columnIndexes.length; i ++) {
> buffers[i] = batch.buffers()[columnIndexes[i]];
> }
> accessor = new 
> org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
> accessor1 = new 
> org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
> accessor2 = new 
> org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
> accessor3 = new 
> org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
> accessor4 = new 
> org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
> accessor5 = new 
> org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder));
> 

[jira] [Updated] (SPARK-18891) Support for specific collection types

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18891:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Support for specific collection types
> -
>
> Key: SPARK-18891
> URL: https://issues.apache.org/jira/browse/SPARK-18891
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 1.6.3, 2.1.0
>Reporter: Michael Armbrust
>Priority: Critical
>
> Encoders treat all collections the same (i.e. {{Seq}} vs {{List}}), which 
> forces users to define classes only with the most generic type.
> An [example 
> error|https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/2398463439880241/2840265927289860/latest.html]:
> {code}
> case class SpecificCollection(aList: List[Int])
> Seq(SpecificCollection(1 :: Nil)).toDS().collect()
> {code}
> {code}
> java.lang.RuntimeException: Error while decoding: 
> java.util.concurrent.ExecutionException: java.lang.Exception: failed to 
> compile: org.codehaus.commons.compiler.CompileException: File 
> 'generated.java', Line 98, Column 120: No applicable constructor/method found 
> for actual parameters "scala.collection.Seq"; candidates are: 
> "line29e7e4b1e36445baa3505b2e102aa86b29.$read$$iw$$iw$$iw$$iw$SpecificCollection(scala.collection.immutable.List)"
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-14543) SQL/Hive insertInto has unexpected results

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-14543:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> SQL/Hive insertInto has unexpected results
> --
>
> Key: SPARK-14543
> URL: https://issues.apache.org/jira/browse/SPARK-14543
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Ryan Blue
>Assignee: Ryan Blue
>
> *Updated description*
> There should be an option to match input data to output columns by name. The 
> API allows operations on tables, which hide the column resolution problem. 
> It's easy to copy from one table to another without listing the columns, and 
> in the API it is common to work with columns by name rather than by position. 
> I think the API should add a way to match columns by name, which is closer to 
> what users expect. I propose adding something like this:
> {code}
> CREATE TABLE src (id: bigint, count: int, total: bigint)
> CREATE TABLE dst (id: bigint, total: bigint, count: int)
> sqlContext.table("src").write.byName.insertInto("dst")
> {code}
> *Original description*
> The Hive write path adds a pre-insertion cast (projection) to reconcile 
> incoming data columns with the outgoing table schema. Columns are matched by 
> position and casts are inserted to reconcile the two column schemas.
> When columns aren't correctly aligned, this causes unexpected results. I ran 
> into this by not using a correct {{partitionBy}} call (addressed by 
> SPARK-14459), which caused an error message that an int could not be cast to 
> an array. However, if the columns are vaguely compatible, for example string 
> and float, then no error or warning is produced and data is written to the 
> wrong columns using unexpected casts (string -> bigint -> float).
> A real-world use case that will hit this is when a table definition changes 
> by adding a column in the middle of a table. Spark SQL statements that copy 
> from that table to a destination table will then map the columns differently 
> but insert casts that mask the problem. The last column's data will be 
> dropped without a reliable warning for the user.
> This highlights a few problems:
> * Too many or too few incoming data columns should cause an AnalysisException 
> to be thrown
> * Only "safe" casts should be inserted automatically, like int -> long, using 
> UpCast
> * Pre-insertion casts currently ignore extra columns by using zip
> * The pre-insertion cast logic differs between Hive's MetastoreRelation and 
> LogicalRelation
> Also, I think there should be an option to match input data to output columns 
> by name. The API allows operations on tables, which hide the column 
> resolution problem. It's easy to copy from one table to another without 
> listing the columns, and in the API it is common to work with columns by name 
> rather than by position. I think the API should add a way to match columns by 
> name, which is closer to what users expect. I propose adding something like 
> this:
> {code}
> CREATE TABLE src (id: bigint, count: int, total: bigint)
> CREATE TABLE dst (id: bigint, total: bigint, count: int)
> sqlContext.table("src").write.byName.insertInto("dst")
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17556) Executor side broadcast for broadcast joins

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17556:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Executor side broadcast for broadcast joins
> ---
>
> Key: SPARK-17556
> URL: https://issues.apache.org/jira/browse/SPARK-17556
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core, SQL
>Reporter: Reynold Xin
> Attachments: executor broadcast.pdf, executor-side-broadcast.pdf
>
>
> Currently in Spark SQL, in order to perform a broadcast join, the driver must 
> collect the result of an RDD and then broadcast it. This introduces some 
> extra latency. It might be possible to broadcast directly from executors.
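> For context, a sketch of the operation in question (table names are made up): 
> the build side of a broadcast join is currently collected to the driver and 
> re-broadcast from there, which is the latency this ticket wants to avoid:
> {code}
> import org.apache.spark.sql.functions.broadcast
> val fact = spark.table("fact")
> val dim  = spark.table("dim")
> fact.join(broadcast(dim), "key").count()   // dim is collected at the driver before broadcast
> {code}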



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-15694) Implement ScriptTransformation in sql/core

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-15694:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Implement ScriptTransformation in sql/core
> --
>
> Key: SPARK-15694
> URL: https://issues.apache.org/jira/browse/SPARK-15694
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>
> ScriptTransformation currently relies on Hive internals. It'd be great if we 
> could implement a native ScriptTransformation in the sql/core module to remove 
> the extra Hive dependency here.
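> A sketch of the feature involved (the script path and column names are made 
> up); today this syntax only works with Hive support enabled because it goes 
> through Hive's SerDe machinery:
> {code}
> spark.sql("""
>   SELECT TRANSFORM (key, value)
>   USING 'python ./my_script.py'
>   AS (key, transformed_value)
>   FROM src
> """)
> {code}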



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-16026) Cost-based Optimizer framework

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-16026:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Cost-based Optimizer framework
> --
>
> Key: SPARK-16026
> URL: https://issues.apache.org/jira/browse/SPARK-16026
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>  Labels: releasenotes
> Attachments: Spark_CBO_Design_Spec.pdf
>
>
> This is an umbrella ticket to implement a cost-based optimizer framework 
> beyond broadcast join selection. This framework can be used to implement some 
> useful optimizations such as join reordering.
> The design should discuss how to break the work down into multiple, smaller 
> logical units. For example, changes to the statistics class, the system catalog, cost 
> estimation/propagation in expressions, and cost estimation/propagation in 
> operators can be done in decoupled pull requests.
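> As a rough illustration of how such a framework gets its inputs (table and 
> column names are made up; the config key shown comes from the CBO work this 
> umbrella tracks and is included only as an illustration):
> {code}
> // Collect table- and column-level statistics, enable cost-based planning,
> // and inspect the resulting plan.
> spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")
> spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS customer_id, amount")
> spark.conf.set("spark.sql.cbo.enabled", "true")
> spark.sql("SELECT customer_id, sum(amount) FROM sales GROUP BY customer_id").explain(true)
> {code}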



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18543) SaveAsTable(CTAS) using overwrite could change table definition

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18543:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> SaveAsTable(CTAS) using overwrite could change table definition
> ---
>
> Key: SPARK-18543
> URL: https://issues.apache.org/jira/browse/SPARK-18543
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.3, 2.0.2
>Reporter: Xiao Li
>Assignee: Xiao Li
>
> When the mode is OVERWRITE, we drop the Hive serde table and create a data 
> source table. This is not right. 
> {code}
> val tableName = "tab1"
> withTable(tableName) {
>   sql(s"CREATE TABLE $tableName STORED AS SEQUENCEFILE AS SELECT 1 AS 
> key, 'abc' AS value")
>   val df = sql(s"SELECT key, value FROM $tableName")
>   df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
>   val tableMeta = 
> spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
>   assert(tableMeta.provider == 
> Some(spark.sessionState.conf.defaultDataSourceName))
> }
> {code}
> Based on the definition of OVERWRITE, no change should be made to the table 
> definition. When recreating the table, we need to create a Hive serde table.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18950) Report conflicting fields when merging two StructTypes.

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18950:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Report conflicting fields when merging two StructTypes.
> ---
>
> Key: SPARK-18950
> URL: https://issues.apache.org/jira/browse/SPARK-18950
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Cheng Lian
>Priority: Minor
>  Labels: starter
>
> Currently, {{StructType.merge()}} only reports data types of conflicting 
> fields when merging two incompatible schemas. It would be nice to also report 
> the field names for easier debugging.
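> A sketch of where the unhelpful message surfaces (the path is made up): two 
> Parquet files that disagree on a field's type, merged at read time:
> {code}
> // The same column is written as an integer in one file and a string in another;
> // the mergeSchema read then fails, reporting the two conflicting types but not
> // which field they belong to -- naming the field is what this ticket asks for.
> Seq(1).toDF("a").write.mode("append").parquet("/tmp/merge_demo")
> Seq("x").toDF("a").write.mode("append").parquet("/tmp/merge_demo")
> spark.read.option("mergeSchema", "true").parquet("/tmp/merge_demo")
> {code}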



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-15117) Generate code that get a value in each compressed column from CachedBatch when DataFrame.cache() is called

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-15117:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Generate code that get a value in each compressed column from CachedBatch 
> when DataFrame.cache() is called
> --
>
> Key: SPARK-15117
> URL: https://issues.apache.org/jira/browse/SPARK-15117
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Kazuaki Ishizaki
>
> Once SPARK-14098 is merged, we will migrate a feature in this JIRA entry.
> When DataFrame.cache() is called, data is stored as column-oriented storage 
> in CachedBatch. The current Catalyst generates a Java program to get a value of 
> a column from an InternalRow that is translated from CachedBatch. This issue 
> generates Java code to get a value of a column from CachedBatch. This JIRA 
> entry supports other primitive types (boolean/byte/short/int/long) whose 
> column may be compressed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17626) TPC-DS performance improvements using star-schema heuristics

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17626:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> TPC-DS performance improvements using star-schema heuristics
> 
>
> Key: SPARK-17626
> URL: https://issues.apache.org/jira/browse/SPARK-17626
> Project: Spark
>  Issue Type: Umbrella
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Ioana Delaney
>Priority: Critical
> Attachments: StarSchemaJoinReordering.pptx
>
>
> *TPC-DS performance improvements using star-schema heuristics*
> \\
> \\
> TPC-DS consists of multiple snowflake schemas, which are star schemas 
> with dimensions linking to other dimensions. A star schema consists of a fact table 
> referencing a number of dimension tables. The fact table holds the main data 
> about a business. A dimension table, usually a smaller table, describes data 
> reflecting a dimension/attribute of the business.
> \\
> \\
> As part of the benchmark performance investigation, we observed a pattern of 
> sub-optimal execution plans for large fact table joins. Manually rewriting 
> some of the queries into selective fact-dimension joins resulted in 
> significant performance improvements. This prompted us to develop a simple 
> join reordering algorithm based on star schema detection. The performance 
> testing using *1TB TPC-DS workload* shows an overall improvement of *19%*. 
> \\
> \\
> *Summary of the results:*
> {code}
> Passed 99
> Failed  0
> Total q time (s)   14,962
> Max time1,467
> Min time3
> Mean time 145
> Geomean44
> {code}
> *Compared to baseline* (Negative = improvement; Positive = Degradation):
> {code}
> End to end improved (%)  -19% 
> Mean time improved (%)   -19%
> Geomean improved (%) -24%
> End to end improved (seconds)  -3,603
> Number of queries improved (>10%)  45
> Number of queries degraded (>10%)   6
> Number of queries unchanged48
> Top 10 queries improved (%)  -20%
> {code}
> Cluster: 20-node cluster with each node having:
> * 10 2TB hard disks in a JBOD configuration, 2 Intel(R) Xeon(R) CPU E5-2680 
> v2 @ 2.80GHz processors, 128 GB RAM, 10Gigabit Ethernet.
> * Total memory for the cluster: 2.5TB
> * Total storage: 400TB
> * Total CPU cores: 480
> Hadoop stack: IBM Open Platform with Apache Hadoop v4.2. Apache Spark 2.0 GA
> Database info:
> * Schema: TPCDS 
> * Scale factor: 1TB total space
> * Storage format: Parquet with Snappy compression
> Our investigation and results are included in the attached document.
> There are two parts to this improvement:
> # Join reordering using star schema detection
> # New selectivity hint to specify the selectivity of the predicates over base 
> tables. The selectivity hint is optional and was not used in the above TPC-DS 
> tests. 
> \\
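> As an illustration of the join shape the heuristic targets (TPC-DS-style names, 
> used here only as an example): a large fact table joined to small, selective 
> dimension tables, where planning the selective fact-dimension joins first pays off:
> {code}
> spark.sql("""
>   SELECT d.d_year, i.i_category, sum(s.ss_net_paid) AS revenue
>   FROM store_sales s
>   JOIN date_dim d ON s.ss_sold_date_sk = d.d_date_sk
>   JOIN item i     ON s.ss_item_sk = i.i_item_sk
>   WHERE d.d_year = 2000 AND i.i_category = 'Music'
>   GROUP BY d.d_year, i.i_category
> """)
> {code}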



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-15867) Use bucket files for TABLESAMPLE BUCKET

2017-06-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-15867:
-
Target Version/s: 2.3.0  (was: 2.2.0)

> Use bucket files for TABLESAMPLE BUCKET
> ---
>
> Key: SPARK-15867
> URL: https://issues.apache.org/jira/browse/SPARK-15867
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 1.6.0, 2.0.0
>Reporter: Andrew Or
>
> {code}
> SELECT * FROM boxes TABLESAMPLE (BUCKET 3 OUT OF 16)
> {code}
> In Hive, this would select the 3rd bucket out of every 16 buckets there are 
> in the table. E.g. if the table was clustered into 32 buckets, then this would 
> sample the 3rd and the 19th bucket. (See 
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Sampling)
> In Spark, however, we simply sample 3/16 of the number of input rows.
> Either we should not support it in Spark at all, or we should do it in a way 
> that's consistent with Hive.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


