[jira] [Resolved] (SPARK-27664) Performance issue with FileStatusCache, while reading from object stores.

2019-06-12 Thread Prashant Sharma (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prashant Sharma resolved SPARK-27664.
-
Resolution: Won't Fix

I am marking this as Won't Fix because it is now difficult to reproduce. In
version 2.3.x the problem was more evident, but after the merge of
SPARK-23896, versions 2.4.x and above do not do much re-listing in the
general case.
The problem of re-listing still exists (in 2.4.3 and the current, unreleased
3.0.0), and the following code can be used to reproduce it.
{code:java}
// First create the object store data for testing 
spark.range(0, 100, 1, 10).selectExpr("id", "id < 100 as p").write.partitionBy("p").save("")

// Then the following commands reproduce it,
// with timestamps recorded between steps.
// 19/05/24 03:07:56
val s = s"""
|CREATE EXTERNAL TABLE test11(id bigint)
|PARTITIONED BY (p boolean)
|STORED AS parquet
|LOCATION ''""".stripMargin
spark.sql(s)

spark.sql("ALTER TABLE test11 add partition (p=true)")
spark.sql("ALTER TABLE test11 add partition (p=false)")
spark.sql("SELECT * FROM test11 where id <10").show()
// 19/05/24 03:50:43
spark.sql("SELECT * FROM test11 where id <100").show()
// 19/05/24 04:28:19


{code}
As you can see above, the overall time taken is much greater than the time taken
for the extra re-listing, so the difference in performance is hard to notice.
However, this issue, along with the fix, can be reconsidered later if the
problem resurfaces with larger impact.
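
For context, here is a minimal, hypothetical Guava sketch (not Spark's actual FileStatusCache code) of the per-segment limit described in the quoted report below: with a weight budget of 250 units and the default concurrency level of 4, a single entry heavier than roughly 250/4 does not fit in any one segment.
{code:scala}
import com.google.common.cache.{CacheBuilder, Weigher}

// Illustrative only: a cache with maximumWeight = 250 and concurrencyLevel = 4.
// Each of the 4 internal segments gets roughly 250 / 4 of the weight budget,
// so a single entry weighing 100 exceeds its segment's share and is evicted.
val cache = CacheBuilder.newBuilder()
  .maximumWeight(250)
  .concurrencyLevel(4)
  .weigher(new Weigher[String, Array[Byte]] {
    override def weigh(key: String, value: Array[Byte]): Int = value.length
  })
  .build[String, Array[Byte]]()

cache.put("/dir/data/test", new Array[Byte](100))
// Likely null: the entry never fit within a single segment's share of the weight.
println(cache.getIfPresent("/dir/data/test"))
{code}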

> Performance issue with FileStatusCache, while reading from object stores.
> -
>
> Key: SPARK-27664
> URL: https://issues.apache.org/jira/browse/SPARK-27664
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 2.4.3
>Reporter: Prashant Sharma
>Priority: Major
>
> In short:
> This issue (i.e. degraded performance) surfaces when the number of files is 
> large (> 100K) and the data is stored on an object store or other remote storage. 
> The actual cause is that everything is inserted as a single entry in the 
> FileStatusCache (a Guava cache), which does not fit unless the cache is 
> configured to be very large (about 4x). Reason: [https://github.com/google/guava/issues/3462].
>  
> Full story, with possible solutions:
> When we read a directory in Spark with
> {code:java}
> spark.read.parquet("/dir/data/test").limit(1).show()
> {code}
> behind the scenes Spark fetches the FileStatus objects and caches them inside 
> a FileStatusCache, so that it does not need to re-fetch them. Internally, it 
> scans using the listLeafFiles function at the driver. 
>  Inside the cache, the entire content of the listing, as an array of FileStatus 
> objects, is inserted as a single entry with the key "/dir/data/test" in the 
> FileStatusCache. The default size of this cache is 250MB and it is 
> configurable. The underlying cache is a Guava cache.
> The guava cache has one interesting property, i.e. a single entry can only be 
> as large as
> {code:java}
> maximumSize/concurrencyLevel{code}
> see [https://github.com/google/guava/issues/3462] for details. So for a 
> cache size of 250MB, a single entry can only be as large as 250MB/4, since 
> the default concurrency level in Guava is 4. That is around 62MB, which is 
> good enough for most datasets, but for directories with larger listings it 
> does not work that well. The effect is especially evident when such listings 
> come from object stores like Amazon S3 or IBM Cloud Object Storage.
> So currently one can work around this problem by setting the size of the 
> cache (i.e. `spark.sql.hive.filesourcePartitionFileCacheSize`) very high, as 
> it needs to be much more than 4x of what is actually required. But this has a 
> drawback: either the driver has to be started with more memory than required, 
> or one risks an OOM when the cache does not evict older entries because the 
> size is configured to be 4x.
> In order to fix this issue, we can take 3 different approaches:
> 1) One stop-gap fix would be to reduce the concurrency level of the Guava cache 
> to just 1; for a few entries of very large size, we do not lose much by 
> doing this.
> 2) An alternative would be to divide the input array into multiple entries 
> in the cache, instead of inserting everything against a single key. This can 
> be done using directories as keys if there are multiple nested directories 
> under a directory; but if a user has everything listed under a single dir, 
> then this solution does not help either, and we cannot depend on users creating 
> partitions in their hive/sql table.
> 3) One more alternative would be to make the concurrency level configurable 
> for those who want to change it, and while inserting the entry in the cache 
> divide it into 

[jira] [Created] (SPARK-28032) DataFrame.saveAsTable() in AVRO format with Timestamps creates bad Hive tables

2019-06-12 Thread Mathew Wicks (JIRA)
Mathew Wicks created SPARK-28032:


 Summary: DataFrame.saveAsTable() in AVRO format with Timestamps creates bad Hive tables
 Key: SPARK-28032
 URL: https://issues.apache.org/jira/browse/SPARK-28032
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.3
 Environment: Spark 2.4.3

Hive 1.1.0
Reporter: Mathew Wicks


I am not sure if it's my very old version of Hive (1.1.0), but when I use the 
following code, I end up with a table which Spark can read, but Hive cannot.

That is to say, when writing AVRO format tables, they cannot be read in Hive if 
they contain timestamp types.

*Hive error:*
{code:java}
Error while compiling statement: FAILED: UnsupportedOperationException 
timestamp is not supported.
{code}
*Spark Code:*
{code:java}
import java.sql.Timestamp
import spark.implicits._

val currentTime = new Timestamp(System.currentTimeMillis())
 
val df = Seq(
 (currentTime)
).toDF()

df.write.mode("overwrite").format("avro").saveAsTable("database.table_name")
{code}
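
For what it's worth, a hedged way to see what Spark actually registered in the metastore (reusing the placeholder table name above) is to inspect the table from the Spark side; per the Hive error above, the recorded Avro timestamp type is what this old Hive version refuses to read.
{code:scala}
// Illustrative only: inspect the table created by the snippet above.
spark.table("database.table_name").printSchema()
spark.sql("DESCRIBE FORMATTED database.table_name").show(100, truncate = false)
{code}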



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27930) List all built-in UDFs have different names

2019-06-12 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-27930:

Summary: List all built-in UDFs have different names  (was: Add built-in 
Math Function: RANDOM)

> List all built-in UDFs have different names
> ---
>
> Key: SPARK-27930
> URL: https://issues.apache.org/jira/browse/SPARK-27930
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> This ticket lists all built-in UDFs that have different names:
> |PostgreSQL|Spark SQL|
> |random|rand|
> |format|format_string|
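
As a quick, hedged illustration of the mapping above (run from spark-shell; the format arguments are invented):
{code:scala}
// PostgreSQL's random() is rand() in Spark SQL
spark.sql("SELECT rand()").show()
// PostgreSQL's format() is format_string() in Spark SQL
spark.sql("SELECT format_string('%s has %d rows', 'table_a', 42)").show()
{code}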






[jira] [Updated] (SPARK-27930) Add built-in Math Function: RANDOM

2019-06-12 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-27930:

Description: 
This ticket lists all built-in UDFs that have different names:
|PostgreSQL|Spark SQL|
|random|rand|
|format|format_string|

  was:
The RANDOM function generates a random value between 0.0 and 1.0. Syntax:
{code:sql}
RANDOM()
{code}

More details:
https://www.postgresql.org/docs/12/functions-math.html

Other DBs:
https://docs.aws.amazon.com/redshift/latest/dg/r_RANDOM.html
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Functions/Mathematical/RANDOM.htm?tocpath=SQL%20Reference%20Manual%7CSQL%20Functions%7CMathematical%20Functions%7C_24



> Add built-in Math Function: RANDOM
> --
>
> Key: SPARK-27930
> URL: https://issues.apache.org/jira/browse/SPARK-27930
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> This ticket lists all built-in UDFs that have different names:
> |PostgreSQL|Spark SQL|
> |random|rand|
> |format|format_string|






[jira] [Issue Comment Deleted] (SPARK-27930) List all built-in UDFs have different names

2019-06-12 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-27930:

Comment: was deleted

(was: Workaround:
{code:sql}
select rand()
{code}
{code:sql}
select reflect("java.lang.Math", "random")
{code})

> List all built-in UDFs have different names
> ---
>
> Key: SPARK-27930
> URL: https://issues.apache.org/jira/browse/SPARK-27930
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> This ticket lists all built-in UDFs that have different names:
> |PostgreSQL|Spark SQL|
> |random|rand|
> |format|format_string|






[jira] [Commented] (SPARK-28024) Incorrect numeric values when out of range

2019-06-12 Thread Yuming Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862712#comment-16862712
 ] 

Yuming Wang commented on SPARK-28024:
-

JDK also has this issue: !SPARK-28024.png! 
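
To make the comparison concrete, here is a small, hedged Scala sketch mirroring the SQL examples quoted below: plain JVM arithmetic also wraps around silently on overflow instead of failing.
{code:scala}
// Same values as the SQL examples; arithmetic wraps on overflow instead of erroring.
println((128 * 2).toByte)          // 0      (cf. tinyint(128) * tinyint(2))
println((2147483647 * 2).toShort)  // -2     (cf. smallint(2147483647) * smallint(2))
println(2147483647 * 2)            // -2     (cf. int(2147483647) * int(2))
println((-32768 * -1).toShort)     // -32768 (cf. smallint(-32768) * smallint(-1))
{code}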

> Incorrect numeric values when out of range
> --
>
> Key: SPARK-28024
> URL: https://issues.apache.org/jira/browse/SPARK-28024
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
> Attachments: SPARK-28024.png
>
>
> For example:
> {code:sql}
> select tinyint(128) * tinyint(2); -- 0
> select smallint(2147483647) * smallint(2); -- -2
> select int(2147483647) * int(2); -- -2
> SELECT smallint((-32768)) * smallint(-1); -- -32768
> {code}






[jira] [Updated] (SPARK-28024) Incorrect numeric values when out of range

2019-06-12 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-28024:

Attachment: SPARK-28024.png

> Incorrect numeric values when out of range
> --
>
> Key: SPARK-28024
> URL: https://issues.apache.org/jira/browse/SPARK-28024
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
> Attachments: SPARK-28024.png
>
>
> For example:
> {code:sql}
> select tinyint(128) * tinyint(2); -- 0
> select smallint(2147483647) * smallint(2); -- -2
> select int(2147483647) * int(2); -- -2
> SELECT smallint((-32768)) * smallint(-1); -- -32768
> {code}






[jira] [Commented] (SPARK-28008) Default values & column comments in AVRO schema converters

2019-06-12 Thread Mathew Wicks (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862699#comment-16862699
 ] 

Mathew Wicks commented on SPARK-28008:
--

The only issue I could think of would be that the column comments aren't saved 
(which some users might want).

While I agree it doesn't seem like the API should be public, it is useful to 
know which schema a dataframe will be written with (some Spark types have to be 
converted for Avro). Also, the user might want to make changes and then use the 
"avroSchema" writer option, for example to write timestamps with the 
"timestamp-millis" type rather than "timestamp-micros".

Beyond that, is there really any harm in having a more correct conversion from 
StructType into an Avro schema?
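
A minimal, hedged sketch of the workflow described above (convert the schema, optionally tweak the JSON, then feed it back through the "avroSchema" writer option); the output path and column name are made up:
{code:scala}
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.SchemaConverters

val spark = SparkSession.builder().master("local[*]").appName("avro-schema-option").getOrCreate()
import spark.implicits._

val df = Seq(new Timestamp(System.currentTimeMillis())).toDF("ts")

// Convert the Spark schema to an Avro schema so it can be inspected or edited
// (e.g. switching a timestamp logical type) before writing.
val avroSchema = SchemaConverters.toAvroType(df.schema)
println(avroSchema.toString(true))

df.write
  .format("avro")
  .option("avroSchema", avroSchema.toString)
  .mode("overwrite")
  .save("/tmp/avro_schema_option_example")
{code}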

> Default values & column comments in AVRO schema converters
> --
>
> Key: SPARK-28008
> URL: https://issues.apache.org/jira/browse/SPARK-28008
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Mathew Wicks
>Priority: Major
>
> Currently in both `toAvroType` and `toSqlType` 
> [SchemaConverters.scala#L134|https://github.com/apache/spark/blob/branch-2.4/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L134]
>  there are two behaviours which are unexpected.
> h2. Nullable fields in spark are converted to UNION[TYPE, NULL] and no 
> default value is set:
> *Current Behaviour:*
> {code:java}
> import org.apache.spark.sql.avro.SchemaConverters
> import org.apache.spark.sql.types._
> val schema = new StructType().add("a", "string", nullable = true)
> val avroSchema = SchemaConverters.toAvroType(schema)
> println(avroSchema.toString(true))
> {
>   "type" : "record",
>   "name" : "topLevelRecord",
>   "fields" : [ {
> "name" : "a",
> "type" : [ "string", "null" ]
>   } ]
> }
> {code}
> *Expected Behaviour:*
> (NOTE: The reversal of "null" & "string" in the union, needed for a default 
> value of null)
> {code:java}
> import org.apache.spark.sql.avro.SchemaConverters
> import org.apache.spark.sql.types._
> val schema = new StructType().add("a", "string", nullable = true)
> val avroSchema = SchemaConverters.toAvroType(schema)
> println(avroSchema.toString(true))
> {
>   "type" : "record",
>   "name" : "topLevelRecord",
>   "fields" : [ {
> "name" : "a",
> "type" : [ "null", "string" ],
> "default" : null
>   } ]
> }{code}
> h2. Field comments/metadata is not propagated:
> *Current Behaviour:*
> {code:java}
> import org.apache.spark.sql.avro.SchemaConverters
> import org.apache.spark.sql.types._
> val schema = new StructType().add("a", "string", nullable=false, 
> comment="AAA")
> val avroSchema = SchemaConverters.toAvroType(schema)
> println(avroSchema.toString(true))
> {
>   "type" : "record",
>   "name" : "topLevelRecord",
>   "fields" : [ {
> "name" : "a",
> "type" : "string"
>   } ]
> }{code}
> *Expected Behaviour:*
> {code:java}
> import org.apache.spark.sql.avro.SchemaConverters
> import org.apache.spark.sql.types._
> val schema = new StructType().add("a", "string", nullable=false, 
> comment="AAA")
> val avroSchema = SchemaConverters.toAvroType(schema)
> println(avroSchema.toString(true))
> {
>   "type" : "record",
>   "name" : "topLevelRecord",
>   "fields" : [ {
> "name" : "a",
> "type" : "string",
> "doc" : "AAA"
>   } ]
> }{code}
>  
> The behaviour should be similar (but the reverse) for `toSqlType`.
> I think we should aim to get this in before 3.0, as it will probably be a 
> breaking change for some usage of the AVRO API.






[jira] [Commented] (SPARK-27290) remove unneeded sort under Aggregate

2019-06-12 Thread Xiaoju Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862698#comment-16862698
 ] 

Xiaoju Wu commented on SPARK-27290:
---

[~joshrosen] Got it. I think we should identify in which patterns sort is 
really needed and fix the UT to be more meaningful.

> remove unneeded sort under Aggregate
> --
>
> Key: SPARK-27290
> URL: https://issues.apache.org/jira/browse/SPARK-27290
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Xiaoju Wu
>Priority: Minor
>
> I saw some tickets about removing unneeded sorts from plans, but I think there's 
> another case in which a sort is redundant:
> A Sort just under a non-order-preserving node is redundant, for example:
> {code}
> select count(*) from (select a1 from A order by a2);
> +- Aggregate
>   +- Sort
>      +- FileScan parquet
> {code}
> But one of the existing test cases conflicts with this example:
> {code}
> test("sort should not be removed when there is a node which doesn't guarantee 
> any order") {
>    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)   
>val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
>    val optimized = Optimize.execute(groupedAndResorted.analyze)
>    val correctAnswer = groupedAndResorted.analyze
>comparePlans(optimized, correctAnswer) 
> }
> {code}
> Why is it designed like this? In my opinion, since Aggregate won't pass up 
> the ordering, the Sort below is useless.






[jira] [Resolved] (SPARK-28016) Spark hangs when an execution plan has many projections on nested structs

2019-06-12 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-28016.
--
Resolution: Cannot Reproduce

> Spark hangs when an execution plan has many projections on nested structs
> -
>
> Key: SPARK-28016
> URL: https://issues.apache.org/jira/browse/SPARK-28016
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.4.3
> Environment: Tried in
>  * Spark 2.2.1, Spark 2.4.3 in local mode on Linux, macOS and Windows
>  * Spark 2.4.3 / Yarn on a Linux cluster
>Reporter: Ruslan Yushchenko
>Priority: Major
> Attachments: NestedOps.scala, SparkApp1Issue.scala, 
> SparkApp2Workaround.scala, spark-app-nested.tgz
>
>
> Spark applications freeze on execution plan optimization stage (Catalyst) 
> when a logical execution plan contains a lot of projections that operate on 
> nested struct fields.
> 2 Spark Applications are attached. One demonstrates the issue, the other 
> demonstrates a workaround. Also, an archive is attached in which these jobs are 
> packaged as a Maven project.
> To reproduce, the attached Spark App does the following:
>  * A small dataframe is created from a JSON example.
>  * A nested withColumn map transformation is used to apply a transformation 
> on a struct field and create a new struct field. The code for this 
> transformation is also attached. 
>  * Once more than 11 such transformations are applied, the Catalyst optimizer 
> freezes on optimizing the execution plan
> {code:scala}
> package za.co.absa.spark.app
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> object SparkApp1Issue {
>   // A sample data for a dataframe with nested structs
>   val sample =
> """
>   |{
>   |  "strings": {
>   |"simple": "Culpa repellat nesciunt accusantium",
>   |"all_random": "DESebo8d%fL9sX@AzVin",
>   |"whitespaces": "qbbl"
>   |  },
>   |  "numerics": {
>   |"small_positive": 722,
>   |"small_negative": -660,
>   |"big_positive": 669223368251997,
>   |"big_negative": -161176863305841,
>   |"zero": 0
>   |  }
>   |}
> """.stripMargin ::
>   """{
> |  "strings": {
> |"simple": "Accusamus quia vel deleniti",
> |"all_random": "rY*KS]jPBpa[",
> |"whitespaces": "  t e   t   rp   z p"
> |  },
> |  "numerics": {
> |"small_positive": 268,
> |"small_negative": -134,
> |"big_positive": 768990048149640,
> |"big_negative": -684718954884696,
> |"zero": 0
> |  }
> |}
> |""".stripMargin ::
>   """{
> |  "strings": {
> |"simple": "Quia numquam deserunt delectus rem est",
> |"all_random": "GmRdQlE4Avn1hSlVPAH",
> |"whitespaces": "   c   sayv   drf "
> |  },
> |  "numerics": {
> |"small_positive": 909,
> |"small_negative": -363,
> |"big_positive": 592517494751902,
> |"big_negative": -703224505589638,
> |"zero": 0
> |  }
> |}
> |""".stripMargin :: Nil
>   /**
> * This Spark Job demonstrates an issue of execution plan freezing when 
> there are a lot of projections
> * involving nested structs in an execution plan.
> *
> * The example works as follows:
> * - A small dataframe is created from a JSON example above
> * - A nested withColumn map transformation is used to apply a 
> transformation on a struct field and create
> *   a new struct field.
> * - Once more than 11 such transformations are applied the Catalyst 
> optimizer freezes on optimizing
> *   the execution plan
> */
>   def main(args: Array[String]): Unit = {
> val sparkBuilder = SparkSession.builder().appName("Nested Projections 
> Issue")
> val spark = sparkBuilder
>   .master("local[4]")
>   .getOrCreate()
> import spark.implicits._
> import za.co.absa.spark.utils.NestedOps.DataSetWrapper
> val df = spark.read.json(sample.toDS)
> // Apply several uppercase and negation transformations
> val dfOutput = df
>   .nestedWithColumnMap("strings.simple", "strings.uppercase1", c => 
> upper(c))
>   .nestedWithColumnMap("strings.all_random", "strings.uppercase2", c => 
> upper(c))
>   .nestedWithColumnMap("strings.whitespaces", "strings.uppercase3", c => 
> upper(c))
>   .nestedWithColumnMap("numerics.small_positive", "numerics.num1", c => 
> -c)
>   .nestedWithColumnMap("numerics.small_negative", "numerics.num2", c => 
> -c)
>   .nestedWithColumnMap("numerics.big_positive", "numerics.num3", c => -c)
>   

[jira] [Commented] (SPARK-28016) Spark hangs when an execution plan has many projections on nested structs

2019-06-12 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862693#comment-16862693
 ] 

Hyukjin Kwon commented on SPARK-28016:
--

Cannot reproduce in the current master.

{code}
scala> dfOutput.show
19/06/13 12:21:08 WARN package: Truncated the string representation of a plan 
since it was too large. This behavior can be adjusted by setting 
'spark.sql.debug.maxToStringFields'.
+++
|numerics| strings|
+++
|[-161176863305841...|[DESebo8d%fL9sX@A...|
|[-684718954884696...|[rY*KS]jP...|
|[-703224505589638...|[GmRdQlE4Avn1hSlV...|
+++
{code}

> Spark hangs when an execution plan has many projections on nested structs
> -
>
> Key: SPARK-28016
> URL: https://issues.apache.org/jira/browse/SPARK-28016
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.4.3
> Environment: Tried in
>  * Spark 2.2.1, Spark 2.4.3 in local mode on Linux, macOS and Windows
>  * Spark 2.4.3 / Yarn on a Linux cluster
>Reporter: Ruslan Yushchenko
>Priority: Major
> Attachments: NestedOps.scala, SparkApp1Issue.scala, 
> SparkApp2Workaround.scala, spark-app-nested.tgz
>
>
> Spark applications freeze on execution plan optimization stage (Catalyst) 
> when a logical execution plan contains a lot of projections that operate on 
> nested struct fields.
> 2 Spark Applications are attached. One demonstrates the issue, the other 
> demonstrates a workaround. Also, an archive is attached in which these jobs are 
> packaged as a Maven project.
> To reproduce, the attached Spark App does the following:
>  * A small dataframe is created from a JSON example.
>  * A nested withColumn map transformation is used to apply a transformation 
> on a struct field and create a new struct field. The code for this 
> transformation is also attached. 
>  * Once more than 11 such transformations are applied, the Catalyst optimizer 
> freezes on optimizing the execution plan
> {code:scala}
> package za.co.absa.spark.app
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> object SparkApp1Issue {
>   // A sample data for a dataframe with nested structs
>   val sample =
> """
>   |{
>   |  "strings": {
>   |"simple": "Culpa repellat nesciunt accusantium",
>   |"all_random": "DESebo8d%fL9sX@AzVin",
>   |"whitespaces": "qbbl"
>   |  },
>   |  "numerics": {
>   |"small_positive": 722,
>   |"small_negative": -660,
>   |"big_positive": 669223368251997,
>   |"big_negative": -161176863305841,
>   |"zero": 0
>   |  }
>   |}
> """.stripMargin ::
>   """{
> |  "strings": {
> |"simple": "Accusamus quia vel deleniti",
> |"all_random": "rY*KS]jPBpa[",
> |"whitespaces": "  t e   t   rp   z p"
> |  },
> |  "numerics": {
> |"small_positive": 268,
> |"small_negative": -134,
> |"big_positive": 768990048149640,
> |"big_negative": -684718954884696,
> |"zero": 0
> |  }
> |}
> |""".stripMargin ::
>   """{
> |  "strings": {
> |"simple": "Quia numquam deserunt delectus rem est",
> |"all_random": "GmRdQlE4Avn1hSlVPAH",
> |"whitespaces": "   c   sayv   drf "
> |  },
> |  "numerics": {
> |"small_positive": 909,
> |"small_negative": -363,
> |"big_positive": 592517494751902,
> |"big_negative": -703224505589638,
> |"zero": 0
> |  }
> |}
> |""".stripMargin :: Nil
>   /**
> * This Spark Job demonstrates an issue of execution plan freezing when 
> there are a lot of projections
> * involving nested structs in an execution plan.
> *
> * The example works as follows:
> * - A small dataframe is created from a JSON example above
> * - A nested withColumn map transformation is used to apply a 
> transformation on a struct field and create
> *   a new struct field.
> * - Once more than 11 such transformations are applied the Catalyst 
> optimizer freezes on optimizing
> *   the execution plan
> */
>   def main(args: Array[String]): Unit = {
> val sparkBuilder = SparkSession.builder().appName("Nested Projections 
> Issue")
> val spark = sparkBuilder
>   .master("local[4]")
>   .getOrCreate()
> import spark.implicits._
> import za.co.absa.spark.utils.NestedOps.DataSetWrapper
> val df = spark.read.json(sample.toDS)
> // Apply several uppercase and negation 

[jira] [Commented] (SPARK-28008) Default values & column comments in AVRO schema converters

2019-06-12 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862691#comment-16862691
 ] 

Hyukjin Kwon commented on SPARK-28008:
--

https://github.com/apache/spark/blob/branch-2.4/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L134
 is not meant to be a public API, so consistency won't matter there. Does this 
cause any actual issue?

> Default values & column comments in AVRO schema converters
> --
>
> Key: SPARK-28008
> URL: https://issues.apache.org/jira/browse/SPARK-28008
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Mathew Wicks
>Priority: Major
>
> Currently in both `toAvroType` and `toSqlType` 
> [SchemaConverters.scala#L134|https://github.com/apache/spark/blob/branch-2.4/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L134]
>  there are two behaviours which are unexpected.
> h2. Nullable fields in spark are converted to UNION[TYPE, NULL] and no 
> default value is set:
> *Current Behaviour:*
> {code:java}
> import org.apache.spark.sql.avro.SchemaConverters
> import org.apache.spark.sql.types._
> val schema = new StructType().add("a", "string", nullable = true)
> val avroSchema = SchemaConverters.toAvroType(schema)
> println(avroSchema.toString(true))
> {
>   "type" : "record",
>   "name" : "topLevelRecord",
>   "fields" : [ {
> "name" : "a",
> "type" : [ "string", "null" ]
>   } ]
> }
> {code}
> *Expected Behaviour:*
> (NOTE: The reversal of "null" & "string" in the union, needed for a default 
> value of null)
> {code:java}
> import org.apache.spark.sql.avro.SchemaConverters
> import org.apache.spark.sql.types._
> val schema = new StructType().add("a", "string", nullable = true)
> val avroSchema = SchemaConverters.toAvroType(schema)
> println(avroSchema.toString(true))
> {
>   "type" : "record",
>   "name" : "topLevelRecord",
>   "fields" : [ {
> "name" : "a",
> "type" : [ "null", "string" ],
> "default" : null
>   } ]
> }{code}
> h2. Field comments/metadata is not propagated:
> *Current Behaviour:*
> {code:java}
> import org.apache.spark.sql.avro.SchemaConverters
> import org.apache.spark.sql.types._
> val schema = new StructType().add("a", "string", nullable=false, 
> comment="AAA")
> val avroSchema = SchemaConverters.toAvroType(schema)
> println(avroSchema.toString(true))
> {
>   "type" : "record",
>   "name" : "topLevelRecord",
>   "fields" : [ {
> "name" : "a",
> "type" : "string"
>   } ]
> }{code}
> *Expected Behaviour:*
> {code:java}
> import org.apache.spark.sql.avro.SchemaConverters
> import org.apache.spark.sql.types._
> val schema = new StructType().add("a", "string", nullable=false, 
> comment="AAA")
> val avroSchema = SchemaConverters.toAvroType(schema)
> println(avroSchema.toString(true))
> {
>   "type" : "record",
>   "name" : "topLevelRecord",
>   "fields" : [ {
> "name" : "a",
> "type" : "string",
> "doc" : "AAA"
>   } ]
> }{code}
>  
> The behaviour should be similar (but the reverse) for `toSqlType`.
> I think we should aim to get this in before 3.0, as it will probably be a 
> breaking change for some usage of the AVRO API.






[jira] [Resolved] (SPARK-28009) PipedRDD: Block not locked for reading failure

2019-06-12 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-28009.
--
Resolution: Duplicate

> PipedRDD: Block not locked for reading failure
> --
>
> Key: SPARK-28009
> URL: https://issues.apache.org/jira/browse/SPARK-28009
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
> Environment: Running in a Docker container with Spark 2.4.0 on Linux 
> kernel 4.9.0
>Reporter: Douglas Colkitt
>Priority: Major
>
> PipedRDD operation fails with the below stack trace. Failure primarily occurs 
> when the STDOUT from the Unix process is small and the STDIN into the Unix 
> process is comparatively much larger.
> Given the similarity to SPARK-18406, this seems to be due to a race condition 
> when it comes to accessing the block's reader lock. The PipedRDD class 
> implementation spawns the STDIN iterator in a separate thread, so that would 
> corroborate the race condition hypothesis.
> {code}
> at scala.Predef$.assert(Predef.scala:170)
> at 
> org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
> at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:842)
> at 
> org.apache.spark.storage.BlockManager.releaseLockAndDispose(BlockManager.scala:1610)
> at 
> org.apache.spark.storage.BlockManager$$anonfun$2.apply$mcV$sp(BlockManager.scala:621)
> at 
> org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at org.apache.spark.rdd.PipedRDD$$anon$3.run(PipedRDD.scala:145)
> Suppressed: java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:156)
> at 
> org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
> at 
> org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
> at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:363)
> at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
> at scala.Option.foreach(Option.scala:257)
> at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:362)
> at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:358)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at 
> org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:358)
> at 
> org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:858)
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$1.apply$mcV$sp(Executor.scala:409)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1369)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
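
For anyone trying to reproduce this, a minimal, hypothetical sketch of the kind of PipedRDD call the report describes (large STDIN, small STDOUT); this is not the reporter's actual job and is not known to trigger the assertion:
{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").appName("pipedrdd-sketch").getOrCreate()
val sc = spark.sparkContext

// Pipe a large amount of data (STDIN) through a Unix command whose output
// (STDOUT) is tiny: "wc -l" prints one line per partition.
val counts = sc.parallelize(1 to 10000000, 4)
  .pipe("wc -l")
  .collect()
println(counts.mkString(", "))
{code}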






[jira] [Updated] (SPARK-28009) PipedRDD: Block not locked for reading failure

2019-06-12 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-28009:
-
Description: 
PipedRDD operation fails with the below stack trace. Failure primarily occurs 
when the STDOUT from the Unix process is small and the STDIN into the Unix 
process is comparatively much larger.

Given the similarity to SPARK-18406, this seems to be due to a race condition 
when it comes to accessing the block's reader lock. The PipedRDD class 
implementation spawns the STDIN iterator in a separate thread, so that would 
corroborate the race condition hypothesis.

{code}
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:842)
at 
org.apache.spark.storage.BlockManager.releaseLockAndDispose(BlockManager.scala:1610)
at 
org.apache.spark.storage.BlockManager$$anonfun$2.apply$mcV$sp(BlockManager.scala:621)
at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.rdd.PipedRDD$$anon$3.run(PipedRDD.scala:145)
Suppressed: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
at 
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:363)
at 
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
at scala.Option.foreach(Option.scala:257)
at 
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:362)
at 
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:358)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at 
org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:358)
at 
org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:858)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$1.apply$mcV$sp(Executor.scala:409)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1369)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}


  was:
PipedRDD operation fails with the below stack trace. Failure primarily occurs 
when the STDOUT from the Unix process is small and the STDIN into the Unix 
process is comparatively much larger.

 

Given the similarity to SPARK-18406, this seems to be due to a race condition 
when it comes to accessing the block's reader locker. The PipedRDD class 
implementation spawns STDIN iterator in a separate thread, so that would 
corroborate the race condition hypothesis.

 

at scala.Predef$.assert(Predef.scala:170)

at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)

at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:842)

at 
org.apache.spark.storage.BlockManager.releaseLockAndDispose(BlockManager.scala:1610)

at 
org.apache.spark.storage.BlockManager$$anonfun$2.apply$mcV$sp(BlockManager.scala:621)

at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)

at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)

at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)

at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at org.apache.spark.rdd.PipedRDD$$anon$3.run(PipedRDD.scala:145)

Suppressed: java.lang.AssertionError: assertion failed

at scala.Predef$.assert(Predef.scala:156)

at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)

at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)

at 

[jira] [Commented] (SPARK-28016) Spark hangs when an execution plan has many projections on nested structs

2019-06-12 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862682#comment-16862682
 ] 

Hyukjin Kwon commented on SPARK-28016:
--

Can you narrow it down and make a self-contained reproducer without code such 
as {{DataSetWrapper}}? 

> Spark hangs when an execution plan has many projections on nested structs
> -
>
> Key: SPARK-28016
> URL: https://issues.apache.org/jira/browse/SPARK-28016
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.4.3
> Environment: Tried in
>  * Spark 2.2.1, Spark 2.4.3 in local mode on Linux, macOS and Windows
>  * Spark 2.4.3 / Yarn on a Linux cluster
>Reporter: Ruslan Yushchenko
>Priority: Major
> Attachments: NestedOps.scala, SparkApp1Issue.scala, 
> SparkApp2Workaround.scala, spark-app-nested.tgz
>
>
> Spark applications freeze on execution plan optimization stage (Catalyst) 
> when a logical execution plan contains a lot of projections that operate on 
> nested struct fields.
> 2 Spark Applications are attached. One demonstrates the issue, the other 
> demonstrates a workaround. Also, an archive is attached in which these jobs are 
> packaged as a Maven project.
> To reproduce, the attached Spark App does the following:
>  * A small dataframe is created from a JSON example.
>  * A nested withColumn map transformation is used to apply a transformation 
> on a struct field and create a new struct field. The code for this 
> transformation is also attached. 
>  * Once more than 11 such transformations are applied, the Catalyst optimizer 
> freezes on optimizing the execution plan
> {code:scala}
> package za.co.absa.spark.app
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> object SparkApp1Issue {
>   // A sample data for a dataframe with nested structs
>   val sample =
> """
>   |{
>   |  "strings": {
>   |"simple": "Culpa repellat nesciunt accusantium",
>   |"all_random": "DESebo8d%fL9sX@AzVin",
>   |"whitespaces": "qbbl"
>   |  },
>   |  "numerics": {
>   |"small_positive": 722,
>   |"small_negative": -660,
>   |"big_positive": 669223368251997,
>   |"big_negative": -161176863305841,
>   |"zero": 0
>   |  }
>   |}
> """.stripMargin ::
>   """{
> |  "strings": {
> |"simple": "Accusamus quia vel deleniti",
> |"all_random": "rY*KS]jPBpa[",
> |"whitespaces": "  t e   t   rp   z p"
> |  },
> |  "numerics": {
> |"small_positive": 268,
> |"small_negative": -134,
> |"big_positive": 768990048149640,
> |"big_negative": -684718954884696,
> |"zero": 0
> |  }
> |}
> |""".stripMargin ::
>   """{
> |  "strings": {
> |"simple": "Quia numquam deserunt delectus rem est",
> |"all_random": "GmRdQlE4Avn1hSlVPAH",
> |"whitespaces": "   c   sayv   drf "
> |  },
> |  "numerics": {
> |"small_positive": 909,
> |"small_negative": -363,
> |"big_positive": 592517494751902,
> |"big_negative": -703224505589638,
> |"zero": 0
> |  }
> |}
> |""".stripMargin :: Nil
>   /**
> * This Spark Job demonstrates an issue of execution plan freezing when 
> there are a lot of projections
> * involving nested structs in an execution plan.
> *
> * The example works as follows:
> * - A small dataframe is created from a JSON example above
> * - A nested withColumn map transformation is used to apply a 
> transformation on a struct field and create
> *   a new struct field.
> * - Once more than 11 such transformations are applied the Catalyst 
> optimizer freezes on optimizing
> *   the execution plan
> */
>   def main(args: Array[String]): Unit = {
> val sparkBuilder = SparkSession.builder().appName("Nested Projections 
> Issue")
> val spark = sparkBuilder
>   .master("local[4]")
>   .getOrCreate()
> import spark.implicits._
> import za.co.absa.spark.utils.NestedOps.DataSetWrapper
> val df = spark.read.json(sample.toDS)
> // Apply several uppercase and negation transformations
> val dfOutput = df
>   .nestedWithColumnMap("strings.simple", "strings.uppercase1", c => 
> upper(c))
>   .nestedWithColumnMap("strings.all_random", "strings.uppercase2", c => 
> upper(c))
>   .nestedWithColumnMap("strings.whitespaces", "strings.uppercase3", c => 
> upper(c))
>   .nestedWithColumnMap("numerics.small_positive", "numerics.num1", c => 
> -c)
>   .nestedWithColumnMap("numerics.small_negative", 

[jira] [Resolved] (SPARK-28021) An inappropriate exception in StaticMemoryManager.getMaxExecutionMemory

2019-06-12 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-28021.
--
Resolution: Invalid

Let's ask the question on the mailing list before filing it as an issue. See 
https://spark.apache.org/community.html

> An inappropriate exception in StaticMemoryManager.getMaxExecutionMemory
> --
>
> Key: SPARK-28021
> URL: https://issues.apache.org/jira/browse/SPARK-28021
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 2.4.3
>Reporter: child2d
>Priority: Minor
>
> While reviewing StaticMemoryManager.scala, a question came to me.
> {code:java}
> private def getMaxExecutionMemory(conf: SparkConf): Long = {
>   val systemMaxMemory = conf.getLong("spark.testing.memory", 
> Runtime.getRuntime.maxMemory)
>   if (systemMaxMemory < MIN_MEMORY_BYTES) {
> throw new IllegalArgumentException(s"System memory $systemMaxMemory must 
> " +
>   s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the 
> --driver-memory " +
>   s"option or spark.driver.memory in Spark configuration.")
>   }
>   if (conf.contains("spark.executor.memory")) {
> val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
> if (executorMemory < MIN_MEMORY_BYTES) {
>   throw new IllegalArgumentException(s"Executor memory $executorMemory 
> must be at least " +
> s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
> s"--executor-memory option or spark.executor.memory in Spark 
> configuration.")
> }
>   }
>   val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
>   val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
>   (systemMaxMemory * memoryFraction * safetyFraction).toLong
> }
> {code}
> When an executor calls getMaxExecutionMemory, it first sets systemMaxMemory 
> using Runtime.getRuntime.maxMemory, then compares systemMaxMemory against 
> MIN_MEMORY_BYTES.
> If systemMaxMemory is too small, the program throws an exception reminding the 
> user to increase the heap size using --driver-memory.
> I wonder if this is wrong, because the heap size of executors is set by 
> --executor-memory?
> Although there is another exception below about adjusting executor memory, 
> I just think the first exception message may not be appropriate.
> Thanks for answering my question!
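
For context, a hedged back-of-the-envelope calculation of what the quoted method returns with its default fractions (the 4 GB heap is only an illustrative figure, not taken from the report):
{code:scala}
// spark.shuffle.memoryFraction = 0.2 and spark.shuffle.safetyFraction = 0.8 (defaults above)
val systemMaxMemory = 4L * 1024 * 1024 * 1024             // e.g. Runtime.getRuntime.maxMemory on a 4 GB heap
val maxExecutionMemory = (systemMaxMemory * 0.2 * 0.8).toLong
println(maxExecutionMemory / (1024 * 1024))               // ~655 (MB)
{code}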






[jira] [Resolved] (SPARK-28026) How to get the second row from 1 minute window

2019-06-12 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-28026.
--
Resolution: Invalid

> How to get the second row from 1 minute window
> --
>
> Key: SPARK-28026
> URL: https://issues.apache.org/jira/browse/SPARK-28026
> Project: Spark
>  Issue Type: Question
>  Components: Examples, Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Devendra Vishwakarma
>Priority: Major
>
> I have been blocked for almost a month; I am still figuring out the API to achieve 
> one of the functionalities related to Spark Structured Streaming with window 
> grouping, so I thought I would reach out to you here.
> What I have is stock-related time series data, and I have grouped it into a 
> 1-minute window along with the stock name. I am able to get the first and last 
> rows in that 1-minute group, but I want some values from the second row of that 
> 1-minute window, which I am not able to do at all. I looked at each function 
> related to aggregation but could not find any. Please help me.
> This is what I have done so far:
> val aggregates = stockEvents
>  .withWatermark("timestamp", "5 seconds")
>  .groupBy(window($"timestamp", "1 minute", "1 minute", "0 seconds"), 
> $"stockName")
>  .agg(
>  first("tradingprice").alias("open"), //I have to make this value coming from 
> second row
>  last("tradingprice").alias("close"),
>  max("tradingprice").alias("high"),
>  min("tradingprice").alias("low"))
>  






[jira] [Commented] (SPARK-28026) How to get the second row from 1 minute window

2019-06-12 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862671#comment-16862671
 ] 

Hyukjin Kwon commented on SPARK-28026:
--

Let's ask the question on the mailing list before filing it as an issue. You will 
get a better answer there. See https://spark.apache.org/community.html

> How to get the second row from 1 minute window
> --
>
> Key: SPARK-28026
> URL: https://issues.apache.org/jira/browse/SPARK-28026
> Project: Spark
>  Issue Type: Question
>  Components: Examples, Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Devendra Vishwakarma
>Priority: Major
>
> I have been blocked for almost a month; I am still figuring out the API to achieve 
> one of the functionalities related to Spark Structured Streaming with window 
> grouping, so I thought I would reach out to you here.
> What I have is stock-related time series data, and I have grouped it into a 
> 1-minute window along with the stock name. I am able to get the first and last 
> rows in that 1-minute group, but I want some values from the second row of that 
> 1-minute window, which I am not able to do at all. I looked at each function 
> related to aggregation but could not find any. Please help me.
> This is what I have done so far:
> val aggregates = stockEvents
>  .withWatermark("timestamp", "5 seconds")
>  .groupBy(window($"timestamp", "1 minute", "1 minute", "0 seconds"), 
> $"stockName")
>  .agg(
>  first("tradingprice").alias("open"), //I have to make this value coming from 
> second row
>  last("tradingprice").alias("close"),
>  max("tradingprice").alias("high"),
>  min("tradingprice").alias("low"))
>  






[jira] [Commented] (SPARK-13882) Remove org.apache.spark.sql.execution.local

2019-06-12 Thread Lai Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862641#comment-16862641
 ] 

Lai Zhou commented on SPARK-13882:
--

Hi [~rxin], will this iterator-based local mode be re-introduced in the 
future?

I think a direct iterator-based local mode would be highly efficient and could 
help people do real-time queries.

> Remove org.apache.spark.sql.execution.local
> ---
>
> Key: SPARK-13882
> URL: https://issues.apache.org/jira/browse/SPARK-13882
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Major
> Fix For: 2.0.0
>
>
> We introduced some local operators in org.apache.spark.sql.execution.local 
> package but never fully wired the engine to actually use these. We still plan 
> to implement a full local mode, but it's probably going to be fairly 
> different from what the current iterator-based local mode would look like.
> Let's just remove them for now, and we can always re-introduced them in the 
> future by looking at branch-1.6.






[jira] [Assigned] (SPARK-28031) Improve or remove doctest on over function of Column

2019-06-12 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-28031:


Assignee: Liang-Chi Hsieh

> Improve or remove doctest on over function of Column
> 
>
> Key: SPARK-28031
> URL: https://issues.apache.org/jira/browse/SPARK-28031
> Project: Spark
>  Issue Type: Test
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Liang-Chi Hsieh
>Assignee: Liang-Chi Hsieh
>Priority: Trivial
>
> Just found that the doctest on the {{over}} function of {{Column}} is commented 
> out. The window spec is also incorrect for the window function used there.
> We should either remove the doctest or improve it.
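
For reference, a hedged Scala sketch (not the PySpark doctest itself) of a sensible window spec for {{over}}; the column names are made up:
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank

val spark = SparkSession.builder().master("local[*]").appName("over-example").getOrCreate()
import spark.implicits._

val df = Seq(("Alice", 2), ("Alice", 5), ("Bob", 5)).toDF("name", "age")

// rank() needs an ordered window; partitioning keeps the ranking per name.
val w = Window.partitionBy("name").orderBy("age")
df.withColumn("rank", rank().over(w)).show()
{code}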






[jira] [Resolved] (SPARK-28031) Improve or remove doctest on over function of Column

2019-06-12 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-28031.
--
   Resolution: Fixed
Fix Version/s: 2.4.4
   3.0.0

Issue resolved by pull request 24854
[https://github.com/apache/spark/pull/24854]

> Improve or remove doctest on over function of Column
> 
>
> Key: SPARK-28031
> URL: https://issues.apache.org/jira/browse/SPARK-28031
> Project: Spark
>  Issue Type: Test
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Liang-Chi Hsieh
>Assignee: Liang-Chi Hsieh
>Priority: Trivial
> Fix For: 3.0.0, 2.4.4
>
>
> Just found that the doctest on the {{over}} function of {{Column}} is commented 
> out. The window spec is also incorrect for the window function used there.
> We should either remove the doctest or improve it.






[jira] [Issue Comment Deleted] (SPARK-24791) Spark Structured Streaming randomly does not process batch

2019-06-12 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-24791:
--
Comment: was deleted

(was: User 'zhangmeng0426' has created a pull request for this issue:
https://github.com/apache/spark/pull/24791)

> Spark Structured Streaming randomly does not process batch
> --
>
> Key: SPARK-24791
> URL: https://issues.apache.org/jira/browse/SPARK-24791
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: Arvind Ramachandran
>Priority: Major
>
> I have developed an application that writes small CSV files to a specific 
> HDFS folder, and Spark Structured Streaming reads that folder. On a random 
> basis I see that it does not process a CSV file; the only case in which this 
> occurs is when the batch size is one CSV file, again random in nature and not 
> consistent. I cannot guarantee that the size of the batch will be greater than 
> one, because the requirement is low-latency processing but the volume is low.
> I can see that the commits, offsets and sources folders have the batch 
> information, but the CSV file is not processed when I look at the logs.






[jira] [Updated] (SPARK-27943) Implement default constraint with Column for Hive table

2019-06-12 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-27943:
--
Affects Version/s: (was: 2.4.0)
   (was: 2.3.0)
   3.0.0

> Implement default constraint with Column for Hive table
> ---
>
> Key: SPARK-27943
> URL: https://issues.apache.org/jira/browse/SPARK-27943
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: jiaan.geng
>Priority: Major
>
>  
> *Background*
> A default constraint on a column is part of the ANSI standard.
> Hive 3.0+ supports default constraints 
> (ref: https://issues.apache.org/jira/browse/HIVE-18726), 
> but Spark SQL does not implement this feature yet.
> *Design*
> Hive is widely used in production environments and is the de facto standard in 
> the field of big data.
> However, many versions of Hive are used in production, and features differ 
> between versions.
> Spark SQL needs to implement default constraints, but there are three points to 
> pay attention to in the design:
> _First_, Spark SQL should reduce coupling with Hive.
> _Second_, default constraints should be compatible with different versions of Hive.
> _Third_, which default-constraint expressions should Spark SQL support? I 
> think it should support `literal`, `current_date()` and `current_timestamp()`. 
> Maybe other expressions should also be supported, like `Cast(1 as float)`, `1 + 
> 2` and so on.
> We want to save the default-constraint metadata into the properties of the Hive 
> table, and then restore it from the properties after the client gets the 
> newest metadata. The implementation is the same as for other metadata (e.g. 
> partition, bucket, statistics).
> Because a default constraint is part of a column, I think we could reuse the 
> metadata of StructField; the default constraint would be cached in the 
> StructField metadata.
>  
> *Tasks*
> This is a big piece of work, so I want to split it into sub-tasks, as 
> follows:
>  
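
To illustrate the StructField idea above, a minimal, hedged sketch (the "default" metadata key is purely illustrative, not the key an eventual implementation would use):
{code:scala}
import org.apache.spark.sql.types._

// Attach a hypothetical default value to a column via StructField metadata;
// the metadata travels with the schema and could be persisted in table properties.
val priceField = StructField(
  "price",
  DoubleType,
  nullable = true,
  metadata = new MetadataBuilder().putString("default", "0.0").build())

val schema = StructType(Seq(priceField))
println(schema.json)   // the "default" entry appears in the serialized schema
{code}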






[jira] [Comment Edited] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862548#comment-16862548
 ] 

Stavros Kontopoulos edited comment on SPARK-28025 at 6/12/19 11:32 PM:
---

[~kabhwan] yes we discussed this internally when trying to solve the issue, 
otherwise I would not intervene. I fully respect people, colleagues or not ;)


was (Author: skonto):
[~kabhwan] yes we discussed this internally when trying to solve the issue, 
otherwise I would not intervene. 

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862548#comment-16862548
 ] 

Stavros Kontopoulos commented on SPARK-28025:
-

[~kabhwan] yes we discussed this internally when trying to solve the issue, 
otherwise I would not intervene. 

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27919) DataSourceV2: Add v2 session catalog

2019-06-12 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-27919:
--
Issue Type: Improvement  (was: Bug)

> DataSourceV2: Add v2 session catalog
> 
>
> Key: SPARK-27919
> URL: https://issues.apache.org/jira/browse/SPARK-27919
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Ryan Blue
>Priority: Major
>
> When no default catalog is set, the session catalog (v1) is responsible for 
> table identifiers with no catalog part. When CTAS creates a table with a v2 
> provider, a v2 catalog is required and the default catalog is used. But this 
> may cause Spark to create a table in a catalog that it cannot use to look up 
> the table.
> In this case, a v2 catalog that delegates to the session catalog should be 
> used instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28006) User-defined grouped transform pandas_udf for window operations

2019-06-12 Thread Li Jin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Jin updated SPARK-28006:
---
Description: 
Currently, pandas_udf supports a "grouped aggregate" type that can be used with 
bounded and unbounded windows. There is another set of use cases that could 
benefit from a "grouped transform" type pandas_udf.

Grouped transform is defined as an N -> N mapping over a group. For example, 
"compute the zscore for the values in the group using the grouped mean and 
grouped stdev", or "rank the values in the group".

Currently, in order to do this, the user needs to use "grouped apply", for example:
{code:java}
@pandas_udf(schema, GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf['v']
    pdf['v'] = v - v.mean()
    return pdf

df.groupby('id').apply(subtract_mean)
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+{code}
This approach has a few downsides:
 * Specifying the full return schema is complicated for the user although the 
function only changes one column.
 * Hard-coding the column name 'v' inside the udf makes the udf less reusable.
 * The entire dataframe is serialized and passed to Python although only one 
column is needed.

Here we propose a new type of pandas_udf to work with these types of use cases:
{code:java}
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf('double', GROUPED_XFORM)
def subtract_mean(v):
    return v - v.mean()

w = Window.partitionBy('id')

df = df.withColumn('v', subtract_mean(df['v']).over(w))
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+{code}
This addresses the above downsides:
 * The user only needs to specify the output type of a single column.
 * The column being transformed is decoupled from the udf implementation.
 * We only need to send one column to the Python worker and concat the result 
with the original dataframe (this is what grouped aggregate does already).

 

 

  was:
Currently, pandas_udf supports "grouped aggregate" type that can be used with 
unbounded and unbounded windows. There is another set of use cases that can 
benefit from a "grouped transform" type pandas_udf.

Grouped transform is defined as a N -> N mapping over a group. For example, 
"compute zscore for values in the group using the grouped mean and grouped 
stdev", or "rank the values in the group".

Currently, in order to do this, user needs to use "grouped apply", for example:
{code:java}
@pandas_udf(schema, GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf['v']
    pdf['v'] = v - v.mean()
    return pdf

df.groupby('id').apply(subtract_mean)
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+{code}
This approach has a few downside:
 * Specifying the full return schema is complicated for the user although the 
function only changes one column.
 * The column name 'v' inside as part of the udf, makes the udf less reusable.
 * The entire dataframe is serialized to pass to Python although only one 
column is needed.

Here we propose a new type of pandas_udf to work with these types of use cases:
{code:java}
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf('double', GROUPED_XFORM)
def subtract_mean(v):
    return v - v.mean() / v.std()

w = Window.partitionBy('id')

df = df.withColumn('v', subtract_mean(df['v']).over(w))
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+{code}
Which addresses the above downsides.
 * The user only needs to specify the output type of a single column.
 * The column being zscored is decoupled from the udf implementation
 * We only need to send one column to Python worker and concat the result with 
the original dataframe (this is what grouped aggregate is doing already)

 

 


> User-defined grouped transform pandas_udf for window operations
> ---
>
> Key: SPARK-28006
> URL: https://issues.apache.org/jira/browse/SPARK-28006
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.4.3
>Reporter: Li Jin
>Priority: Major
>
> Currently, pandas_udf supports "grouped aggregate" type that can be used with 
> unbounded and unbounded windows. There is another set of use cases that can 
> benefit from a "grouped transform" type pandas_udf.
> Grouped transform is defined as a N -> N mapping over a group. For example, 
> "compute zscore for values in the group using the grouped mean and grouped 
> stdev", or "rank the values in the group".
> Currently, in order to do this, user needs to use "grouped apply", for 
> example:
> 

[jira] [Commented] (SPARK-28006) User-defined grouped transform pandas_udf for window operations

2019-06-12 Thread Li Jin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862505#comment-16862505
 ] 

Li Jin commented on SPARK-28006:


Thanks [~hyukjin.kwon] for the comments! I updated the description to include 
input/outputs.

Yeah, I don't think the Scala window functions currently have such a type. The 
analogy to this in pandas is groupby transform (hence the name grouped 
transform udf):

 
{code:java}
>>> df = pd.DataFrame({'id': [1, 1, 2, 2, 2], 'value': [1., 2., 3., 5., 10.]})
>>> df
   id  value
0   1    1.0
1   1    2.0
2   2    3.0
3   2    5.0
4   2   10.0
>>> df['value_demean'] = df.groupby('id')['value'].transform(lambda x: x - x.mean())
>>> df
   id  value  value_demean
0   1    1.0          -0.5
1   1    2.0           0.5
2   2    3.0          -3.0
3   2    5.0          -1.0
4   2   10.0           4.0
{code}
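For comparison, the closest equivalent available in Scala today goes through a 
window aggregate rather than a transform type (a sketch; `spark` is the session 
and the column names match the example above):
{code:java}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col}

val df = spark.createDataFrame(Seq(
  (1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0))).toDF("id", "value")

val w = Window.partitionBy("id")

// Demean each value within its group using an aggregate over the window.
val demeaned = df.withColumn("value_demean", col("value") - avg(col("value")).over(w))
demeaned.show()
{code}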
 

> User-defined grouped transform pandas_udf for window operations
> ---
>
> Key: SPARK-28006
> URL: https://issues.apache.org/jira/browse/SPARK-28006
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.4.3
>Reporter: Li Jin
>Priority: Major
>
> Currently, pandas_udf supports "grouped aggregate" type that can be used with 
> unbounded and unbounded windows. There is another set of use cases that can 
> benefit from a "grouped transform" type pandas_udf.
> Grouped transform is defined as a N -> N mapping over a group. For example, 
> "compute zscore for values in the group using the grouped mean and grouped 
> stdev", or "rank the values in the group".
> Currently, in order to do this, user needs to use "grouped apply", for 
> example:
> {code:java}
> @pandas_udf(schema, GROUPED_MAP)
> def subtract_mean(pdf):
>     v = pdf['v']
>     pdf['v'] = v - v.mean()
>     return pdf
> df.groupby('id').apply(subtract_mean)
> # +---+----+
> # | id|   v|
> # +---+----+
> # |  1|-0.5|
> # |  1| 0.5|
> # |  2|-3.0|
> # |  2|-1.0|
> # |  2| 4.0|
> # +---+----+{code}
> This approach has a few downside:
>  * Specifying the full return schema is complicated for the user although the 
> function only changes one column.
>  * The column name 'v' inside as part of the udf, makes the udf less reusable.
>  * The entire dataframe is serialized to pass to Python although only one 
> column is needed.
> Here we propose a new type of pandas_udf to work with these types of use 
> cases:
> {code:java}
> df = spark.createDataFrame(
>     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
>     ("id", "v"))
> @pandas_udf('double', GROUPED_XFORM)
> def subtract_mean(v):
>     return v - v.mean() / v.std()
> w = Window.partitionBy('id')
> df = df.withColumn('v', subtract_mean(df['v']).over(w))
> # +---+----+
> # | id|   v|
> # +---+----+
> # |  1|-0.5|
> # |  1| 0.5|
> # |  2|-3.0|
> # |  2|-1.0|
> # |  2| 4.0|
> # +---+----+{code}
> Which addresses the above downsides.
>  * The user only needs to specify the output type of a single column.
>  * The column being zscored is decoupled from the udf implementation
>  * We only need to send one column to Python worker and concat the result 
> with the original dataframe (this is what grouped aggregate is doing already)
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862449#comment-16862449
 ] 

Jungtaek Lim commented on SPARK-28025:
--

Personally I would respect the reporter and encourage them to submit a patch 
themselves so that we could have broader contributors, but I assume you two 
are colleagues (same employer) and discussed who would submit a PR.

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-28030) Binary file data source doesn't support space in file names

2019-06-12 Thread Xiangrui Meng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng resolved SPARK-28030.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24855
[https://github.com/apache/spark/pull/24855]

> Binary file data source doesn't support space in file names
> ---
>
> Key: SPARK-28030
> URL: https://issues.apache.org/jira/browse/SPARK-28030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
> Fix For: 3.0.0
>
>
> {code}
> echo 123 > "/tmp/test space.txt"
> spark.read.format("binaryFile").load("/tmp/test space.txt").count()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18112) Spark2.x does not support read data from Hive 2.x metastore

2019-06-12 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862432#comment-16862432
 ] 

Dongjoon Hyun commented on SPARK-18112:
---

[~honglun]. You should not replace the Apache Spark jars in the `jars` 
directory. Those files are used for the internal Hive-related *execution* 
inside Spark.
`--conf spark.sql.hive.metastore.version` only loads additional, newer Hive 
jars for *metastore* access, in a separate, isolated class loader.
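For example, a sketch of pointing only the metastore client at newer Hive jars 
while leaving the built-in execution jars untouched (the version and the 
"maven" download mode here are just illustrative):
{code:java}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-2.x-metastore")
  // Version of the Hive metastore being talked to (illustrative).
  .config("spark.sql.hive.metastore.version", "2.1.1")
  // Where to get the matching client jars; "maven" downloads them, or point to a classpath.
  .config("spark.sql.hive.metastore.jars", "maven")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("SHOW DATABASES").show()
{code}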

> Spark2.x does not support read data from Hive 2.x metastore
> ---
>
> Key: SPARK-18112
> URL: https://issues.apache.org/jira/browse/SPARK-18112
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: KaiXu
>Assignee: Xiao Li
>Priority: Critical
> Fix For: 2.2.0
>
>
> Hive 2.0 was released in February 2016, and Hive 2.0.1 and Hive 2.1.0 have 
> also been available for a long time, but until now Spark only supports reading 
> Hive metastore data from Hive 1.2.1 and older versions. Since Hive 2.x has 
> many bug fixes and performance improvements, it is better, and urgent, to 
> upgrade to support Hive 2.x.
> Failure when loading data from a Hive 2.x metastore:
> Exception in thread "main" java.lang.NoSuchFieldError: HIVE_STATS_JDBC_TIMEOUT
> at 
> org.apache.spark.sql.hive.HiveUtils$.hiveClientConfigurations(HiveUtils.scala:197)
> at 
> org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:262)
> at 
> org.apache.spark.sql.hive.HiveSharedState.metadataHive$lzycompute(HiveSharedState.scala:39)
> at 
> org.apache.spark.sql.hive.HiveSharedState.metadataHive(HiveSharedState.scala:38)
> at 
> org.apache.spark.sql.hive.HiveSharedState.externalCatalog$lzycompute(HiveSharedState.scala:4
> at 
> org.apache.spark.sql.hive.HiveSharedState.externalCatalog(HiveSharedState.scala:45)
> at 
> org.apache.spark.sql.hive.HiveSessionState.catalog$lzycompute(HiveSessionState.scala:50)
> at 
> org.apache.spark.sql.hive.HiveSessionState.catalog(HiveSessionState.scala:48)
> at 
> org.apache.spark.sql.hive.HiveSessionState.catalog(HiveSessionState.scala:31)
> at org.apache.spark.sql.SparkSession.table(SparkSession.scala:568)
> at org.apache.spark.sql.SparkSession.table(SparkSession.scala:564)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862339#comment-16862339
 ] 

Stavros Kontopoulos edited comment on SPARK-28025 at 6/12/19 6:26 PM:
--

There is a workaround (avoiding the creation of .crc files, if you don't want 
them; in certain environments that is already the default, see 
[https://cloud.google.com/blog/products/storage-data-transfer/new-file-checksum-feature-lets-you-validate-data-transfers-between-hdfs-and-cloud-storage]):
 set `--conf 
spark.hadoop.spark.sql.streaming.checkpointFileManagerClass=org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager`
 when using a local fs,

and modify the FileSystemBasedCheckpointFileManager to run 
`fs.setWriteChecksum(false)` after the fs is created.

The reason is that the FileContextBasedCheckpointFileManager uses ChecksumFs 
([https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java])
 under the hood, which ignores the 
"CreateOpts.checksumParam(ChecksumOpt.createDisabled())" option passed to it. 
That setting only avoids creating checksums for the checksums themselves, and 
only if the underlying fs supports it; the checksum file itself is still 
created in any case.

FileSystemBasedCheckpointFileManager uses 
[https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java],
 which allows checksums to be skipped when creating files if you set that flag.

Note that the .crc file is created when the tmp file is created, not during the 
rename or mv.

I will create a PR shortly.
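A minimal sketch of the setWriteChecksum part of that workaround (the 
checkpoint path is a placeholder; the conf key above still has to be set 
separately):
{code:java}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Get the FileSystem backing the checkpoint location and disable client-side
// checksum (.crc) generation before any checkpoint files are written.
val checkpointUri = new URI("file:///mnt/state-store")   // placeholder
val fs = FileSystem.get(checkpointUri, new Configuration())
fs.setWriteChecksum(false)
{code}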


was (Author: skonto):
There is a workaround by setting `--conf 
spark.hadoop.spark.sql.streaming.checkpointFileManagerClass=org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager`
 when using local fs

and modifying the FileSystemBasedCheckpointFileManager to run 
`fs.setWriteChecksum(false)` after fs is created.

Reason is the FileContextBasedCheckpointFileManager will use ChecksumFS 
([https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java])
 under the hoods which will ignore "

CreateOpts.checksumParam(ChecksumOpt.createDisabled())" passed. These settings 
will only avoid creating checksums for the checksums themshelves and only if 
the underlying fs supports it. However, it will create the checksum file in any 
case.

FileSystemBasedCheckpointFileManager uses 
[https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]
 which allows to avoid checksums when creating files if you set that flag.

Note that the crc is created when the tmp file is created not during rename or 
mv.

I will create a PR shortly.

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For 

[jira] [Comment Edited] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862339#comment-16862339
 ] 

Stavros Kontopoulos edited comment on SPARK-28025 at 6/12/19 6:20 PM:
--

There is a workaround by setting `--conf 
spark.hadoop.spark.sql.streaming.checkpointFileManagerClass=org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager`
 when using local fs

and modifying the FileSystemBasedCheckpointFileManager to run 
`fs.setWriteChecksum(false)` after fs is created.

Reason is the FileContextBasedCheckpointFileManager will use ChecksumFS 
([https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java])
 under the hoods which will ignore "

CreateOpts.checksumParam(ChecksumOpt.createDisabled())" passed. These settings 
will only avoid creating checksums for the checksums themshelves and only if 
the underlying fs supports it. However, it will create the checksum file in any 
case.

FileSystemBasedCheckpointFileManager uses 
[https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]
 which allows to avoid checksums when creating files if you set that flag.

Note that the crc is created when the tmp file is created not during rename or 
mv.

I will create a PR shortly.


was (Author: skonto):
There is a workaround by setting `--conf 
spark.hadoop.spark.sql.streaming.checkpointFileManagerClass=org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager`
 when using local fs

and modifying the FileSystemBasedCheckpointFileManager to run 
`fs.setWriteChecksum(false)` after fs is created.

Reason is the FileContextBasedCheckpointFileManager will use ChecksumFS 
([https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java])
 under the hoods which will ignore "

CreateOpts.checksumParam(ChecksumOpt.createDisabled())" passed. These settings 
will only avoid creating checksums for the checksums themshelves.

FileSystemBasedCheckpointFileManager uses 
[https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]
 which allows to avoid checksums when creating files if you set that flag.

Note that the crc is created when the tmp file is created not during rename or 
mv.

I will create a PR shortly.

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27907) HiveUDAF should return NULL in case of 0 rows

2019-06-12 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-27907:
--
Priority: Blocker  (was: Major)

> HiveUDAF should return NULL in case of 0 rows
> -
>
> Key: SPARK-27907
> URL: https://issues.apache.org/jira/browse/SPARK-27907
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Ajith S
>Assignee: Ajith S
>Priority: Blocker
> Fix For: 2.3.4, 2.4.4, 3.0.0
>
>
> When a query returns zero rows, HiveUDAFFunction throws an NPE.
> CASE 1:
> create table abc(a int)
> select histogram_numeric(a,2) from abc // NPE
> Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most 
> recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost, executor 
> driver): java.lang.NullPointerException
>   at org.apache.spark.sql.hive.HiveUDAFFunction.eval(hiveUDFs.scala:471)
>   at org.apache.spark.sql.hive.HiveUDAFFunction.eval(hiveUDFs.scala:315)
>   at 
> org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.eval(interfaces.scala:543)
>   at 
> org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:231)
>   at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:97)
>   at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:132)
>   at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:107)
>   at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:839)
>   at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:839)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:122)
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> CASE 2:
> create table abc(a int)
> insert into abc values (1)
> select histogram_numeric(a,2) from abc where a=3 //NPE
> Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most 
> recent failure: Lost task 0.0 in stage 4.0 (TID 5, localhost, executor 
> driver): java.lang.NullPointerException
>   at 
> org.apache.spark.sql.hive.HiveUDAFFunction.serialize(hiveUDFs.scala:477)
>   at 
> org.apache.spark.sql.hive.HiveUDAFFunction.serialize(hiveUDFs.scala:315)
>   at 
> org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:570)
>   at 
> org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:254)
>   at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:97)
>   at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:132)
>   at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:107)
>   at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:839)
>   at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:839)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
>   at 

[jira] [Comment Edited] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862339#comment-16862339
 ] 

Stavros Kontopoulos edited comment on SPARK-28025 at 6/12/19 6:18 PM:
--

There is a workaround by setting `--conf 
spark.hadoop.spark.sql.streaming.checkpointFileManagerClass=org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager`
 when using local fs

and modifying the FileSystemBasedCheckpointFileManager to run 
`fs.setWriteChecksum(false)` after fs is created.

Reason is the FileContextBasedCheckpointFileManager will use ChecksumFS 
([https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java])
 under the hoods which will ignore "

CreateOpts.checksumParam(ChecksumOpt.createDisabled())" passed. These settings 
will only avoid creating checksums for the checksums themshelves.

FileSystemBasedCheckpointFileManager uses 
[https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]
 which allows to avoid checksums when creating files if you set that flag.

Note that the crc is created when the tmp file is created not during rename or 
mv.

I will create a PR shortly.


was (Author: skonto):
There is a workaround by setting `--conf 
spark.hadoop.spark.sql.streaming.checkpointFileManagerClass=org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager`
 when using local fs

and modifying the FileSystemBasedCheckpointFileManager to run 
`fs.setWriteChecksum(false)` after fs is created.

Reason is the FileContextBasedCheckpointFileManager will use ChecksumFS 
([https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java])
 under the hoods which will ignore "

CreateOpts.checksumParam(ChecksumOpt.createDisabled())" passed. These settings 
will only avoid creating checksums for the checksums themshelves.

FileSystemBasedCheckpointFileManager uses 
[https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]
 which allows to avoid checksums when creating files.

Note that the crc is created when the tmp file is created not during rename or 
mv.

I will create a PR shortly.

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862339#comment-16862339
 ] 

Stavros Kontopoulos edited comment on SPARK-28025 at 6/12/19 6:17 PM:
--

There is a workaround by setting `--conf 
spark.hadoop.spark.sql.streaming.checkpointFileManagerClass=org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager`
 when using local fs

and modifying the FileSystemBasedCheckpointFileManager to run 
`fs.setWriteChecksum(false)` after fs is created.

Reason is the FileContextBasedCheckpointFileManager will use ChecksumFS 
([https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java])
 under the hoods which will ignore "

CreateOpts.checksumParam(ChecksumOpt.createDisabled())" passed. These settings 
will only avoid creating checksums for the checksums themshelves.

FileSystemBasedCheckpointFileManager uses 
[https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]
 which allows to avoid checksums when creating files.

Note that the crc is created when the tmp file is created not during rename or 
mv.

I will create a PR shortly.


was (Author: skonto):
There is a workaround by setting `--conf 
spark.hadoop.spark.sql.streaming.checkpointFileManagerClass=org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager`
 when using local fs

and modifying the FileSystemBasedCheckpointFileManager to run 
`fs.setWriteChecksum(false)` after fs is created.

Reason is the FileContextBasedCheckpointFileManager will use ChecksumFS 
([https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java])
 under the hoods which will ignore "

CreateOpts.checksumParam(ChecksumOpt.createDisabled())" passed. These settings 
will only avoid creating checksums for the checksums themshelves.

FileSystemBasedCheckpointFileManager uses 
[https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]
 which allows to avoid checksums when creating files.

Note that the crc is created when the tmp file is created not during rename or 
mv.

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26045) Error in the spark 2.4 release package with the spark-avro_2.11 depdency

2019-06-12 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26045:
--
Priority: Blocker  (was: Major)

> Error in the spark 2.4 release package with the spark-avro_2.11 depdency
> 
>
> Key: SPARK-26045
> URL: https://issues.apache.org/jira/browse/SPARK-26045
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 2.4.0
> Environment: 4.15.0-38-generic #41-Ubuntu SMP Wed Oct 10 10:59:38 UTC 
> 2018 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Oscar garcía 
>Assignee: Sean Owen
>Priority: Blocker
> Fix For: 2.4.4, 3.0.0
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Hello, I have been having problems with the latest Spark 2.4 release: the 
> read-Avro-file feature does not seem to be working. I have fixed it locally by 
> building the source code and updating the *avro-1.8.2.jar* in the 
> *$SPARK_HOME*/jars/ dependencies.
> With the default Spark 2.4 release, when I try to read an Avro file Spark 
> raises the following exception:
> {code:java}
> spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.0
> scala> spark.read.format("avro").load("file.avro")
> java.lang.NoSuchMethodError: 
> org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;
> at 
> org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:51)
> at 
> org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105
> {code}
> Checksum:  spark-2.4.0-bin-without-hadoop.tgz: 7670E29B 59EAE7A8 5DBC9350 
> 085DD1E0 F056CA13 11365306 7A6A32E9 B607C68E A8DAA666 EF053350 008D0254 
> 318B70FB DE8A8B97 6586CA19 D65BA2B3 FD7F919E
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862339#comment-16862339
 ] 

Stavros Kontopoulos commented on SPARK-28025:
-

There is a workaround by setting `--conf 
spark.hadoop.spark.sql.streaming.checkpointFileManagerClass=org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager`
 when using local fs

and modifying the FileSystemBasedCheckpointFileManager to run 
`fs.setWriteChecksum(false)` after fs is created.

Reason is the FileContextBasedCheckpointFileManager will use ChecksumFS 
([https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java])
 under the hoods which will ignore "

CreateOpts.checksumParam(ChecksumOpt.createDisabled())" passed. These settings 
will only avoid creating checksums for the checksums themshelves.

FileSystemBasedCheckpointFileManager uses 
[https://github.com/apache/hadoop/blob/73746c5da76d5e39df131534a1ec35dfc5d2529b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]
 which allows to avoid checksums when creating files.

Note that the crc is created when the tmp file is created not during rename or 
mv.

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28006) User-defined grouped transform pandas_udf for window operations

2019-06-12 Thread Li Jin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Jin updated SPARK-28006:
---
Description: 
Currently, pandas_udf supports "grouped aggregate" type that can be used with 
unbounded and unbounded windows. There is another set of use cases that can 
benefit from a "grouped transform" type pandas_udf.

Grouped transform is defined as a N -> N mapping over a group. For example, 
"compute zscore for values in the group using the grouped mean and grouped 
stdev", or "rank the values in the group".

Currently, in order to do this, user needs to use "grouped apply", for example:
{code:java}
@pandas_udf(schema, GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf['v']
    pdf['v'] = v - v.mean()
    return pdf

df.groupby('id').apply(subtract_mean)
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+{code}
This approach has a few downside:
 * Specifying the full return schema is complicated for the user although the 
function only changes one column.
 * The column name 'v' inside as part of the udf, makes the udf less reusable.
 * The entire dataframe is serialized to pass to Python although only one 
column is needed.

Here we propose a new type of pandas_udf to work with these types of use cases:
{code:java}
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf('double', GROUPED_XFORM)
def subtract_mean(v):
    return v - v.mean() / v.std()

w = Window.partitionBy('id')

df = df.withColumn('v', subtract_mean(df['v']).over(w))
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+{code}
Which addresses the above downsides.
 * The user only needs to specify the output type of a single column.
 * The column being zscored is decoupled from the udf implementation
 * We only need to send one column to Python worker and concat the result with 
the original dataframe (this is what grouped aggregate is doing already)

 

 

  was:
Currently, pandas_udf supports "grouped aggregate" type that can be used with 
unbounded and unbounded windows. There is another set of use cases that can 
benefit from a "grouped transform" type pandas_udf.

Grouped transform is defined as a N -> N mapping over a group. For example, 
"compute zscore for values in the group using the grouped mean and grouped 
stdev", or "rank the values in the group".

 

Currently, in order to do this, user needs to use "grouped apply", for example:

 
{code:java}
@pandas_udf(schema, GROUPED_MAP)
def zscore(pdf)
v = pdf['v']
pdf['v'] = v - v.mean() / v.std()
return pdf

df.groupby('id').apply(zscore){code}
This approach has a few downside:
 * Specifying the full return schema is complicated for the user although the 
function only changes one column.
 * The column name 'v' inside as part of the udf, makes the udf less reusable.
 * The entire dataframe is serialized to pass to Python although only one 
column is needed.

Here we propose a new type of pandas_udf to work with these types of use cases:
{code:java}
@pandas_udf('double', GROUPED_XFORM)
def zscore(v):
return v - v.mean() / v.std()

w = Window.partitionBy('id')

df = df.withColumn('v_zscore', zscore(df['v']).over(w)){code}
Which addresses the above downsides.
 * The user only needs to specify the output type of a single column.
 * The column being zscored is decoupled from the udf implementation
 * We only need to send one column to Python worker and concat the result with 
the original dataframe (this is what grouped aggregate is doing already)

 

 


> User-defined grouped transform pandas_udf for window operations
> ---
>
> Key: SPARK-28006
> URL: https://issues.apache.org/jira/browse/SPARK-28006
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.4.3
>Reporter: Li Jin
>Priority: Major
>
> Currently, pandas_udf supports "grouped aggregate" type that can be used with 
> unbounded and unbounded windows. There is another set of use cases that can 
> benefit from a "grouped transform" type pandas_udf.
> Grouped transform is defined as a N -> N mapping over a group. For example, 
> "compute zscore for values in the group using the grouped mean and grouped 
> stdev", or "rank the values in the group".
> Currently, in order to do this, user needs to use "grouped apply", for 
> example:
> {code:java}
> @pandas_udf(schema, GROUPED_MAP)
> def subtract_mean(pdf):
>     v = pdf['v']
>     pdf['v'] = v - v.mean()
>     return pdf
> df.groupby('id').apply(subtract_mean)
> # +---+----+
> # | id|   v|
> # +---+----+
> # |  1|-0.5|
> # |  1| 0.5|
> # |  2|-3.0|
> # |  2|-1.0|
> # |  2| 4.0|
> # +---+----+{code}
> This approach has 

[jira] [Updated] (SPARK-28006) User-defined grouped transform pandas_udf for window operations

2019-06-12 Thread Li Jin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Jin updated SPARK-28006:
---
Description: 
Currently, pandas_udf supports "grouped aggregate" type that can be used with 
unbounded and unbounded windows. There is another set of use cases that can 
benefit from a "grouped transform" type pandas_udf.

Grouped transform is defined as a N -> N mapping over a group. For example, 
"compute zscore for values in the group using the grouped mean and grouped 
stdev", or "rank the values in the group".

 

Currently, in order to do this, user needs to use "grouped apply", for example:

 
{code:java}
@pandas_udf(schema, GROUPED_MAP)
def zscore(pdf):
    v = pdf['v']
    pdf['v'] = (v - v.mean()) / v.std()
    return pdf

df.groupby('id').apply(zscore){code}
This approach has a few downside:
 * Specifying the full return schema is complicated for the user although the 
function only changes one column.
 * The column name 'v' inside as part of the udf, makes the udf less reusable.
 * The entire dataframe is serialized to pass to Python although only one 
column is needed.

Here we propose a new type of pandas_udf to work with these types of use cases:
{code:java}
@pandas_udf('double', GROUPED_XFORM)
def zscore(v):
    return (v - v.mean()) / v.std()

w = Window.partitionBy('id')

df = df.withColumn('v_zscore', zscore(df['v']).over(w)){code}
Which addresses the above downsides.
 * The user only needs to specify the output type of a single column.
 * The column being zscored is decoupled from the udf implementation
 * We only need to send one column to Python worker and concat the result with 
the original dataframe (this is what grouped aggregate is doing already)

 

 

  was:
Currently, pandas_udf supports "grouped aggregate" type that can be used with 
unbounded and unbounded windows. There is another set of use cases that can 
benefit from a "grouped transform" type pandas_udf.

Grouped transform is defined as a N -> N mapping over a group. For example, 
"compute zscore for values in the group using the grouped mean and grouped 
stdev", or "rank the values in the group".

 

Currently, in order to do this, user needs to use "grouped apply", for example:

 
{code:java}
@pandas_udf(schema, GROUPED_MAP)
def zscore(pdf)
v = pdf['v']
pdf['v'] = v - v.mean() / v.std()
return pdf

df.groupby('id').apply(zscore){code}
This approach has a few downside:

 
 * Specifying the full return schema is complicated for the user although the 
function only changes one column.
 * The column name 'v' inside as part of the udf, makes the udf less reusable.
 * The entire dataframe is serialized to pass to Python although only one 
column is needed.

Here we propose a new type of pandas_udf to work with these types of use cases:
{code:java}
@pandas_udf('double', GROUPED_XFORM)
def zscore(v):
return v - v.mean() / v.std()

w = Window.partitionBy('id')

df = df.withColumn('v_zscore', zscore(df['v']).over(w)){code}
Which addresses the above downsides.
 * The user only needs to specify the output type of a single column.
 * The column being zscored is decoupled from the udf implementation
 * We only need to send one column to Python worker and concat the result with 
the original dataframe (this is what grouped aggregate is doing already)

 

 


> User-defined grouped transform pandas_udf for window operations
> ---
>
> Key: SPARK-28006
> URL: https://issues.apache.org/jira/browse/SPARK-28006
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.4.3
>Reporter: Li Jin
>Priority: Major
>
> Currently, pandas_udf supports "grouped aggregate" type that can be used with 
> unbounded and unbounded windows. There is another set of use cases that can 
> benefit from a "grouped transform" type pandas_udf.
> Grouped transform is defined as a N -> N mapping over a group. For example, 
> "compute zscore for values in the group using the grouped mean and grouped 
> stdev", or "rank the values in the group".
>  
> Currently, in order to do this, user needs to use "grouped apply", for 
> example:
>  
> {code:java}
> @pandas_udf(schema, GROUPED_MAP)
> def zscore(pdf):
>     v = pdf['v']
>     pdf['v'] = (v - v.mean()) / v.std()
>     return pdf
> df.groupby('id').apply(zscore){code}
> This approach has a few downsides:
>  * Specifying the full return schema is complicated for the user even though the 
> function only changes one column.
>  * Hard-coding the column name 'v' inside the udf makes the udf less reusable.
>  * The entire dataframe is serialized and passed to Python although only one 
> column is needed.
> Here we propose a new type of pandas_udf to work with these types of use 
> cases:
> {code:java}
> @pandas_udf('double', GROUPED_XFORM)
> def zscore(v):
>   

[jira] [Assigned] (SPARK-26949) Prevent "purge" to remove needed batch files in CompactibleFileStreamLog

2019-06-12 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-26949:
-

Assignee: Jungtaek Lim

> Prevent "purge" to remove needed batch files in CompactibleFileStreamLog
> 
>
> Key: SPARK-26949
> URL: https://issues.apache.org/jira/browse/SPARK-26949
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Minor
>
> I've seen a couple of trials (in open PRs; I've also tried it myself) which call 
> purge() in CompactibleFileStreamLog, but after looking at the codebase of 
> CompactibleFileStreamLog, I've realized that purging the latest compaction batch 
> would break the internals of CompactibleFileStreamLog and throw an 
> IllegalStateException.
> Given that CompactibleFileStreamLog maintains the batches and purges 
> according to its configuration, it would be safer to just rely on 
> CompactibleFileStreamLog to purge and prevent calling `purge` outside of 
> CompactibleFileStreamLog.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26949) Prevent "purge" to remove needed batch files in CompactibleFileStreamLog

2019-06-12 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26949:
--
Target Version/s: 3.0.0

> Prevent "purge" to remove needed batch files in CompactibleFileStreamLog
> 
>
> Key: SPARK-26949
> URL: https://issues.apache.org/jira/browse/SPARK-26949
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Minor
>
> I've seen a couple of trials (in open PRs; I've also tried it myself) which call 
> purge() in CompactibleFileStreamLog, but after looking at the codebase of 
> CompactibleFileStreamLog, I've realized that purging the latest compaction batch 
> would break the internals of CompactibleFileStreamLog and throw an 
> IllegalStateException.
> Given that CompactibleFileStreamLog maintains the batches and purges 
> according to its configuration, it would be safer to just rely on 
> CompactibleFileStreamLog to purge and prevent calling `purge` outside of 
> CompactibleFileStreamLog.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs

2019-06-12 Thread Li Jin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862287#comment-16862287
 ] 

Li Jin commented on SPARK-27463:


I think one way to design this API is to mimic the existing Dataset cogroup API:
{code:java}
gdf1 = df1.groupByKey('id')
gdf2 = df2.groupByKey('id')

result = gdf1.cogroup(gdf2).apply(my_pandas_udf){code}
However, KeyValueGroupedData and groupByKey aren't really exposed to PySpark 
(or maybe they don't apply to PySpark because of typing?), so another way to go 
about this is to use RelationalGroupedData:
{code:java}
gdf1 = df1.groupBy('id')
gdf2 = df2.groupBy('id')

result = gdf1.cogroup(gdf2).apply(my_pandas_udf){code}
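For context, a rough sketch of what the {{my_pandas_udf}} above might look like under this proposal; the cogroup API itself does not exist yet, and the merge-based body below is purely hypothetical:
{code:java}
import pandas as pd

# Hypothetical: under the proposed API, the udf would receive the two cogrouped
# pandas DataFrames that share the same 'id' key and return a single DataFrame.
def my_pandas_udf(pdf1, pdf2):
    return pd.merge(pdf1, pdf2, on="id", how="inner")
{code}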

> Support Dataframe Cogroup via Pandas UDFs 
> --
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Chris Martin
>Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the google document linked below.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27823) Add an abstraction layer for accelerator resource handling to avoid manipulating raw confs

2019-06-12 Thread Xiangrui Meng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng reassigned SPARK-27823:
-

Assignee: Xiangrui Meng  (was: Thomas Graves)

> Add an abstraction layer for accelerator resource handling to avoid 
> manipulating raw confs
> --
>
> Key: SPARK-27823
> URL: https://issues.apache.org/jira/browse/SPARK-27823
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
>
> In SPARK-27488, we extract resource requests and allocation by parsing raw 
> Spark confs. This hurts readability because we don't have an abstraction at the 
> resource level. After we merge the core changes, we should do a refactoring 
> and make the code more readable.
> See https://github.com/apache/spark/pull/24615#issuecomment-494580663.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-28018) Allow upcasting decimal to double/float

2019-06-12 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-28018.
---
Resolution: Invalid

This was closed by the author due to the loss of precision. Please see the PR 
comments for details.

> Allow upcasting decimal to double/float
> ---
>
> Key: SPARK-28018
> URL: https://issues.apache.org/jira/browse/SPARK-28018
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Priority: Major
>
> Currently, Spark only allows upcasting DecimalType to IntegralType or 
> DecimalType.
> This PR proposes: if the target DecimalType is tighter than DoubleType or 
> FloatType, we can upcast it as well.
> The upcasting matters because it blocks 
> https://github.com/apache/spark/pull/24806. E.g., if there is a table "t" with 
> only one column of double type, 
> ```
> INSERT INTO TABLE t SELECT 10.0
> ```
> The data type of the value 10.0 is DecimalType(3, 1). In the current code, the 
> upcast fails.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-28030) Binary file data source doesn't support space in file names

2019-06-12 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-28030:


Assignee: Apache Spark  (was: Xiangrui Meng)

> Binary file data source doesn't support space in file names
> ---
>
> Key: SPARK-28030
> URL: https://issues.apache.org/jira/browse/SPARK-28030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Apache Spark
>Priority: Major
>
> {code}
> echo 123 > "/tmp/test space.txt"
> spark.read.format("binaryFile").load("/tmp/test space.txt").count()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-28030) Binary file data source doesn't support space in file names

2019-06-12 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-28030:


Assignee: Xiangrui Meng  (was: Apache Spark)

> Binary file data source doesn't support space in file names
> ---
>
> Key: SPARK-28030
> URL: https://issues.apache.org/jira/browse/SPARK-28030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
>
> {code}
> echo 123 > "/tmp/test space.txt"
> spark.read.format("binaryFile").load("/tmp/test space.txt").count()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-28031) Improve or remove doctest on over function of Column

2019-06-12 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-28031:


Assignee: Apache Spark

> Improve or remove doctest on over function of Column
> 
>
> Key: SPARK-28031
> URL: https://issues.apache.org/jira/browse/SPARK-28031
> Project: Spark
>  Issue Type: Test
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Liang-Chi Hsieh
>Assignee: Apache Spark
>Priority: Trivial
>
> Just found that the doctest on the {{over}} function of {{Column}} is commented 
> out. The window spec there is also incorrect for the window function used.
> We should either remove the doctest, or improve it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-28031) Improve or remove doctest on over function of Column

2019-06-12 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-28031:


Assignee: (was: Apache Spark)

> Improve or remove doctest on over function of Column
> 
>
> Key: SPARK-28031
> URL: https://issues.apache.org/jira/browse/SPARK-28031
> Project: Spark
>  Issue Type: Test
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Liang-Chi Hsieh
>Priority: Trivial
>
> Just found that the doctest on the {{over}} function of {{Column}} is commented 
> out. The window spec there is also incorrect for the window function used.
> We should either remove the doctest, or improve it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-28031) Improve or remove doctest on over function of Column

2019-06-12 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-28031:
---

 Summary: Improve or remove doctest on over function of Column
 Key: SPARK-28031
 URL: https://issues.apache.org/jira/browse/SPARK-28031
 Project: Spark
  Issue Type: Test
  Components: PySpark, SQL
Affects Versions: 3.0.0
Reporter: Liang-Chi Hsieh


Just found that the doctest on the {{over}} function of {{Column}} is commented 
out. The window spec there is also incorrect for the window function used.

We should either remove the doctest, or improve it.
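If the doctest is kept, a minimal, self-contained window spec along these lines would be valid (the data and column names below are illustrative, not the actual doctest content):
{code:java}
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("a", 20), ("a", 30), ("b", 40)], ["name", "age"])

# A window spec that matches the window function used with it:
# rank rows by age within each name partition.
w = Window.partitionBy("name").orderBy("age")
df.select("name", "age", F.rank().over(w).alias("rank")).show()
{code}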



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28030) Binary file data source doesn't support space in file names

2019-06-12 Thread Xiangrui Meng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng updated SPARK-28030:
--
Description: 
{code}
echo 123 > "/tmp/test space.txt"


spark.read.format("binaryFile").load("/tmp/test space.txt").count()
{code}

> Binary file data source doesn't support space in file names
> ---
>
> Key: SPARK-28030
> URL: https://issues.apache.org/jira/browse/SPARK-28030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
>
> {code}
> echo 123 > "/tmp/test space.txt"
> spark.read.format("binaryFile").load("/tmp/test space.txt").count()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-28030) Binary file data source doesn't support space in file names

2019-06-12 Thread Xiangrui Meng (JIRA)
Xiangrui Meng created SPARK-28030:
-

 Summary: Binary file data source doesn't support space in file 
names
 Key: SPARK-28030
 URL: https://issues.apache.org/jira/browse/SPARK-28030
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Xiangrui Meng
Assignee: Xiangrui Meng






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862227#comment-16862227
 ] 

Steve Loughran commented on SPARK-28025:


Looking at the previous patch, you don't need to call exists() before the 
delete, as delete is required to be a no-op if the source isn't there. That saves 
the cost of a HEAD request if you are using an object store as the destination.

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs

2019-06-12 Thread Li Jin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862225#comment-16862225
 ] 

Li Jin commented on SPARK-27463:


For cogroup, I don't think there is analogous API in pandas. There is analogous 
Spark Scala API in KeyValueGroupedDataset:
{code:java}
val gdf1 = df1.groupByKey('id')
val gdf2 = df2.groupByKey('id')

val result = gdf1.cogroup(gdf2)(my_scala_udf){code}
 

> Support Dataframe Cogroup via Pandas UDFs 
> --
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Chris Martin
>Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the google document linked below.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-28013) Upgrade to Kafka 2.2.1

2019-06-12 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-28013.
---
   Resolution: Fixed
 Assignee: Dongjoon Hyun
Fix Version/s: 3.0.0

This is resolved via https://github.com/apache/spark/pull/24847

> Upgrade to Kafka 2.2.1
> --
>
> Key: SPARK-28013
> URL: https://issues.apache.org/jira/browse/SPARK-28013
> Project: Spark
>  Issue Type: Improvement
>  Components: Build, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.0.0
>
>
> This issue updates the Kafka dependency to 2.2.1 to bring in the following 
> improvements and bug fixes.
> https://issues.apache.org/jira/projects/KAFKA/versions/12345010



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-28029) Add int2.sql

2019-06-12 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-28029:


Assignee: (was: Apache Spark)

> Add int2.sql
> 
>
> Key: SPARK-28029
> URL: https://issues.apache.org/jira/browse/SPARK-28029
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> In this ticket, we plan to add the regression test cases of 
> [https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/int2.sql].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-28029) Add int2.sql

2019-06-12 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-28029:


Assignee: Apache Spark

> Add int2.sql
> 
>
> Key: SPARK-28029
> URL: https://issues.apache.org/jira/browse/SPARK-28029
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Assignee: Apache Spark
>Priority: Major
>
> In this ticket, we plan to add the regression test cases of 
> [https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/int2.sql].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-28029) Add int2.sql

2019-06-12 Thread Yuming Wang (JIRA)
Yuming Wang created SPARK-28029:
---

 Summary: Add int2.sql
 Key: SPARK-28029
 URL: https://issues.apache.org/jira/browse/SPARK-28029
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.0.0
Reporter: Yuming Wang


In this ticket, we plan to add the regression test cases of 
[https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/int2.sql].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28024) Incorrect numeric values when out of range

2019-06-12 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-28024:

Description: 
For example:
{code:sql}
select tinyint(128) * tinyint(2); -- 0
select smallint(2147483647) * smallint(2); -- -2
select int(2147483647) * int(2); -- -2
SELECT smallint((-32768)) * smallint(-1); -- -32768
{code}
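For illustration, the results above are consistent with plain two's-complement wraparound at the target width; a small sketch in ordinary Python (the 8-bit helper below is purely illustrative, not Spark code):
{code:java}
def wrap_int8(x):
    """Interpret x modulo 256 as a signed 8-bit (tinyint) value."""
    x &= 0xFF
    return x - 256 if x >= 128 else x

# tinyint(128) wraps to -128, and -128 * 2 = -256 wraps back to 0,
# which matches the result of `select tinyint(128) * tinyint(2)` above.
print(wrap_int8(wrap_int8(128) * 2))  # 0
{code}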

  was:
For example:
{code:sql}
select tinyint(128) * tinyint(2); -- 0
select smallint(2147483647) * smallint(2); -- -2
select int(2147483647) * int(2); -- -2
SELECT smallint((-32768)) * smallint(-1);
{code}


> Incorrect numeric values when out of range
> --
>
> Key: SPARK-28024
> URL: https://issues.apache.org/jira/browse/SPARK-28024
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> For example:
> {code:sql}
> select tinyint(128) * tinyint(2); -- 0
> select smallint(2147483647) * smallint(2); -- -2
> select int(2147483647) * int(2); -- -2
> SELECT smallint((-32768)) * smallint(-1); -- -32768
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28024) Incorrect numeric values when out of range

2019-06-12 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-28024:

Description: 
For example:
{code:sql}
select tinyint(128) * tinyint(2); -- 0
select smallint(2147483647) * smallint(2); -- -2
select int(2147483647) * int(2); -- -2
SELECT smallint((-32768)) * smallint(-1);
{code}

  was:
For example:
{code:sql}
select tinyint(128) * tinyint(2); -- 0
select smallint(2147483647) * smallint(2); -- -2
select int(2147483647) * int(2); -- -2
{code}



> Incorrect numeric values when out of range
> --
>
> Key: SPARK-28024
> URL: https://issues.apache.org/jira/browse/SPARK-28024
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> For example:
> {code:sql}
> select tinyint(128) * tinyint(2); -- 0
> select smallint(2147483647) * smallint(2); -- -2
> select int(2147483647) * int(2); -- -2
> SELECT smallint((-32768)) * smallint(-1);
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28009) PipedRDD: Block not locked for reading failure

2019-06-12 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862180#comment-16862180
 ] 

Liang-Chi Hsieh commented on SPARK-28009:
-

I think this looks like duplicate to SPARK-27666.

> PipedRDD: Block not locked for reading failure
> --
>
> Key: SPARK-28009
> URL: https://issues.apache.org/jira/browse/SPARK-28009
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
> Environment: Running in a Docker container with Spark 2.4.0 on Linux 
> kernel 4.9.0
>Reporter: Douglas Colkitt
>Priority: Major
>
> PipedRDD operation fails with the below stack trace. Failure primarily occurs 
> when the STDOUT from the Unix process is small and the STDIN into the Unix 
> process is comparatively much larger.
>  
> Given the similarity to SPARK-18406, this seems to be due to a race condition 
> when it comes to accessing the block's reader lock. The PipedRDD class 
> implementation spawns the STDIN iterator in a separate thread, so that would 
> corroborate the race condition hypothesis.
>  
> at scala.Predef$.assert(Predef.scala:170)
> at 
> org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
> at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:842)
> at 
> org.apache.spark.storage.BlockManager.releaseLockAndDispose(BlockManager.scala:1610)
> at 
> org.apache.spark.storage.BlockManager$$anonfun$2.apply$mcV$sp(BlockManager.scala:621)
> at 
> org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at org.apache.spark.rdd.PipedRDD$$anon$3.run(PipedRDD.scala:145)
> Suppressed: java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:156)
> at 
> org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
> at 
> org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
> at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:363)
> at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
> at scala.Option.foreach(Option.scala:257)
> at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:362)
> at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:358)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at 
> org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:358)
> at 
> org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:858)
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$1.apply$mcV$sp(Executor.scala:409)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1369)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-28028) Cast numeric to integral type need round

2019-06-12 Thread Yuming Wang (JIRA)
Yuming Wang created SPARK-28028:
---

 Summary: Cast numeric to integral type need round
 Key: SPARK-28028
 URL: https://issues.apache.org/jira/browse/SPARK-28028
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.0.0
Reporter: Yuming Wang


For example:
{code:sql}
select cast(-1.5 as smallint);
{code}
Spark SQL returns {{-1}}, but PostgreSQL returns {{-2}}.
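For illustration, a PySpark sketch of the difference, with an explicit ROUND as a possible workaround; the cast result matches what is reported above, and exact behaviour may depend on the Spark version:
{code:java}
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Spark truncates toward zero when casting a fractional value to an integral
# type, hence -1 here, while PostgreSQL rounds and returns -2.
spark.sql("SELECT CAST(-1.5 AS SMALLINT)").show()

# An explicit ROUND applied before the cast gives the PostgreSQL-like result (-2).
spark.sql("SELECT CAST(ROUND(-1.5) AS SMALLINT)").show()
{code}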



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27750) Standalone scheduler - ability to prioritize applications over drivers, many drivers act like Denial of Service

2019-06-12 Thread t oo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

t oo updated SPARK-27750:
-
Description: 
If I submit 1000 spark submit drivers then they consume all the cores on my 
cluster (essentially it acts like a Denial of Service) and no spark 
'application' gets to run since the cores are all consumed by the 'drivers'. 
This feature is about having the ability to prioritize applications over 
drivers so that at least some 'applications' can start running. I guess it 
would be like: If (driver.state = 'submitted' and (exists some app.state = 
'submitted')) then set app.state = 'running'

if all apps have app.state = 'running' then set driver.state = 'submitted' 

 

Secondary to this, why must a driver consume a minimum of 1 entire core?

  was:
If I submit 1000 spark submit drivers then they consume all the cores on my 
cluster (essentially it acts like a Denial of Service) and no spark 
'application' gets to run since the cores are all consumed by the 'drivers'. 
This feature is about having the ability to prioritize applications over 
drivers so that at least some 'applications' can start running. I guess it 
would be like: If (driver.state = 'submitted' and (exists some app.state = 
'submitted')) then set app.state = 'running'

if all apps have app.state = 'running' then set driver.state = 'submitted' 


> Standalone scheduler - ability to prioritize applications over drivers, many 
> drivers act like Denial of Service
> ---
>
> Key: SPARK-27750
> URL: https://issues.apache.org/jira/browse/SPARK-27750
> Project: Spark
>  Issue Type: New Feature
>  Components: Scheduler
>Affects Versions: 2.3.3, 2.4.3
>Reporter: t oo
>Priority: Minor
>
> If I submit 1000 spark-submit drivers then they consume all the cores on my 
> cluster (essentially it acts like a Denial of Service) and no spark 
> 'application' gets to run since the cores are all consumed by the 'drivers'. 
> This feature is about having the ability to prioritize applications over 
> drivers so that at least some 'applications' can start running. I guess it 
> would be like: If (driver.state = 'submitted' and (exists some app.state = 
> 'submitted')) then set app.state = 'running'
> if all apps have app.state = 'running' then set driver.state = 'submitted' 
>  
> Secondary to this, why must a driver consume a minimum of 1 entire core?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-28027) Add Bitwise shift left/right

2019-06-12 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-28027:


Assignee: (was: Apache Spark)

> Add Bitwise shift left/right
> 
>
> Key: SPARK-28027
> URL: https://issues.apache.org/jira/browse/SPARK-28027
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> ||Operator||Description||Example||Result||
> |{{<<}}|bitwise shift left|{{1 << 4}}|{{16}}|
> |{{>>}}|bitwise shift right|{{8 >> 2}}|{{2}}|
> https://www.postgresql.org/docs/11/functions-math.html
> https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/LanguageElements/Operators/BitwiseOperators.htm
> https://docs.aws.amazon.com/redshift/latest/dg/r_OPERATOR_SYMBOLS.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-28027) Add Bitwise shift left/right

2019-06-12 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-28027:


Assignee: Apache Spark

> Add Bitwise shift left/right
> 
>
> Key: SPARK-28027
> URL: https://issues.apache.org/jira/browse/SPARK-28027
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Assignee: Apache Spark
>Priority: Major
>
> ||Operator||Description||Example||Result||
> |{{<<}}|bitwise shift left|{{1 << 4}}|{{16}}|
> |{{>>}}|bitwise shift right|{{8 >> 2}}|{{2}}|
> https://www.postgresql.org/docs/11/functions-math.html
> https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/LanguageElements/Operators/BitwiseOperators.htm
> https://docs.aws.amazon.com/redshift/latest/dg/r_OPERATOR_SYMBOLS.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28007) Caret operator (^) means bitwise XOR in Spark/Hive and exponentiation in Postgres/Redshift

2019-06-12 Thread Josh Rosen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-28007:
---
Description: 
The expression {{expr1 ^ expr2}} has different meanings in Spark and Postgres:
 * [In Postgres|https://www.postgresql.org/docs/11/functions-math.html] and 
[Redshift|https://docs.aws.amazon.com/redshift/latest/dg/r_OPERATOR_SYMBOLS.html]
 , this returns {{expr1}} raised to the exponent {{expr2}} (additionally, the 
Postgres docs explicitly state that this operation is left-associative).
 * [In Spark|https://spark.apache.org/docs/2.4.3/api/sql/index.html#_14] and 
[Hive|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-ArithmeticOperators],
 this returns the bitwise exclusive OR of {{expr1}} and {{expr2}}.

I'm reporting this under the Postgres compatibility umbrella. If we have SQL 
dialect support (e.g. a Postgres compatibility dialect), maybe this behavior 
could be flagged there? Alternatively, maybe we could document this in a 
compatibility / porting guide?
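For illustration, a small PySpark sketch of the two behaviours; {{pow}} is spelled out here as the exponentiation counterpart:
{code:java}
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# In Spark SQL (and Hive), ^ is bitwise XOR, so 2 ^ 3 evaluates to 1 ...
spark.sql("SELECT 2 ^ 3 AS xor_result").show()

# ... while exponentiation has to be written explicitly; in Postgres/Redshift
# the same 2 ^ 3 expression would instead evaluate to 8.
spark.sql("SELECT pow(2, 3) AS pow_result").show()
{code}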

  was:
The expression {{expr1 ^ expr2}} has different meanings in Spark and Postgres:
 * [In Postgres|https://www.postgresql.org/docs/11/functions-math.html] and 
[Redshift|https://docs.aws.amazon.com/redshift/latest/dg/r_OPERATOR_SYMBOLS.html]
 , this returns {{expr1}} raised to the exponent {{expr2}} (additionally, the 
Postgres docs explicitly state that this operation is left-associative).
 * [In Spark|https://spark.apache.org/docs/2.4.3/api/sql/index.html#_14] and 
[Hive|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-ArithmeticOperators],
 this returns the bitwise exclusive OR of {{expr1}} and {{expr2}}.

I'm reporting this under the Postgres compatibility umbrella. If we have SQL 
dialect support (e.g. a Postgres compatibility dialect), maybe this behavior 
could be flagged there?


> Caret operator (^) means bitwise XOR in Spark/Hive and exponentiation in 
> Postgres/Redshift
> --
>
> Key: SPARK-28007
> URL: https://issues.apache.org/jira/browse/SPARK-28007
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Josh Rosen
>Priority: Major
>
> The expression {{expr1 ^ expr2}} has different meanings in Spark and Postgres:
>  * [In Postgres|https://www.postgresql.org/docs/11/functions-math.html] and 
> [Redshift|https://docs.aws.amazon.com/redshift/latest/dg/r_OPERATOR_SYMBOLS.html]
>  , this returns {{expr1}} raised to the exponent {{expr2}} (additionally, the 
> Postgres docs explicitly state that this operation is left-associative).
>  * [In Spark|https://spark.apache.org/docs/2.4.3/api/sql/index.html#_14] and 
> [Hive|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-ArithmeticOperators],
>  this returns the bitwise exclusive OR of {{expr1}} and {{expr2}}.
> I'm reporting this under the Postgres compatibility umbrella. If we have SQL 
> dialect support (e.g. a Postgres compatibility dialect), maybe this behavior 
> could be flagged there? Alternatively, maybe we could document this in a 
> compatibility / porting guide?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-28027) Add Bitwise shift left/right

2019-06-12 Thread Yuming Wang (JIRA)
Yuming Wang created SPARK-28027:
---

 Summary: Add Bitwise shift left/right
 Key: SPARK-28027
 URL: https://issues.apache.org/jira/browse/SPARK-28027
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.0.0
Reporter: Yuming Wang


||Operator||Description||Example||Result||
|{{<<}}|bitwise shift left|{{1 << 4}}|{{16}}|
|{{>>}}|bitwise shift right|{{8 >> 2}}|{{2}}|

https://www.postgresql.org/docs/11/functions-math.html
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/LanguageElements/Operators/BitwiseOperators.htm
https://docs.aws.amazon.com/redshift/latest/dg/r_OPERATOR_SYMBOLS.html
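As a point of comparison, Spark currently exposes shifts as functions rather than operators; a minimal PySpark sketch using the existing {{shiftLeft}}/{{shiftRight}} functions:
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Existing function-style equivalents of the proposed << and >> operators.
spark.range(1).select(
    F.shiftLeft(F.lit(1), 4).alias("one_shl_4"),     # 16, i.e. 1 << 4
    F.shiftRight(F.lit(8), 2).alias("eight_shr_2"),  # 2,  i.e. 8 >> 2
).show()
{code}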




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27309) CypherSession implementation in spark-cypher

2019-06-12 Thread Martin Junghanns (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Junghanns updated SPARK-27309:
-
Description: Implement CypherSession and RelationalCypherSession in 
spark-cypher module.  (was: Create SparkSession adapter to simplify PropertyGraph 
creation (Scala implicits? pending design discussion))

> CypherSession implementation in spark-cypher
> 
>
> Key: SPARK-27309
> URL: https://issues.apache.org/jira/browse/SPARK-27309
> Project: Spark
>  Issue Type: Story
>  Components: Graph
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Priority: Major
>
> Implement CypherSession and RelationalCypherSession in spark-cypher module.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27309) CypherSession implementation in spark-cypher

2019-06-12 Thread Martin Junghanns (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Junghanns updated SPARK-27309:
-
Summary: CypherSession implementation in spark-cypher  (was: PropertyGraph 
SparkSession adapter (Scala/Java))

> CypherSession implementation in spark-cypher
> 
>
> Key: SPARK-27309
> URL: https://issues.apache.org/jira/browse/SPARK-27309
> Project: Spark
>  Issue Type: Story
>  Components: Graph
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Priority: Major
>
> Create SparkSession adapter to simplify PropertyGraph creation (Scala implicits? 
> pending design discussion)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27307) PropertyGraph save/load API (Python)

2019-06-12 Thread Martin Junghanns (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Junghanns updated SPARK-27307:
-
Summary: PropertyGraph save/load API (Python)  (was: PropertyGraph 
save/load (Python))

> PropertyGraph save/load API (Python)
> 
>
> Key: SPARK-27307
> URL: https://issues.apache.org/jira/browse/SPARK-27307
> Project: Spark
>  Issue Type: Story
>  Components: Graph, PySpark
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Priority: Major
>
> Python API for SPARK-27304.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27308) PropertyGraph Cypher query API (Python)

2019-06-12 Thread Martin Junghanns (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Junghanns updated SPARK-27308:
-
Summary: PropertyGraph Cypher query API (Python)  (was: PropertyGraph 
Cypher query (Python))

> PropertyGraph Cypher query API (Python)
> ---
>
> Key: SPARK-27308
> URL: https://issues.apache.org/jira/browse/SPARK-27308
> Project: Spark
>  Issue Type: Story
>  Components: Graph
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Priority: Major
>
> Python API for SPARK-27305.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27305) PropertyGraph Cypher query API (Java/Scala)

2019-06-12 Thread Martin Junghanns (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Junghanns updated SPARK-27305:
-
Summary: PropertyGraph Cypher query API (Java/Scala)  (was: PropertyGraph 
Cypher query (Java/Scala))

> PropertyGraph Cypher query API (Java/Scala)
> ---
>
> Key: SPARK-27305
> URL: https://issues.apache.org/jira/browse/SPARK-27305
> Project: Spark
>  Issue Type: Story
>  Components: Graph
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Priority: Major
>
> As a user, I can query a PropertyGraph using the Cypher language. I learned the 
> syntax from the Cypher V9 language reference 
> (https://s3.amazonaws.com/artifacts.opencypher.org/openCypher9.pdf) and in 
> the API doc I can see what features are (not) supported.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27304) PropertyGraph save/load API (Scala/Java)

2019-06-12 Thread Martin Junghanns (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Junghanns updated SPARK-27304:
-
Summary: PropertyGraph save/load API (Scala/Java)  (was: PropertyGraph 
save/load (Scala/Java))

> PropertyGraph save/load API (Scala/Java)
> 
>
> Key: SPARK-27304
> URL: https://issues.apache.org/jira/browse/SPARK-27304
> Project: Spark
>  Issue Type: Story
>  Components: Graph
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Priority: Major
>
> As a user, I can save a PropertyGraph after construction to persistent 
> storage. Later I can load it back using PropertyGraph APIs with all future 
> Spark versions.
> Required:
> * Save/load.
> * Scala/Java test suite.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27306) PropertyGraph construction API (Python)

2019-06-12 Thread Martin Junghanns (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Junghanns updated SPARK-27306:
-
Summary: PropertyGraph construction API (Python)  (was: PropertyGraph 
construction (Python))

> PropertyGraph construction API (Python)
> ---
>
> Key: SPARK-27306
> URL: https://issues.apache.org/jira/browse/SPARK-27306
> Project: Spark
>  Issue Type: Story
>  Components: Graph
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Priority: Major
>
> Python API and test for SPARK-27303.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27303) PropertyGraph construction API (Scala/Java)

2019-06-12 Thread Martin Junghanns (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Junghanns updated SPARK-27303:
-
Summary: PropertyGraph construction API (Scala/Java)  (was: PropertyGraph 
construction (Scala/Java))

> PropertyGraph construction API (Scala/Java)
> ---
>
> Key: SPARK-27303
> URL: https://issues.apache.org/jira/browse/SPARK-27303
> Project: Spark
>  Issue Type: Story
>  Components: Graph
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Priority: Major
>
> As a user, I can construct a PropertyGraph and view its nodes and 
> relationships as DataFrames.
> Required:
> * Scala API to construct a PropertyGraph.
> * Scala API to view nodes and relationships as DataFrames.
> * Scala/Java test suites.
> Out of scope:
> * Cypher queries.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862065#comment-16862065
 ] 

Jungtaek Lim commented on SPARK-28025:
--

[~gmaas]

Nice finding. Would you like to submit a PR for this? Thanks!

(If you want to defer to someone else, please ping me so that I can take 
this up.)

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27303) PropertyGraph construction (Scala/Java)

2019-06-12 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27303:


Assignee: (was: Apache Spark)

> PropertyGraph construction (Scala/Java)
> ---
>
> Key: SPARK-27303
> URL: https://issues.apache.org/jira/browse/SPARK-27303
> Project: Spark
>  Issue Type: Story
>  Components: Graph
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Priority: Major
>
> As a user, I can construct a PropertyGraph and view its nodes and 
> relationships as DataFrames.
> Required:
> * Scala API to construct a PropertyGraph.
> * Scala API to view nodes and relationships as DataFrames.
> * Scala/Java test suites.
> Out of scope:
> * Cypher queries.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27303) PropertyGraph construction (Scala/Java)

2019-06-12 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27303:


Assignee: Apache Spark

> PropertyGraph construction (Scala/Java)
> ---
>
> Key: SPARK-27303
> URL: https://issues.apache.org/jira/browse/SPARK-27303
> Project: Spark
>  Issue Type: Story
>  Components: Graph
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Apache Spark
>Priority: Major
>
> As a user, I can construct a PropertyGraph and view its nodes and 
> relationships as DataFrames.
> Required:
> * Scala API to construct a PropertyGraph.
> * Scala API to view nodes and relationships as DataFrames.
> * Scala/Java test suites.
> Out of scope:
> * Cypher queries.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-28026) How to get the second row from 1 minute window

2019-06-12 Thread Devendra Vishwakarma (JIRA)
Devendra Vishwakarma created SPARK-28026:


 Summary: How to get the second row from 1 minute window
 Key: SPARK-28026
 URL: https://issues.apache.org/jira/browse/SPARK-28026
 Project: Spark
  Issue Type: Question
  Components: Examples, Structured Streaming
Affects Versions: 2.4.0
Reporter: Devendra Vishwakarma


I have been almost blocked for a month while figuring out the API to achieve one 
piece of functionality related to Spark Structured Streaming with window 
grouping, so I thought I would reach out to you here.

What I have is stock-related time series data, grouped in a 1-minute window 
along with the stock name. I am able to get the first and last rows in that 
1-minute group, but I also want some values from the second row of that 1-minute 
window, which I am not able to do at all. I looked at each function related to 
aggregation but could not find anything suitable. Please help me.

This is what I have done so far:
{code:java}
val aggregates = stockEvents
  .withWatermark("timestamp", "5 seconds")
  .groupBy(window($"timestamp", "1 minute", "1 minute", "0 seconds"), $"stockName")
  .agg(
    first("tradingprice").alias("open"), // I need this value to come from the second row
    last("tradingprice").alias("close"),
    max("tradingprice").alias("high"),
    min("tradingprice").alias("low"))
{code}

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-28011) SQL parse error when there are too many aliases in the table

2019-06-12 Thread U Shaw (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

U Shaw resolved SPARK-28011.

Resolution: Invalid

> SQL parse error when there are too many aliases in the table
> 
>
> Key: SPARK-28011
> URL: https://issues.apache.org/jira/browse/SPARK-28011
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: U Shaw
>Priority: Major
>
> A SQL syntax error is reported when the following statement is executed.
> ..
> FROM
>   menu_item_categories_tmp t1
>   LEFT JOIN menu_item_categories_tmp t2 ON t1.icat_id = 
> t2.icat_parent_icat_id 
>   AND t1.tenant_id = t2.tenant_id 
>   AND t2.icat_status != 'd'
>   LEFT JOIN menu_item_categories_tmp t3 ON t2.icat_id = 
> t3.icat_parent_icat_id 
>   AND t2.tenant_id = t3.tenant_id 
>   AND t3.icat_status != 'd'
>   LEFT JOIN menu_item_categories_tmp t4 ON t3.icat_id = 
> t4.icat_parent_icat_id 
>   AND t3.tenant_id = t4.tenant_id 
>   AND t4.icat_status != 'd'
>   LEFT JOIN menu_item_categories_tmp t5 ON t4.icat_id = 
> t5.icat_parent_icat_id 
>   AND t4.tenant_id = t5.tenant_id 
>   AND t5.icat_status != 'd'
>   LEFT JOIN menu_item_categories_tmp t6 ON t5.icat_id = 
> t6.icat_parent_icat_id 
>   AND t5.tenant_id = t6.tenant_id 
>   AND t6.icat_status != 'd' 
> WHERE
>   t1.icat_parent_icat_id = '0' 
>   AND t1.icat_status != 'd' 
>   ) SELECT DISTINCT
>   tenant_id AS tenant_id,
>   type AS type,
> CASE
>   
>   WHEN t2.num >= 1 THEN
>   level0 ELSE NULL 
>   END AS level0,
> CASE
>   
>   WHEN t2.num >= 2 THEN
>   level1 ELSE NULL 
>   END AS level1,
> CASE
>   
>   WHEN t2.num >= 3 THEN
>   level2 ELSE NULL 
>   END AS level2,
> CASE
> ..
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs

2019-06-12 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861999#comment-16861999
 ] 

Hyukjin Kwon commented on SPARK-27463:
--

It's easier and safer to find a reference to justify a new API and avoid 
implementing an API from scratch.
I think our Pandas UDF APIs usually mimic Pandas' or borrow some idea from 
there (e.g., groupby().apply(...)), and then make it distinct within PySpark.
There are some other examples that work just like other PySpark (or Scala-side 
Spark) APIs too (e.g., window Pandas UDFs).


> Support Dataframe Cogroup via Pandas UDFs 
> --
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Chris Martin
>Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the google document linked below.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs

2019-06-12 Thread Chris Martin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861995#comment-16861995
 ] 

Chris Martin commented on SPARK-27463:
--

Also my assumption is that the most difficult part of this is extending the udf 
functionality such that multiple DataFrames can be passed as arguments to a 
given udf.  I have a fairly rough design proposal for how this might be 
achieved.  Once this has been refined slightly I'll post it up so that people 
can comment. 

> Support Dataframe Cogroup via Pandas UDFs 
> --
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Chris Martin
>Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the google document linked below.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs

2019-06-12 Thread Chris Martin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861993#comment-16861993
 ] 

Chris Martin commented on SPARK-27463:
--

Hi [~hyukjin.kwon]- I've just started working on the code side of this (as an 
aside I seem unable to assign this Jira to me- do you know how I can do this?). 

Regarding your questions: I don't think there is an analogous API in pandas, 
although perhaps [~icexelloss] knows of one.  Compared to the Dataset cogroup 
there are obviously a number of similarities, but the biggest difference is 
that in the Scala version you end up operating on a pair of Scala Iterators, 
whereas in this proposal you would operate on a pair of Pandas DataFrames.  
This means the Scala version doesn't necessarily need to hold the entire 
cogroup in memory, but on the other hand it gives you a much less rich data 
structure (a Scala Iterator as opposed to a Pandas DataFrame).  I think this 
distinction is basically analogous to the one between the Python 
groupby().apply() and the Scala groupByKey().flatMapGroups(): in each case you 
end up operating on a data structure that is idiomatic for the language at 
hand. 
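
A small, non-Spark sketch of the distinction described above, using plain 
Python and pandas: the iterator style can stream through a group, while the 
pandas style hands you the whole group as an in-memory DataFrame (the data and 
column names are illustrative).

{code:python}
import itertools
import pandas as pd

rows = [("a", 1), ("a", 2), ("b", 3)]

# Iterator style, analogous to Scala's groupByKey().flatMapGroups(): the
# group's values arrive lazily and need not all be held in memory at once.
for key, group in itertools.groupby(rows, key=lambda r: r[0]):
    print(key, sum(v for _, v in group))

# DataFrame style, analogous to groupby().apply() with a Pandas UDF: each
# group is a fully materialised pandas DataFrame, richer but entirely in memory.
pdf = pd.DataFrame(rows, columns=["k", "v"])
print(pdf.groupby("k").apply(lambda g: g["v"].sum()))
{code}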

 

 

 

> Support Dataframe Cogroup via Pandas UDFs 
> --
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Chris Martin
>Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the google document linked below.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Gerard Maas (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861991#comment-16861991
 ] 

Gerard Maas commented on SPARK-28025:
-

I reproduced  the issue in a  spark-shell session:
{code:java}
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

scala> import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming._

scala> val hadoopConf = spark.sparkContext.hadoopConfiguration

scala> import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf

scala> SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key
res1: String = spark.sql.streaming.checkpointFileManagerClass

scala> hadoopConf.get(SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key)
res2: String = null

// mount point for the shared PVC: /storage
scala> val glusterCpfm = new org.apache.hadoop.fs.Path("/storage/crc-store")
glusterCpfm: org.apache.hadoop.fs.Path = /storage/crc-store

scala> val glusterfm = CheckpointFileManager.create(glusterCpfm, hadoopConf)
glusterfm: org.apache.spark.sql.execution.streaming.CheckpointFileManager = 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager@28d00f54

scala> glusterfm.isLocal
res17: Boolean = true

scala> glusterfm.mkdirs(glusterCpfm)

scala> val atomicFile = glusterfm.createAtomic(new 
org.apache.hadoop.fs.Path("/storage/crc-store/file.log"), false)
atomicFile: 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 = 
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@1c6e065

scala> atomicFile.writeChars("Hello, World")

scala> atomicFile.close

/**
* Inspect the file system
*
* $ cat file.log
* Hello, World
* $ ls -al
* total 5
* drwxr-sr-x. 2 jboss 2000 85 Jun 12 09:44 .
* drwxrwsr-x. 8 root 2000 4096 Jun 12 09:42 ..
* -rw-r--r--. 1 jboss 2000 12 Jun 12 09:44 
..file.log.c6f90863-77d2-494e-b1cc-0d0ed1344f74.tmp.crc
* -rw-r--r--. 1 jboss 2000 24 Jun 12 09:44 file.log
**/

// Delete the file -- simulate the operation done by the 
HDFSBackedStateStoreProvider#cleanup

scala> glusterfm.delete(new 
org.apache.hadoop.fs.Path("/storage/crc-store/file.log"))

/**
* Inspect the file system -> .crc file left behind
* $ ls -al
* total 9
* drwxr-sr-x. 2 jboss 2000 4096 Jun 12 09:46 .
* drwxrwsr-x. 8 root 2000 4096 Jun 12 09:42 ..
* -rw-r--r--. 1 jboss 2000 12 Jun 12 09:44 
..file.log.c6f90863-77d2-494e-b1cc-0d0ed1344f74.tmp.crc
**/
{code}
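
Not part of the reproduction above, but for anyone wanting to quantify the 
leak on an existing volume, an ad-hoc helper along these lines (illustrative 
only, not Spark code) can list the checksum sidecars whose data file is 
already gone; the path below is just the directory used in this session.

{code:python}
# List ".X.crc" sidecar files whose corresponding data file "X" no longer
# exists in the same directory.
import os

def orphaned_crc_files(root):
    orphans = []
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            if name.startswith(".") and name.endswith(".crc"):
                data_name = name[1:-len(".crc")]
                if not os.path.exists(os.path.join(dirpath, data_name)):
                    orphans.append(os.path.join(dirpath, name))
    return orphans

for path in orphaned_crc_files("/storage/crc-store"):
    print(path)
{code}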

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Gerard Maas (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-28025:

Summary: HDFSBackedStateStoreProvider should not leak .crc files   (was: 
HDFSBackedStateStoreProvider leaks .crc files )

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28024) Incorrect numeric values when out of range

2019-06-12 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-28024:

Summary: Incorrect numeric values when out of range  (was: Incorrect value 
when out of range)

> Incorrect numeric values when out of range
> --
>
> Key: SPARK-28024
> URL: https://issues.apache.org/jira/browse/SPARK-28024
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> For example:
> {code:sql}
> select tinyint(128) * tinyint(2); -- 0
> select smallint(2147483647) * smallint(2); -- -2
> select int(2147483647) * int(2); -- -2
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28025) HDFSBackedStateStoreProvider leaks .crc files

2019-06-12 Thread Gerard Maas (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861983#comment-16861983
 ] 

Gerard Maas commented on SPARK-28025:
-

Same problem. A different part of the code.

> HDFSBackedStateStoreProvider leaks .crc files 
> --
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
> is leaving '.crc' files behind. There's a .crc file created for each 
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
>  # 
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-28025) HDFSBackedStateStoreProvider leaks .crc files

2019-06-12 Thread Gerard Maas (JIRA)
Gerard Maas created SPARK-28025:
---

 Summary: HDFSBackedStateStoreProvider leaks .crc files 
 Key: SPARK-28025
 URL: https://issues.apache.org/jira/browse/SPARK-28025
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.3
 Environment: Spark 2.4.3

Kubernetes 1.11(?) (OpenShift)

StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
`FileContextBasedCheckpointFileManager` : 
{noformat}
scala> glusterfm.isLocal
res17: Boolean = true{noformat}
Reporter: Gerard Maas


The HDFSBackedStateStoreProvider when using the default CheckpointFileManager 
is leaving '.crc' files behind. There's a .crc file created for each 
`atomicFile` operation of the CheckpointFileManager.

Over time, the number of files becomes very large. It makes the state store 
file system constantly increase in size and, in our case, deteriorates the file 
system performance.

Here's a sample of one of our spark storage volumes after 2 days of execution 
(4 stateful streaming jobs, each on a different sub-dir):
 # 
{noformat}
Total files in PVC (used for checkpoints and state store)
$find . | wc -l
431796

# .crc files
$find . -name "*.crc" | wc -l
418053{noformat}

With each .crc file taking one storage block, the used storage runs into the 
GBs of data.

These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
shows serious performance deterioration with this large number of files:
{noformat}
DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-28024) Incorrect value when out of range

2019-06-12 Thread Yuming Wang (JIRA)
Yuming Wang created SPARK-28024:
---

 Summary: Incorrect value when out of range
 Key: SPARK-28024
 URL: https://issues.apache.org/jira/browse/SPARK-28024
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Yuming Wang


For example:
{code:sql}
select tinyint(128) * tinyint(2); -- 0
select smallint(2147483647) * smallint(2); -- -2
select int(2147483647) * int(2); -- -2
{code}
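
The values above are consistent with plain two's-complement wraparound of the 
fixed-width integer types; a small sketch of that arithmetic (not Spark code):

{code:python}
# Interpret a Python int as a signed two's-complement value of `bits` width.
def wrap(value, bits):
    value &= (1 << bits) - 1
    return value - (1 << bits) if value >= (1 << (bits - 1)) else value

print(wrap(wrap(128, 8) * 2, 8))            # 0   ~ tinyint(128) * tinyint(2)
print(wrap(wrap(2147483647, 16) * 2, 16))   # -2  ~ smallint(2147483647) * smallint(2)
print(wrap(2147483647 * 2, 32))             # -2  ~ int(2147483647) * int(2)
{code}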




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28021) A unappropriate exception in StaticMemoryManager.getMaxExecutionMemory

2019-06-12 Thread JustDoDT (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861957#comment-16861957
 ] 

JustDoDT commented on SPARK-28021:
--

 
 I can't understand this either. Could someone give a detailed explanation?

> A unappropriate exception in StaticMemoryManager.getMaxExecutionMemory
> --
>
> Key: SPARK-28021
> URL: https://issues.apache.org/jira/browse/SPARK-28021
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 2.4.3
>Reporter: child2d
>Priority: Minor
>
> While reviewing StaticMemoryManager.scala, a question occurred to me.
> {code:java}
> private def getMaxExecutionMemory(conf: SparkConf): Long = {
>   val systemMaxMemory = conf.getLong("spark.testing.memory", 
> Runtime.getRuntime.maxMemory)
>   if (systemMaxMemory < MIN_MEMORY_BYTES) {
> throw new IllegalArgumentException(s"System memory $systemMaxMemory must 
> " +
>   s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the 
> --driver-memory " +
>   s"option or spark.driver.memory in Spark configuration.")
>   }
>   if (conf.contains("spark.executor.memory")) {
> val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
> if (executorMemory < MIN_MEMORY_BYTES) {
>   throw new IllegalArgumentException(s"Executor memory $executorMemory 
> must be at least " +
> s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
> s"--executor-memory option or spark.executor.memory in Spark 
> configuration.")
> }
>   }
>   val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
>   val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
>   (systemMaxMemory * memoryFraction * safetyFraction).toLong
> }
> {code}
> When an executor calls getMaxExecutionMemory, it first sets systemMaxMemory 
> from Runtime.getRuntime.maxMemory and then compares systemMaxMemory against 
> MIN_MEMORY_BYTES.
> If systemMaxMemory is below that threshold, the program throws an exception 
> telling the user to increase the heap size with --driver-memory.
> I wonder if that message is wrong, because the heap size of executors is set 
> with --executor-memory.
> Although there is a second exception below about adjusting executor memory, 
> I just think the first exception's message may not be appropriate.
> Thanks for answering my question!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27997) kubernetes client token expired

2019-06-12 Thread Henry Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Yu updated SPARK-27997:
-
Description: 
Hi ,

When I try to submit Spark to Kubernetes in cluster mode, I need an auth token 
to talk to the Kubernetes API server.

Unfortunately, many cloud providers issue tokens that expire after 10-15 
minutes, so we need to refresh this token.

Client mode is even worse, because the scheduler is created in the submit 
process.

Should I also make a PR for this? I fixed it by adding a 
RotatingOAuthTokenProvider and some configuration.

  was:
Hi ,

when I try to submit spark to k8s in cluster mode, I need an authtoken to talk 
with k8s.

unfortunately, many cloud provider provider token and expired with 10-15 mins. 
so we need to fresh this token.  

client mode is event worse, because scheduler is created on submit process.

Should I also make a pr on this ? I fix it by adding 

RotatingOAuthTokenProvider and some configuration.


> kubernetes client token expired 
> 
>
> Key: SPARK-27997
> URL: https://issues.apache.org/jira/browse/SPARK-27997
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 3.0.0
>Reporter: Henry Yu
>Priority: Major
>
> Hi ,
> When I try to submit Spark to Kubernetes in cluster mode, I need an auth 
> token to talk to the Kubernetes API server.
> Unfortunately, many cloud providers issue tokens that expire after 10-15 
> minutes, so we need to refresh this token.
> Client mode is even worse, because the scheduler is created in the submit 
> process.
> Should I also make a PR for this? I fixed it by adding a 
> RotatingOAuthTokenProvider and some configuration.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28022) k8s pod affinity achieve cloud native friendly autoscaling

2019-06-12 Thread Henry Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Yu updated SPARK-28022:
-
Description: 
Hi, in order to achieve cloud-native-friendly autoscaling, I propose adding a 
pod affinity feature.

Traditionally, when we run Spark on a fixed-size YARN cluster, it makes sense 
to spread containers across every node.

With cloud-native resource management, we want to release a node when we no 
longer need it.

The pod affinity feature aims to place all pods of a given application on a 
subset of nodes instead of across all nodes.

By the way, using a pod template is not a good choice; adding the application 
id to the pod affinity term at submit time is more robust.

 

  was:
Hi, in order to achieve cloud native friendly autoscaling , I propose to add a 
pod affinity feature.

Traditionally, when we use spark in fix size yarn cluster, it make sense to 
spread containers to every node.

Coming to cloud native resource manage, we want to release node when we don't 
need it any more.

Pod affinity feature counts to place all pods of certain application to some 
nodes instead of all nodes.

By the way,  using pod template is not a good choice, for add application id  
to pod affinity term when submit is more robust.

 


> k8s pod affinity achieve cloud native friendly autoscaling 
> ---
>
> Key: SPARK-28022
> URL: https://issues.apache.org/jira/browse/SPARK-28022
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.4.3
>Reporter: Henry Yu
>Priority: Major
>
> Hi, in order to achieve cloud-native-friendly autoscaling, I propose adding 
> a pod affinity feature.
> Traditionally, when we run Spark on a fixed-size YARN cluster, it makes sense 
> to spread containers across every node.
> With cloud-native resource management, we want to release a node when we no 
> longer need it.
> The pod affinity feature aims to place all pods of a given application on a 
> subset of nodes instead of across all nodes.
> By the way, using a pod template is not a good choice; adding the application 
> id to the pod affinity term at submit time is more robust.
>  
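
A purely illustrative sketch of the kind of affinity term being proposed: the 
structure is a standard Kubernetes pod affinity, expressed here as the dict a 
client would put in the pod spec, and keyed on the application-id label; 
wiring this up automatically at submit time is the part this ticket would add.

{code:python}
# Illustrative only: a "prefer to co-locate with pods of the same application"
# affinity term. The spark-app-selector label and the placeholder id are
# assumptions for the example; field names follow the Kubernetes pod spec.
app_id = "spark-application-0000000000000"

affinity = {
    "podAffinity": {
        "preferredDuringSchedulingIgnoredDuringExecution": [{
            "weight": 100,
            "podAffinityTerm": {
                "labelSelector": {"matchLabels": {"spark-app-selector": app_id}},
                "topologyKey": "kubernetes.io/hostname",
            },
        }]
    }
}
print(affinity)
{code}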



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27923) List all cases that PostgreSQL throws an exception but Spark SQL is NULL

2019-06-12 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-27923:

Description: 
In this ticket, we plan to list all cases that PostgreSQL throws an exception 
but Spark SQL is NULL.


When porting the 
[boolean.sql|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/boolean.sql]
 found a case:
# Cast unaccepted value to boolean type throws [invalid input 
syntax|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/expected/boolean.out#L45-L47].

When porting the 
[case.sql|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/case.sql]
 found a case:
 # Division by zero [throws an 
exception|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/expected/case.out#L96-L99].

When porting the 
[date.sql|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/date.sql]
 found a case:
 # Invalid date [throws an 
exception|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/expected/date.out#L13-L14].

When porting the 
[int2.sql|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/int2.sql]
 found a case:
 # Invalid short [throws an 
exception|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/expected/int2.out#L9-L10].

  was:
In this ticket, we plan to list all cases that PostgreSQL throws an exception 
but Spark SQL is NULL.


When porting the 
[boolean.sql|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/boolean.sql]
 found a case:
# Cast unaccepted value to boolean type throws [invalid input 
syntax|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/expected/boolean.out#L45-L47].

When porting the 
[case.sql|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/case.sql]
 found a case:
 # Division by zero [throws an 
exception|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/expected/case.out#L96-L99].

When porting the 
[date.sql|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/date.sql]
 found a case:
 # Invalid date [throws an 
exception|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/expected/date.out#L13-L14].



> List all cases that PostgreSQL throws an exception but Spark SQL is NULL
> 
>
> Key: SPARK-27923
> URL: https://issues.apache.org/jira/browse/SPARK-27923
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> In this ticket, we plan to list all cases that PostgreSQL throws an exception 
> but Spark SQL is NULL.
> When porting the 
> [boolean.sql|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/boolean.sql]
>  found a case:
> # Cast unaccepted value to boolean type throws [invalid input 
> syntax|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/expected/boolean.out#L45-L47].
> When porting the 
> [case.sql|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/case.sql]
>  found a case:
>  # Division by zero [throws an 
> exception|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/expected/case.out#L96-L99].
> When porting the 
> [date.sql|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/date.sql]
>  found a case:
>  # Invalid date [throws an 
> exception|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/expected/date.out#L13-L14].
> When porting the 
> [int2.sql|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/int2.sql]
>  found a case:
>  # Invalid short [throws an 
> exception|https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/expected/int2.out#L9-L10].
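
A minimal, non-Spark sketch of the two behaviours this list contrasts: a 
lenient cast that silently yields NULL on bad input versus a strict cast that 
raises, which is the PostgreSQL-style behaviour the porting work keeps running 
into.

{code:python}
# Illustration only; neither function is Spark or PostgreSQL code.
def lenient_cast_int(s):
    try:
        return int(s)
    except ValueError:
        return None              # Spark-SQL-style: bad input becomes NULL

def strict_cast_int(s):
    return int(s)                # PostgreSQL-style: bad input is an error

print(lenient_cast_int("abc"))   # None
try:
    strict_cast_int("abc")
except ValueError as err:
    print("error:", err)
{code}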



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28022) k8s pod affinity achieve cloud native friendly autoscaling

2019-06-12 Thread Henry Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Yu updated SPARK-28022:
-
Description: 
Hi, in order to achieve cloud native friendly autoscaling , I propose to add a 
pod affinity feature.

Traditionally, when we use spark in fix size yarn cluster, it make sense to 
spread containers to every node.

Coming to cloud native resource manage, we want to release node when we don't 
need it any more.

Pod affinity feature counts to place all pods of certain application to some 
nodes instead of all nodes.

By the way,  using pod template is not a good choice, for add application id  
to pod affinity term when submit is more robust.

 

  was:
Hi, in order to achieve cloud native friendly autoscaling , I propose to add a 
pod affinity feature.

Traditionally, when we use spark in fix size yarn cluster, it make sense to 
spread containers to every node.

Coming to cloud native resource manage, we want to release node when we don't 
need it any more.

Pod affinity feature counts to place all pods of certain application to some 
nodes instead of all nodes.

By the way, we can not use pod template to achieve this, for we must add 
application id to affinity term .

 


> k8s pod affinity achieve cloud native friendly autoscaling 
> ---
>
> Key: SPARK-28022
> URL: https://issues.apache.org/jira/browse/SPARK-28022
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.4.3
>Reporter: Henry Yu
>Priority: Major
>
> Hi, in order to achieve cloud native friendly autoscaling , I propose to add 
> a pod affinity feature.
> Traditionally, when we use spark in fix size yarn cluster, it make sense to 
> spread containers to every node.
> Coming to cloud native resource manage, we want to release node when we don't 
> need it any more.
> Pod affinity feature counts to place all pods of certain application to some 
> nodes instead of all nodes.
> By the way,  using pod template is not a good choice, for add application id  
> to pod affinity term when submit is more robust.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28022) k8s pod affinity achieve cloud native friendly autoscaling

2019-06-12 Thread Henry Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Yu updated SPARK-28022:
-
Summary: k8s pod affinity achieve cloud native friendly autoscaling   (was: 
k8s pod affinity achieve cloud native friendly auto scaling )

> k8s pod affinity achieve cloud native friendly autoscaling 
> ---
>
> Key: SPARK-28022
> URL: https://issues.apache.org/jira/browse/SPARK-28022
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.4.3
>Reporter: Henry Yu
>Priority: Major
>
> Hi, in order to achieve cloud native friendly autoscaling , I propose to add 
> a pod affinity feature.
> Traditionally, when we use spark in fix size yarn cluster, it make sense to 
> spread containers to every node.
> Coming to cloud native resource manage, we want to release node when we don't 
> need it any more.
> Pod affinity feature counts to place all pods of certain application to some 
> nodes instead of all nodes.
> By the way, we can not use pod template to achieve this, for we must add 
> application id to affinity term .
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27997) kubernetes client token expired

2019-06-12 Thread Henry Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Yu updated SPARK-27997:
-
Shepherd: Dongjoon Hyun

> kubernetes client token expired 
> 
>
> Key: SPARK-27997
> URL: https://issues.apache.org/jira/browse/SPARK-27997
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 3.0.0
>Reporter: Henry Yu
>Priority: Major
>
> Hi ,
> When I try to submit Spark to Kubernetes in cluster mode, I need an auth 
> token to talk to the Kubernetes API server.
> Unfortunately, many cloud providers issue tokens that expire after 10-15 
> minutes, so we need to refresh this token.
> Client mode is even worse, because the scheduler is created in the submit 
> process.
> Should I also make a PR for this? I fixed it by adding a 
> RotatingOAuthTokenProvider and some configuration.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28023) Trim the string when cast string type to other types

2019-06-12 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-28023:

Description: 
For example:

{code:sql}
select int2 '  21234 ';
SELECT bool '   f   ';
{code}


  was:
For example:

{code:sql}
select int2 '  21234 '
select bool '  21234 '
{code}



> Trim the string when cast string type to other types
> 
>
> Key: SPARK-28023
> URL: https://issues.apache.org/jira/browse/SPARK-28023
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> For example:
> {code:sql}
> select int2 '  21234 ';
> SELECT bool '   f   ';
> {code}
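
A small sketch of the proposed behaviour (illustrative Python, not the actual 
cast implementation): trim surrounding whitespace before parsing, so literals 
like the ones above cast cleanly. The accepted boolean spellings shown here 
are an assumption.

{code:python}
def cast_to_int(s):
    return int(s.strip())

def cast_to_bool(s):
    s = s.strip().lower()
    if s in ("t", "true", "y", "yes", "1"):
        return True
    if s in ("f", "false", "n", "no", "0"):
        return False
    raise ValueError("invalid boolean literal: %r" % s)

print(cast_to_int("  21234 "))   # 21234
print(cast_to_bool("   f   "))   # False
{code}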



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28023) Trim the string when cast string type to other types

2019-06-12 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-28023:

Description: 
For example:

{code:sql}
SELECT bool '   f   ';
select int2 '  21234 ';
{code}


  was:
For example:

{code:sql}
select int2 '  21234 ';
SELECT bool '   f   ';
{code}



> Trim the string when cast string type to other types
> 
>
> Key: SPARK-28023
> URL: https://issues.apache.org/jira/browse/SPARK-28023
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> For example:
> {code:sql}
> SELECT bool '   f   ';
> select int2 '  21234 ';
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28023) Trim the string when cast string type to other types

2019-06-12 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-28023:

Description: 
For example:

{code:sql}
select int2 '  21234 '
select bool '  21234 '
{code}


> Trim the string when cast string type to other types
> 
>
> Key: SPARK-28023
> URL: https://issues.apache.org/jira/browse/SPARK-28023
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> For example:
> {code:sql}
> select int2 '  21234 '
> select bool '  21234 '
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28022) k8s pod affinity achieve cloud native friendly auto scaling

2019-06-12 Thread Henry Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Yu updated SPARK-28022:
-
Description: 
Hi, in order to achieve cloud native friendly autoscaling , I propose to add a 
pod affinity feature.

Traditionally, when we use spark in fix size yarn cluster, it make sense to 
spread containers to every node.

Coming to cloud native resource manage, we want to release node when we don't 
need it any more.

Pod affinity feature counts to place all pods of certain application to some 
nodes instead of all nodes.

By the way, we can not use pod template to achieve this, for we must add 
application id to affinity term .

 

  was:
Hi, in order to achieve cloud native friendly autoscaling , I propose to add a 
pod affinity feature.

Traditionally, when we use spark in fix size yarn cluster, it make sense to 
spread containers to every node.

Coming to cloud native resource manage, we want to release node when we don't 
need it any more.

pod affinity feature can help when we try to place all pod of certain job to 
some node instead of all node.

we can not use pod template to achieve this, for we must add application id to 
affinity .

 


> k8s pod affinity achieve cloud native friendly auto scaling 
> 
>
> Key: SPARK-28022
> URL: https://issues.apache.org/jira/browse/SPARK-28022
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.4.3
>Reporter: Henry Yu
>Priority: Major
>
> Hi, in order to achieve cloud native friendly autoscaling , I propose to add 
> a pod affinity feature.
> Traditionally, when we use spark in fix size yarn cluster, it make sense to 
> spread containers to every node.
> Coming to cloud native resource manage, we want to release node when we don't 
> need it any more.
> Pod affinity feature counts to place all pods of certain application to some 
> nodes instead of all nodes.
> By the way, we can not use pod template to achieve this, for we must add 
> application id to affinity term .
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


