[jira] [Created] (SPARK-23506) Add refreshByPath in HiveMetastoreCatalog and invalidByPath in FileStatusCache

2018-02-23 Thread guichaoxian (JIRA)
guichaoxian created SPARK-23506:
---

 Summary: Add refreshByPath in HiveMetastoreCatalog and 
invalidByPath in FileStatusCache
 Key: SPARK-23506
 URL: https://issues.apache.org/jira/browse/SPARK-23506
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.1.0
Reporter: guichaoxian


As far as I know, HiveMetastoreCatalog only has a method called refreshTable,
which removes all cached metadata for a table from memory.

In some situations I only change the files of a single partition, so using
refreshTable to invalidate the file status of the whole table is inefficient.

I think we can add a method called invalidByPath to FileStatusCache and a
method refreshByPath to HiveMetastoreCatalog.
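
A minimal sketch of the proposed shape (the names mirror the proposal above;
the signatures are hypothetical, not existing Spark API):

{code:scala}
import org.apache.hadoop.fs.Path

// Hypothetical sketch of the proposed additions; signatures are illustrative only.
trait FileStatusCache {
  def invalidateAll(): Unit            // existing behaviour: drop all cached file statuses
  def invalidByPath(path: Path): Unit  // proposed: drop only the entries rooted at `path`
}

class HiveMetastoreCatalog {
  def refreshTable(tableName: String): Unit = ???  // existing: drops all cached metadata for the table
  def refreshByPath(path: String): Unit = ???      // proposed: refresh only the files/partitions under `path`
}
{code}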



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23448) Dataframe returns wrong result when column don't respect datatype

2018-02-23 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375386#comment-16375386
 ] 

Apache Spark commented on SPARK-23448:
--

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/20666

> Dataframe returns wrong result when column don't respect datatype
> -
>
> Key: SPARK-23448
> URL: https://issues.apache.org/jira/browse/SPARK-23448
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
> Environment: Local
>Reporter: Ahmed ZAROUI
>Priority: Major
>
> I have the following JSON file that contains some noisy data (a String instead 
> of an Array):
>  
> {code:java}
> {"attr1":"val1","attr2":"[\"val2\"]"}
> {"attr1":"val1","attr2":["val2"]}
> {code}
> And I need to specify the schema programmatically like this:
>  
> {code:java}
> implicit val spark = SparkSession
>   .builder()
>   .master("local[*]")
>   .config("spark.ui.enabled", false)
>   .config("spark.sql.caseSensitive", "True")
>   .getOrCreate()
> import spark.implicits._
> val schema = StructType(
>   Seq(StructField("attr1", StringType, true),
>   StructField("attr2", ArrayType(StringType, true), true)))
> spark.read.schema(schema).json(input).collect().foreach(println)
> {code}
> The result given by this code is:
> {code:java}
> [null,null]
> [val1,WrappedArray(val2)]
> {code}
> Instead of putting null only in the corrupted column, all columns of the first 
> record are null.
>  
>  
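
One way to at least surface which rows fail the declared schema (a sketch,
assuming Spark's default PERMISSIVE JSON parse mode and reusing the {{spark}}
and {{input}} values from the snippet above; the extra column uses the reader's
standard corrupt-record option):

{code:scala}
import org.apache.spark.sql.types._

// Sketch: add a string column to capture records that do not match the schema,
// so malformed lines land in _corrupt_record instead of silently nulling out.
val schemaWithCorrupt = StructType(Seq(
  StructField("attr1", StringType, true),
  StructField("attr2", ArrayType(StringType, true), true),
  StructField("_corrupt_record", StringType, true)))

spark.read
  .schema(schemaWithCorrupt)
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(input)
  .show(false)
{code}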






[jira] [Commented] (SPARK-20592) Alter table concatenate is not working as expected.

2018-02-23 Thread Arun Manivannan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375305#comment-16375305
 ] 

Arun Manivannan commented on SPARK-20592:
-

x

> Alter table concatenate is not working as expected.
> ---
>
> Key: SPARK-20592
> URL: https://issues.apache.org/jira/browse/SPARK-20592
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.1.0
>Reporter: Guru Prabhakar Reddy Marthala
>Priority: Major
>  Labels: hive, pyspark
>
> Created a table using CTAS from CSV to Parquet. The Parquet table generated 
> numerous small files. Tried ALTER TABLE CONCATENATE, but it's not working as 
> expected.
> spark.sql("CREATE TABLE flight.flight_data(year INT,   month INT,   day INT,  
>  day_of_week INT,   dep_time INT,   crs_dep_time INT,   arr_time INT,   
> crs_arr_time INT,   unique_carrier STRING,   flight_num INT,   tail_num 
> STRING,   actual_elapsed_time INT,   crs_elapsed_time INT,   air_time INT,   
> arr_delay INT,   dep_delay INT,   origin STRING,   dest STRING,   distance 
> INT,   taxi_in INT,   taxi_out INT,   cancelled INT,   cancellation_code 
> STRING,   diverted INT,   carrier_delay STRING,   weather_delay STRING,   
> nas_delay STRING,   security_delay STRING,   late_aircraft_delay STRING) ROW 
> FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile")
> spark.sql("load data local INPATH 'i:/2008/2008.csv' INTO TABLE 
> flight.flight_data")
> spark.sql("create table flight.flight_data_pq stored as parquet as select * 
> from flight.flight_data")
> spark.sql("create table flight.flight_data_orc stored as orc as select * from 
> flight.flight_data")
> pyspark.sql.utils.ParseException: u'\nOperation not allowed: alter table 
> concatenate(line 1, pos 0)\n\n== SQL ==\nalter table 
> flight_data.flight_data_pq concatenate\n^^^\n'
> Tried on both ORC and Parquet formats; it's not working.
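
Hive's ALTER TABLE ... CONCATENATE is not implemented in Spark SQL's parser,
hence the ParseException above. A common Spark-side alternative (a sketch; the
table name is taken from the report and the target partition count is
illustrative) is to rewrite the data into fewer, larger files:

{code:scala}
// Sketch: compact small files by rewriting the table's data into fewer partitions.
// Spark cannot overwrite a table it is reading from, so write to a new table
// (or a staging location) and swap it in afterwards.
val compacted = spark.table("flight.flight_data_pq").coalesce(8)

compacted.write
  .mode("overwrite")
  .format("parquet")
  .saveAsTable("flight.flight_data_pq_compacted")
{code}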






[jira] [Issue Comment Deleted] (SPARK-20592) Alter table concatenate is not working as expected.

2018-02-23 Thread Arun Manivannan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arun Manivannan updated SPARK-20592:

Comment: was deleted

(was: x)

> Alter table concatenate is not working as expected.
> ---
>
> Key: SPARK-20592
> URL: https://issues.apache.org/jira/browse/SPARK-20592
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.1.0
>Reporter: Guru Prabhakar Reddy Marthala
>Priority: Major
>  Labels: hive, pyspark
>
> Created a table using CTAS from CSV to Parquet. The Parquet table generated 
> numerous small files. Tried ALTER TABLE CONCATENATE, but it's not working as 
> expected.
> spark.sql("CREATE TABLE flight.flight_data(year INT,   month INT,   day INT,  
>  day_of_week INT,   dep_time INT,   crs_dep_time INT,   arr_time INT,   
> crs_arr_time INT,   unique_carrier STRING,   flight_num INT,   tail_num 
> STRING,   actual_elapsed_time INT,   crs_elapsed_time INT,   air_time INT,   
> arr_delay INT,   dep_delay INT,   origin STRING,   dest STRING,   distance 
> INT,   taxi_in INT,   taxi_out INT,   cancelled INT,   cancellation_code 
> STRING,   diverted INT,   carrier_delay STRING,   weather_delay STRING,   
> nas_delay STRING,   security_delay STRING,   late_aircraft_delay STRING) ROW 
> FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile")
> spark.sql("load data local INPATH 'i:/2008/2008.csv' INTO TABLE 
> flight.flight_data")
> spark.sql("create table flight.flight_data_pq stored as parquet as select * 
> from flight.flight_data")
> spark.sql("create table flight.flight_data_orc stored as orc as select * from 
> flight.flight_data")
> pyspark.sql.utils.ParseException: u'\nOperation not allowed: alter table 
> concatenate(line 1, pos 0)\n\n== SQL ==\nalter table 
> flight_data.flight_data_pq concatenate\n^^^\n'
> Tried on both ORC and Parquet formats; it's not working.






[jira] [Commented] (SPARK-23475) The "stages" page doesn't show any completed stages

2018-02-23 Thread Marcelo Vanzin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375254#comment-16375254
 ] 

Marcelo Vanzin commented on SPARK-23475:


https://github.com/apache/spark/pull/20662 was merged to 2.3, but not in time 
for rc5, so it will probably only be in 2.3.1.

> The "stages" page doesn't show any completed stages
> ---
>
> Key: SPARK-23475
> URL: https://issues.apache.org/jira/browse/SPARK-23475
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Blocker
> Fix For: 2.3.0
>
> Attachments: Screen Shot 2018-02-21 at 12.39.39 AM.png
>
>
> Run "bin/spark-shell --conf spark.ui.retainedJobs=10 --conf 
> spark.ui.retainedStages=10", type the following codes and click the "stages" 
> page, it will not show completed stages:
> {code}
> val rdd = sc.parallelize(0 to 100, 100).repartition(10).cache()
> (1 to 20).foreach { i =>
>rdd.repartition(10).count()
> }
> {code}
> Please see the attached screenshots.






[jira] [Created] (SPARK-23505) Flaky test: ParquetQuerySuite

2018-02-23 Thread Marcelo Vanzin (JIRA)
Marcelo Vanzin created SPARK-23505:
--

 Summary: Flaky test: ParquetQuerySuite
 Key: SPARK-23505
 URL: https://issues.apache.org/jira/browse/SPARK-23505
 Project: Spark
  Issue Type: Bug
  Components: SQL, Tests
Affects Versions: 2.4.0
Reporter: Marcelo Vanzin


Seen on an unrelated PR:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87635/testReport/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/

{noformat}
Error Message
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to 
eventually never returned normally. Attempted 15 times over 10.01253324699 
seconds. Last failure message: There are 1 possibly leaked file streams..
Stacktrace
sbt.ForkMain$ForkError: 
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to 
eventually never returned normally. Attempted 15 times over 10.01253324699 
seconds. Last failure message: There are 1 possibly leaked file streams..
at 
org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:421)
at 
org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetQuerySuite.eventually(ParquetQuerySuite.scala:41)
at 
org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:308)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetQuerySuite.eventually(ParquetQuerySuite.scala:41)
at 
org.apache.spark.sql.test.SharedSparkSession$class.afterEach(SharedSparkSession.scala:114)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetQuerySuite.afterEach(ParquetQuerySuite.scala:41)
at 
org.scalatest.BeforeAndAfterEach$$anonfun$1.apply$mcV$sp(BeforeAndAfterEach.scala:234)
at 
org.scalatest.Status$$anonfun$withAfterEffect$1.apply(Status.scala:379)
at 
org.scalatest.Status$$anonfun$withAfterEffect$1.apply(Status.scala:375)
at org.scalatest.SucceededStatus$.whenCompleted(Status.scala:454)
at org.scalatest.Status$class.withAfterEffect(Status.scala:375)
at org.scalatest.SucceededStatus$.withAfterEffect(Status.scala:426)
at 
org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:232)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetQuerySuite.runTest(ParquetQuerySuite.scala:41)
at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at 
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at 
org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at 
org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at 
org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
at 
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
at 
org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at 
org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
at 
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
at sbt.ForkMain$Run$2.call(ForkMain.java:296)
at sbt.ForkMain$Run$2.call(ForkMain.java:286)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: sbt.ForkMain$ForkError: java.lang.IllegalStateException: There are 1 
possibly leaked file streams.
at 
org.apache.spark.DebugFilesystem$.assertNoOpenStreams(DebugFilesystem.scala:54)

[jira] [Created] (SPARK-23504) Flaky test: RateSourceV2Suite.basic microbatch execution

2018-02-23 Thread Marcelo Vanzin (JIRA)
Marcelo Vanzin created SPARK-23504:
--

 Summary: Flaky test: RateSourceV2Suite.basic microbatch execution
 Key: SPARK-23504
 URL: https://issues.apache.org/jira/browse/SPARK-23504
 Project: Spark
  Issue Type: Bug
  Components: SQL, Tests
Affects Versions: 2.4.0
Reporter: Marcelo Vanzin


Seen on an unrelated change:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87635/testReport/org.apache.spark.sql.execution.streaming/RateSourceV2Suite/basic_microbatch_execution/

{noformat}
Error Message
org.scalatest.exceptions.TestFailedException:   == Results == !== Correct 
Answer - 10 == == Spark Answer - 0 == !struct<_1:timestamp,_2:int>   
struct<> ![1969-12-31 16:00:00.0,0]  ![1969-12-31 16:00:00.1,1]  
![1969-12-31 16:00:00.2,2]  ![1969-12-31 16:00:00.3,3]  ![1969-12-31 
16:00:00.4,4]  ![1969-12-31 16:00:00.5,5]  ![1969-12-31 16:00:00.6,6]   
   ![1969-12-31 16:00:00.7,7]  ![1969-12-31 16:00:00.8,8]  ![1969-12-31 
16:00:00.9,9]== Progress ==AdvanceRateManualClock(1) => 
CheckLastBatch: [1969-12-31 16:00:00.0,0],[1969-12-31 16:00:00.1,1],[1969-12-31 
16:00:00.2,2],[1969-12-31 16:00:00.3,3],[1969-12-31 16:00:00.4,4],[1969-12-31 
16:00:00.5,5],[1969-12-31 16:00:00.6,6],[1969-12-31 16:00:00.7,7],[1969-12-31 
16:00:00.8,8],[1969-12-31 16:00:00.9,9]StopStream
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@22bc97a,Map(),null)
AdvanceRateManualClock(2)CheckLastBatch: [1969-12-31 
16:00:01.0,10],[1969-12-31 16:00:01.1,11],[1969-12-31 
16:00:01.2,12],[1969-12-31 16:00:01.3,13],[1969-12-31 
16:00:01.4,14],[1969-12-31 16:00:01.5,15],[1969-12-31 
16:00:01.6,16],[1969-12-31 16:00:01.7,17],[1969-12-31 
16:00:01.8,18],[1969-12-31 16:00:01.9,19]  == Stream == Output Mode: Append 
Stream state: 
{org.apache.spark.sql.execution.streaming.sources.RateStreamMicroBatchReader@75b88292:
 {"0":{"value":-1,"runTimeMs":0}}} Thread state: alive Thread stack trace: 
sun.misc.Unsafe.park(Native Method) 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) 
org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:222) 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:633) 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2030) 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:84)
 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) 
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
 org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) 
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) 
org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
 org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) 
org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$18.apply(MicroBatchExecution.scala:493)
 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:488)
 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
 

[jira] [Resolved] (SPARK-23459) Improve the error message when unknown column is specified in partition columns

2018-02-23 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-23459.
-
   Resolution: Fixed
 Assignee: Kazuaki Ishizaki
Fix Version/s: 2.4.0

> Improve the error message when unknown column is specified in partition 
> columns
> ---
>
> Key: SPARK-23459
> URL: https://issues.apache.org/jira/browse/SPARK-23459
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Assignee: Kazuaki Ishizaki
>Priority: Major
>  Labels: starter
> Fix For: 2.4.0
>
>
> {noformat}
>   test("save with an unknown partition column") {
> withTempDir { dir =>
>   val path = dir.getCanonicalPath
> Seq(1L -> "a").toDF("i", "j").write
>   .format("parquet")
>   .partitionBy("unknownColumn")
>   .save(path)
> }
>   }
> {noformat}
> We got the following error message:
> {noformat}
> Partition column unknownColumn not found in schema 
> StructType(StructField(i,LongType,false), StructField(j,StringType,true));
> {noformat}
> We should call catalogString rather than toString in the function 
> `partitionColumnsSchema` of `PartitioningUtils.scala`.
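
For illustration (a sketch; the rendered strings in the comments are
approximate), the difference between the two renderings on the schema from the
error message above:

{code:scala}
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("i", LongType, nullable = false),
  StructField("j", StringType, nullable = true)))

schema.toString       // StructType(StructField(i,LongType,false), StructField(j,StringType,true))
schema.catalogString  // struct<i:bigint,j:string>
{code}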






[jira] [Commented] (SPARK-21740) DataFrame.write does not work with Phoenix JDBC Driver

2018-02-23 Thread Dheeren Beborrtha (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375134#comment-16375134
 ] 

Dheeren Beborrtha commented on SPARK-21740:
---

What workaround did you use? Did you modify Spark core or the Phoenix JDBC 
client?

> DataFrame.write does not work with Phoenix JDBC Driver
> --
>
> Key: SPARK-21740
> URL: https://issues.apache.org/jira/browse/SPARK-21740
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.2.0
>Reporter: Paul Wu
>Priority: Major
>  Labels: jdbc, phoenix
>
> The reason for this is that the Phoenix JDBC driver does not support "INSERT", 
> only "UPSERT".
> Exception for the following program:
> 17/08/15 12:18:53 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> org.apache.phoenix.exception.PhoenixParserException: ERROR 601 (42P00): 
> Syntax error. Encountered "INSERT" at line 1, column 1.
>   at 
> org.apache.phoenix.exception.PhoenixParserException.newException(PhoenixParserException.java:33)
> {code:java}
> import java.util.Properties;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SaveMode;
> import org.apache.spark.sql.SparkSession;
> public class HbaseJDBCSpark {
> private static final SparkSession sparkSession
> = SparkSession.builder()
> .config("spark.sql.warehouse.dir", "file:///temp")
> .config("spark.driver.memory", "5g")
> .master("local[*]").appName("Spark2JdbcDs").getOrCreate();
> static final String JDBC_URL
> = "jdbc:phoenix:somehost:2181:/hbase-unsecure";
> public static void main(String[] args) {
> final Properties connectionProperties = new Properties();
> Dataset<Row> jdbcDF
> = sparkSession.read()
> .jdbc(JDBC_URL, "javatest", connectionProperties);
> jdbcDF.show();
> String url = JDBC_URL;
> Properties p = new Properties();
> p.put("driver", "org.apache.phoenix.jdbc.PhoenixDriver");
> //p.put("batchsize", "10");
> jdbcDF.write().mode(SaveMode.Append).jdbc(url, "javatest", p);
> sparkSession.close();
> }
> // Create variables
> }
> {code}
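
One workaround that is often suggested for this limitation (a sketch, not
necessarily what was used here; it assumes {{jdbcDF}} is the DataFrame read
above, and the table/column names are illustrative) is to bypass
DataFrameWriter.jdbc and issue Phoenix UPSERT statements directly from
foreachPartition:

{code:scala}
import java.sql.DriverManager
import org.apache.spark.sql.Row

// Sketch: write each partition with explicit UPSERT statements, since the
// Phoenix JDBC driver rejects the INSERT statements that DataFrameWriter.jdbc emits.
jdbcDF.foreachPartition { (rows: Iterator[Row]) =>
  val conn = DriverManager.getConnection("jdbc:phoenix:somehost:2181:/hbase-unsecure")
  val stmt = conn.prepareStatement("UPSERT INTO javatest (ID, VAL) VALUES (?, ?)")
  try {
    rows.foreach { row =>
      stmt.setLong(1, row.getLong(0))
      stmt.setString(2, row.getString(1))
      stmt.addBatch()
    }
    stmt.executeBatch()
    conn.commit()
  } finally {
    stmt.close()
    conn.close()
  }
}
{code}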






[jira] [Commented] (SPARK-23502) Support async init of spark context during spark-shell startup

2018-02-23 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375119#comment-16375119
 ] 

Sean Owen commented on SPARK-23502:
---

It does introduce some new cases to deal with, like what happens when you 
operate on {{sc}} before it's initialized, but it could have some value in 
letting people get started and making the shell feel more responsive. Is it 
surprising if the Spark shell starts but errors out 20 seconds later? And you 
might lose some useful output that's available when startup blocks until 
everything is ready.

For the common case where this is a problem, namely when the cluster takes a 
long time to fulfill a request for many executors, dynamic allocation can help.
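
For reference, a minimal sketch of enabling dynamic allocation (standard Spark
settings; the values here are only examples, not recommendations):

{code:scala}
import org.apache.spark.sql.SparkSession

// Sketch: executors are requested and released based on load instead of all up front.
val spark = SparkSession.builder()
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.maxExecutors", "100")
  .config("spark.shuffle.service.enabled", "true")  // external shuffle service is required on YARN
  .getOrCreate()
{code}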

> Support async init of spark context during spark-shell startup
> --
>
> Key: SPARK-23502
> URL: https://issues.apache.org/jira/browse/SPARK-23502
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Shell
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>Priority: Minor
>
> Currently, whenever a user starts the spark shell, we initialize the spark 
> context before returning the prompt to the user. In environments where spark 
> context initialization takes several seconds, waiting for the prompt is not a 
> very good user experience. Instead of waiting for the spark context to 
> initialize, we can initialize it in the background and return the prompt to 
> the user as soon as possible. Note that even if we return the prompt early, 
> we still need to make sure the spark context initialization completes before 
> any query is executed.
> Note that the scala interpreter already does very similar async 
> initialization in order to return the prompt to the user faster: 
> https://github.com/scala/scala/blob/v2.12.2/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L414.
> We would be emulating that behavior for Spark.






[jira] [Commented] (SPARK-23502) Support async init of spark context during spark-shell startup

2018-02-23 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375051#comment-16375051
 ] 

Sital Kedia commented on SPARK-23502:
-

I realized that we print the web UI URL and the application id during 
spark-shell startup 
(https://github.com/scala/scala/blob/v2.12.2/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L414).
 If we initialize the spark context asynchronously, that information will not 
be available at startup, so we won't be able to print it.

[~r...@databricks.com], [~srowen] - What do you think about async 
initialization of the spark context and getting rid of the web UI URL and 
application id that are printed during startup?

 

 

> Support async init of spark context during spark-shell startup
> --
>
> Key: SPARK-23502
> URL: https://issues.apache.org/jira/browse/SPARK-23502
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Shell
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>Priority: Minor
>
> Currently, whenever a user starts the spark shell, we initialize the spark 
> context before returning the prompt to the user. In environments where spark 
> context initialization takes several seconds, waiting for the prompt is not a 
> very good user experience. Instead of waiting for the spark context to 
> initialize, we can initialize it in the background and return the prompt to 
> the user as soon as possible. Note that even if we return the prompt early, 
> we still need to make sure the spark context initialization completes before 
> any query is executed.
> Note that the scala interpreter already does very similar async 
> initialization in order to return the prompt to the user faster: 
> https://github.com/scala/scala/blob/v2.12.2/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L414.
> We would be emulating that behavior for Spark.






[jira] [Commented] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375005#comment-16375005
 ] 

Imran Rashid commented on SPARK-23485:
--

{quote}
I think this is because the general expectation is that a failure on a given 
node will just cause new executors to spin up on different nodes and eventually 
the application will succeed.
{quote}

I think this is the part which may be particularly different in spark.  Some 
types of failures do not cause the executor to die -- it's just a task failure, 
and the executor itself is still alive.  As long as Spark gets heartbeats from 
the executor, it figures it's still fine.  But a bad disk can cause *tasks* to 
repeatedly fail.  The same could be true for other resources, e.g. a bad gpu, 
where maybe the gpu is only used by certain tasks.

When that happens, without spark's internal blacklisting, an application will 
very quickly hit many task failures.  The task fails, spark notices that, tries 
to find a place to assign the failed task, puts it back in the same place; 
repeat until spark decides there are too many failures and gives up.  It can 
easily cause your app to fail in ~1 second.  There is no communication with the 
cluster manager through this process; it's all just between spark's driver & 
executor.  In one case, when this happened, yarn's own health checker discovered 
the problem a few minutes after it occurred -- but the spark app had already 
failed by that point.  All from one bad disk in a cluster with > 1000 disks.

Spark's blacklisting is really meant to be complementary to the type of node 
health checks you are talking about in kubernetes.  The blacklisting in spark 
intentionally does not try to figure out the root cause of the problem, as we 
don't want to get into the game of enumerating all of the possibilities.  It's a 
heuristic which makes it safe for spark to keep going in the face of these 
un-caught errors, but then retries the resources when it would be safe to do 
so (discussed in more detail in the design doc on SPARK-8425).

anti-affinity in kubernetes may be just the trick, though this part of the doc 
was a little worrisome:

{quote}
Note: Inter-pod affinity and anti-affinity require substantial amount of 
processing which can slow down scheduling in large clusters significantly. We 
do not recommend using them in clusters larger than several hundred nodes.
{quote}

Blacklisting is *most* important in large clusters.  It seems like it's able to 
do something much more complicated than a simple node blacklist, though -- 
maybe it would already be faster with such a simple anti-affinity rule?
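
For illustration, a hypothetical sketch (not existing Spark code) of what
translating the driver-side blacklist into a per-executor-pod node
anti-affinity constraint could look like, expressed as the data the pod spec
would carry:

{code:scala}
// Hypothetical sketch: map the blacklisted hostnames onto the shape of a Kubernetes
// requiredDuringSchedulingIgnoredDuringExecution node anti-affinity rule
// (operator NotIn over the kubernetes.io/hostname label).
def nodeAntiAffinityFor(blacklistedNodes: Set[String]): Map[String, Any] = Map(
  "nodeAffinity" -> Map(
    "requiredDuringSchedulingIgnoredDuringExecution" -> Map(
      "nodeSelectorTerms" -> Seq(Map(
        "matchExpressions" -> Seq(Map(
          "key" -> "kubernetes.io/hostname",
          "operator" -> "NotIn",
          "values" -> blacklistedNodes.toSeq)))))))

// e.g. nodeAntiAffinityFor(scheduler.nodeBlacklist()) when building an executor pod
{code}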

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in {{KubernetesClusterSchedulerBackend}} so it 
> seems this is missing.  Thought of this while looking at SPARK-19755, a 
> similar issue on mesos.






[jira] [Updated] (SPARK-23408) Flaky test: StreamingOuterJoinSuite.left outer early state exclusion on right

2018-02-23 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-23408:
--
Fix Version/s: 2.4.0

> Flaky test: StreamingOuterJoinSuite.left outer early state exclusion on right
> -
>
> Key: SPARK-23408
> URL: https://issues.apache.org/jira/browse/SPARK-23408
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 2.4.0
>Reporter: Marcelo Vanzin
>Assignee: Tathagata Das
>Priority: Minor
> Fix For: 2.4.0, 3.0.0
>
>
> Seen on an unrelated PR.
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87386/testReport/org.apache.spark.sql.streaming/StreamingOuterJoinSuite/left_outer_early_state_exclusion_on_right/
> {noformat}
> sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 
> Assert on query failed: Check total state rows = List(4), updated state rows 
> = List(4): Array(1) did not equal List(4) incorrect updates rows
> org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
>   org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
>   
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
>   
> org.apache.spark.sql.streaming.StateStoreMetricsTest$$anonfun$assertNumStateRows$1.apply(StateStoreMetricsTest.scala:28)
>   
> org.apache.spark.sql.streaming.StateStoreMetricsTest$$anonfun$assertNumStateRows$1.apply(StateStoreMetricsTest.scala:23)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1$$anonfun$apply$14.apply$mcZ$sp(StreamTest.scala:568)
>   
> org.apache.spark.sql.streaming.StreamTest$class.verify$1(StreamTest.scala:371)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:568)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:432)
>   
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> == Progress ==
>AddData to MemoryStream[value#19652]: 3,4,5
>AddData to MemoryStream[value#19662]: 1,2,3
>CheckLastBatch: [3,10,6,9]
> => AssertOnQuery(, Check total state rows = List(4), updated state 
> rows = List(4))
>AddData to MemoryStream[value#19652]: 20
>AddData to MemoryStream[value#19662]: 21
>CheckLastBatch: 
>AddData to MemoryStream[value#19662]: 20
>CheckLastBatch: [20,30,40,60],[4,10,8,null],[5,10,10,null]
> == Stream ==
> Output Mode: Append
> Stream state: {MemoryStream[value#19652]: 0,MemoryStream[value#19662]: 0}
> Thread state: alive
> Thread stack trace: java.lang.Thread.sleep(Native Method)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:152)
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:120)
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> {noformat}
> No other failures in the history, though.






[jira] [Assigned] (SPARK-23408) Flaky test: StreamingOuterJoinSuite.left outer early state exclusion on right

2018-02-23 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-23408:
-

Assignee: Tathagata Das

> Flaky test: StreamingOuterJoinSuite.left outer early state exclusion on right
> -
>
> Key: SPARK-23408
> URL: https://issues.apache.org/jira/browse/SPARK-23408
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 2.4.0
>Reporter: Marcelo Vanzin
>Assignee: Tathagata Das
>Priority: Minor
> Fix For: 3.0.0
>
>
> Seen on an unrelated PR.
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87386/testReport/org.apache.spark.sql.streaming/StreamingOuterJoinSuite/left_outer_early_state_exclusion_on_right/
> {noformat}
> sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 
> Assert on query failed: Check total state rows = List(4), updated state rows 
> = List(4): Array(1) did not equal List(4) incorrect updates rows
> org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
>   org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
>   
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
>   
> org.apache.spark.sql.streaming.StateStoreMetricsTest$$anonfun$assertNumStateRows$1.apply(StateStoreMetricsTest.scala:28)
>   
> org.apache.spark.sql.streaming.StateStoreMetricsTest$$anonfun$assertNumStateRows$1.apply(StateStoreMetricsTest.scala:23)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1$$anonfun$apply$14.apply$mcZ$sp(StreamTest.scala:568)
>   
> org.apache.spark.sql.streaming.StreamTest$class.verify$1(StreamTest.scala:371)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:568)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:432)
>   
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> == Progress ==
>AddData to MemoryStream[value#19652]: 3,4,5
>AddData to MemoryStream[value#19662]: 1,2,3
>CheckLastBatch: [3,10,6,9]
> => AssertOnQuery(, Check total state rows = List(4), updated state 
> rows = List(4))
>AddData to MemoryStream[value#19652]: 20
>AddData to MemoryStream[value#19662]: 21
>CheckLastBatch: 
>AddData to MemoryStream[value#19662]: 20
>CheckLastBatch: [20,30,40,60],[4,10,8,null],[5,10,10,null]
> == Stream ==
> Output Mode: Append
> Stream state: {MemoryStream[value#19652]: 0,MemoryStream[value#19662]: 0}
> Thread state: alive
> Thread stack trace: java.lang.Thread.sleep(Native Method)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:152)
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:120)
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> {noformat}
> No other failures in the history, though.






[jira] [Resolved] (SPARK-23408) Flaky test: StreamingOuterJoinSuite.left outer early state exclusion on right

2018-02-23 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-23408.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 20650
[https://github.com/apache/spark/pull/20650]

> Flaky test: StreamingOuterJoinSuite.left outer early state exclusion on right
> -
>
> Key: SPARK-23408
> URL: https://issues.apache.org/jira/browse/SPARK-23408
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 2.4.0
>Reporter: Marcelo Vanzin
>Assignee: Tathagata Das
>Priority: Minor
> Fix For: 3.0.0
>
>
> Seen on an unrelated PR.
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87386/testReport/org.apache.spark.sql.streaming/StreamingOuterJoinSuite/left_outer_early_state_exclusion_on_right/
> {noformat}
> sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 
> Assert on query failed: Check total state rows = List(4), updated state rows 
> = List(4): Array(1) did not equal List(4) incorrect updates rows
> org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
>   org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
>   
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
>   
> org.apache.spark.sql.streaming.StateStoreMetricsTest$$anonfun$assertNumStateRows$1.apply(StateStoreMetricsTest.scala:28)
>   
> org.apache.spark.sql.streaming.StateStoreMetricsTest$$anonfun$assertNumStateRows$1.apply(StateStoreMetricsTest.scala:23)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1$$anonfun$apply$14.apply$mcZ$sp(StreamTest.scala:568)
>   
> org.apache.spark.sql.streaming.StreamTest$class.verify$1(StreamTest.scala:371)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:568)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:432)
>   
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> == Progress ==
>AddData to MemoryStream[value#19652]: 3,4,5
>AddData to MemoryStream[value#19662]: 1,2,3
>CheckLastBatch: [3,10,6,9]
> => AssertOnQuery(, Check total state rows = List(4), updated state 
> rows = List(4))
>AddData to MemoryStream[value#19652]: 20
>AddData to MemoryStream[value#19662]: 21
>CheckLastBatch: 
>AddData to MemoryStream[value#19662]: 20
>CheckLastBatch: [20,30,40,60],[4,10,8,null],[5,10,10,null]
> == Stream ==
> Output Mode: Append
> Stream state: {MemoryStream[value#19652]: 0,MemoryStream[value#19662]: 0}
> Thread state: alive
> Thread stack trace: java.lang.Thread.sleep(Native Method)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:152)
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:120)
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> {noformat}
> No other failures in the history, though.






[jira] [Comment Edited] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Anirudh Ramanathan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374884#comment-16374884
 ] 

Anirudh Ramanathan edited comment on SPARK-23485 at 2/23/18 7:36 PM:
-

Stavros - we [do currently 
differentiate|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L386-L398]
 between kubernetes causing an executor to disappear (node failure) and exit 
caused by the application itself. 

Here's some detail on node issues and k8s:

The node level problem detection is split between the Kubelet and the [Node 
Problem Detector|https://github.com/kubernetes/node-problem-detector]. This 
works for some common errors and in future, will taint nodes upon detecting 
them. Some of these errors are listed 
[here|https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json#L30:15].
 However, there are some categories of errors this setup won't detect. For 
example: if we have a node that has firewall rules/networking that prevents an 
executor running on it accessing a particular external service, to say - 
download/stream data. Or, a node with issues in its local disk which makes a 
spark executor on it throw read/write errors. These error conditions may only 
affect certain kinds of pods on that node and not others.

Yinan's point I think is that it is uncommon for applications on k8s to try and 
incorporate reasoning about node level conditions. I think this is because the 
general expectation is that a failure on a given node will just cause new 
executors to spin up on different nodes and eventually the application will 
succeed. However, I can see this being an issue in large-scale production 
deployments, where we'd see transient errors like above. Given the existence of 
a blacklist mechanism and anti-affinity primitives, it wouldn't be too complex 
to incorporate it I think. 

[~aash] [~mcheah], have you guys seen this in practice thus far? 


was (Author: foxish):
Stavros - we [do currently 
differentiate|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L386-L398]
 between kubernetes causing an executor to disappear (node failure) and exit 
caused by the application itself. 

Here's some detail on node issues and k8s:

The node level problem detection is split between the Kubelet and the [Node 
Problem Detector|https://github.com/kubernetes/node-problem-detector]. This 
works for some common errors and in future, will taint nodes upon detecting 
them. Some of these errors are listed 
[here|https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json#L30:15].
 However, there are some categories of errors this setup won't detect. For 
example: if we have a node that has firewall rules/networking that prevents it 
from accessing a particular external service, to say - download/stream data. 
Or, a node with issues in its local disk which makes it throw read/write 
errors. These error conditions may only affect certain kinds of pods on that 
node and not others.

Yinan's point I think is that it is uncommon for applications on k8s to try and 
incorporate reasoning about node level conditions. I think this is because the 
general expectation is that a failure on a given node will just cause new 
executors to spin up on different nodes and eventually the application will 
succeed. However, I can see this being an issue in large-scale production 
deployments, where we'd see transient errors like above. Given the existence of 
a blacklist mechanism and anti-affinity primitives, it wouldn't be too complex 
to incorporate it I think. 

[~aash] [~mcheah], have you guys seen this in practice thus far? 

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in 

[jira] [Commented] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Anirudh Ramanathan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374884#comment-16374884
 ] 

Anirudh Ramanathan commented on SPARK-23485:


Stavros - we [do currently 
differentiate|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L386-L398]
 between kubernetes causing an executor to disappear (node failure) and exit 
caused by the application itself. 

Here's some detail on node issues and k8s:

The node level problem detection is split between the Kubelet and the [Node 
Problem Detector|https://github.com/kubernetes/node-problem-detector]. This 
works for some common errors and in future, will taint nodes upon detecting 
them. Some of these errors are listed 
[here|https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json#L30:15].
 However, there are some categories of errors this setup won't detect. For 
example: if we have a node that has firewall rules/networking that prevents it 
from accessing a particular external service, to say - download/stream data. 
Or, a node with issues in its local disk which makes it throw read/write 
errors. These error conditions may only affect certain kinds of pods on that 
node and not others.

Yinan's point I think is that it is uncommon for applications on k8s to try and 
incorporate reasoning about node level conditions. I think this is because the 
general expectation is that a failure on a given node will just cause new 
executors to spin up on different nodes and eventually the application will 
succeed. However, I can see this being an issue in large-scale production 
deployments, where we'd see transient errors like above. Given the existence of 
a blacklist mechanism and anti-affinity primitives, it wouldn't be too complex 
to incorporate it I think. 

[~aash] [~mcheah], have you guys seen this in practice thus far? 

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in {{KubernetesClusterSchedulerBackend}} so it 
> seems this is missing.  Thought of this while looking at SPARK-19755, a 
> similar issue on mesos.






[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2018-02-23 Thread Sönke Liebau (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374872#comment-16374872
 ] 

Sönke Liebau commented on SPARK-18057:
--

Alright.
I've got a 12h flight to SF ahead of me on Sunday, will put this on the "things 
to keep me from dying of boredom" list and have a look at it.

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html






[jira] [Created] (SPARK-23503) continuous execution should sequence committed epochs

2018-02-23 Thread Jose Torres (JIRA)
Jose Torres created SPARK-23503:
---

 Summary: continuous execution should sequence committed epochs
 Key: SPARK-23503
 URL: https://issues.apache.org/jira/browse/SPARK-23503
 Project: Spark
  Issue Type: Sub-task
  Components: Structured Streaming
Affects Versions: 2.4.0
Reporter: Jose Torres


Currently, the EpochCoordinator doesn't enforce a commit order. If a message 
for epoch n gets lost in the ether, and epoch n + 1 happens to be ready for 
commit earlier, epoch n + 1 will be committed.

 

This is either incorrect or needlessly confusing, because it's not safe to 
start from the end offset of epoch n + 1 until epoch n is committed. 
EpochCoordinator should enforce this sequencing.

 

Note that this is not actually a problem right now, because the commit messages 
go through the same RPC channel from the same place. But we shouldn't 
implicitly bake this assumption in.
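
A minimal sketch of the kind of sequencing guard being proposed (hypothetical,
not the actual EpochCoordinator code): buffer out-of-order commit readiness and
only commit an epoch once all earlier epochs have been committed.

{code:scala}
import scala.collection.mutable

// Hypothetical sketch: commits for epoch n+1 are held back until epoch n commits.
class EpochSequencer(firstEpoch: Long) {
  private var nextToCommit = firstEpoch
  private val ready = mutable.Set[Long]()

  /** Record that `epoch` is ready; return the epochs that may now be committed, in order. */
  def onReady(epoch: Long): Seq[Long] = {
    ready += epoch
    val committable = mutable.Buffer[Long]()
    while (ready.remove(nextToCommit)) {
      committable += nextToCommit
      nextToCommit += 1
    }
    committable.toList
  }
}
{code}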






[jira] [Created] (SPARK-23502) Support async init of spark context during spark-shell startup

2018-02-23 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-23502:
---

 Summary: Support async init of spark context during spark-shell 
startup
 Key: SPARK-23502
 URL: https://issues.apache.org/jira/browse/SPARK-23502
 Project: Spark
  Issue Type: Improvement
  Components: Spark Shell
Affects Versions: 2.0.0
Reporter: Sital Kedia


Currently, whenever a user starts the spark shell, we initialize the spark 
context before returning the prompt to the user. In environments where spark 
context initialization takes several seconds, waiting for the prompt is not a 
very good user experience. Instead of waiting for the spark context to 
initialize, we can initialize it in the background and return the prompt to the 
user as soon as possible. Note that even if we return the prompt early, we 
still need to make sure the spark context initialization completes before any 
query is executed.

Note that the scala interpreter already does very similar async initialization 
in order to return the prompt to the user faster: 
https://github.com/scala/scala/blob/v2.12.2/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L414.
We would be emulating that behavior for Spark.
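
A minimal sketch of the idea (hypothetical, not the actual spark-shell change):
construct the session in the background and block only when it is first needed.

{code:scala}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import org.apache.spark.sql.SparkSession

// Kick off SparkSession construction without blocking the caller (e.g. the REPL prompt).
val sessionFuture: Future[SparkSession] = Future {
  SparkSession.builder().appName("spark-shell").getOrCreate()
}

// ... the prompt could be handed back to the user here ...

// Block before the first query so nothing runs against an uninitialized context.
lazy val spark: SparkSession = Await.result(sessionFuture, Duration.Inf)
{code}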






[jira] [Comment Edited] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374778#comment-16374778
 ] 

Stavros Kontopoulos edited comment on SPARK-23485 at 2/23/18 6:59 PM:
--

How about locality preferences + a hardware problem, like the disk problem? I 
see code in Spark Kubernetes scheduler related to locality (not sure if it is 
completed). Will that problem be detected and will kubernetes scheduler 
consider the node as problematic? If so then I guess there is no need for 
blacklisting in such scenarios. If though, this cannot be detected and the task 
is failing but there is locality preference what will happen? Kubernetes should 
not just re-run things elsewhere just because there was a failure. The reason 
for a failure matters. Is that an app failure or something lower level. (I am 
new to kubernetes, so I missed the fact that taints is applied cluster wide, we 
just need some similar feature as already mentioned node anti-affinity).

 


was (Author: skonto):
How about locality preferences + a hardware problem, like the disk problem? I 
see code in Spark Kubernetes scheduler related to locality (not sure if it is 
completed). Will that problem be detected and will kubernetes scheduler 
consider the node as problematic? If so then I guess there is no need for 
blacklisting in such scenarios. If though, this cannot be detected and the task 
is failing but there is locality preference what will happen? Kubernetes should 
not just re-run things elsewhere just because there was a failure. The reason 
for a failure matters. Is that an app failure or something lower level. 

 

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in {{KubernetesClusterSchedulerBackend}} so it 
> seems this is missing.  Thought of this while looking at SPARK-19755, a 
> similar issue on mesos.






[jira] [Commented] (SPARK-5377) Dynamically add jar into Spark Driver's classpath.

2018-02-23 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374841#comment-16374841
 ] 

Xuefu Zhang commented on SPARK-5377:


[~shay_elbaz] I think the issue was closed purely because no one was working on 
it, based on my private communication with [~sowen]. However, this can surely 
be reopened if someone would like to work on it.

> Dynamically add jar into Spark Driver's classpath.
> --
>
> Key: SPARK-5377
> URL: https://issues.apache.org/jira/browse/SPARK-5377
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Chengxiang Li
>Priority: Major
>
> Spark supports dynamically adding a jar to the executor classpath through 
> SparkContext::addJar(), but it does not support dynamically adding a jar to 
> the driver classpath. In most cases (if not all), a user dynamically adds a 
> jar with SparkContext::addJar() because some classes from that jar will be 
> referenced in an upcoming Spark job, which means the classes also need to be 
> loaded on the Spark driver side, e.g. during serialization. I think it makes 
> sense to add an API that adds a jar to the driver classpath, or simply to make 
> SparkContext::addJar() cover the driver as well. HIVE-9410 is a real case from 
> Hive on Spark.
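
Until such an API exists, one driver-side workaround sometimes used is to open 
the jar with a plain URLClassLoader and access its classes reflectively. This is 
a hedged sketch, not a Spark API; the jar path and class name are hypothetical 
placeholders, and classes loaded this way are only reachable via reflection, 
which is exactly the limitation a proper addJar()-for-the-driver would remove.

{code:java}
import java.net.{URL, URLClassLoader}

// Hedged driver-side workaround sketch (not a Spark API).
// "/tmp/extra-udfs.jar" and "com.example.MyUdf" are hypothetical placeholders.
val extraJar = new java.io.File("/tmp/extra-udfs.jar").toURI.toURL
val loader   = new URLClassLoader(Array[URL](extraJar), getClass.getClassLoader)

// Load and instantiate a class from the dynamically added jar on the driver.
val clazz    = loader.loadClass("com.example.MyUdf")
val instance = clazz.getDeclaredConstructor().newInstance()
{code}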



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23471) RandomForestClassificationModel save() - incorrect metadata

2018-02-23 Thread Keepun (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374830#comment-16374830
 ] 

Keepun commented on SPARK-23471:


Saved to AWS s3://

> RandomForestClassificationModel save() - incorrect metadata
> ---
>
> Key: SPARK-23471
> URL: https://issues.apache.org/jira/browse/SPARK-23471
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.2.1
>Reporter: Keepun
>Priority: Major
>
> RandomForestClassificationModel.load() does not work after save() 
> {code:java}
> RandomForestClassifier rf = new RandomForestClassifier()
> .setFeaturesCol("features")
> .setLabelCol("result")
> .setNumTrees(100)
> .setMaxDepth(30)
> .setMinInstancesPerNode(1)
> //.setCacheNodeIds(true)
> .setMaxMemoryInMB(500)
> .setSeed(System.currentTimeMillis() + System.nanoTime());
> RandomForestClassificationModel rfmodel = rf.train(data);
>try {
>   rfmodel.save(args[2] + "." + System.currentTimeMillis());
>} catch (IOException e) {
>   LOG.error(e.getMessage(), e);
>   e.printStackTrace();
>}
> {code}
> File metadata\part-0: 
> {code:java}
> {"class":"org.apache.spark.ml.classification.RandomForestClassificationModel",
> "timestamp":1519136783983,"sparkVersion":"2.2.1","uid":"rfc_7c7e84ce7488",
> "paramMap":{"featureSubsetStrategy":"auto","cacheNodeIds":false,"impurity":"gini",
> "checkpointInterval":10,
> "numTrees":20,"maxDepth":5,
> "probabilityCol":"probability","labelCol":"label","featuresCol":"features",
> "maxMemoryInMB":256,"minInstancesPerNode":1,"subsamplingRate":1.0,
> "rawPredictionCol":"rawPrediction","predictionCol":"prediction","maxBins":32,
> "minInfoGain":0.0,"seed":-491520797},"numFeatures":1354,"numClasses":2,
> "numTrees":20}
> {code}
> should be:
> {code:java}
> "numTrees":100,"maxDepth":30,{code}
>  
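
One possible explanation for the default values in the metadata (worth verifying 
against the reporter's setup) is the use of train() instead of fit(): in Spark ML 
the Estimator's fit() copies the configured Params onto the returned model before 
it is saved, while calling the lower-level train() directly skips that copy. A 
minimal Scala sketch of the fit-then-save path; the training DataFrame and save 
path are hypothetical placeholders.

{code:java}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.sql.DataFrame

// Hedged sketch; `training` is a hypothetical DataFrame with "features"/"result"
// columns and the save path is a placeholder.
def trainAndSave(training: DataFrame): Unit = {
  val rf = new RandomForestClassifier()
    .setFeaturesCol("features")
    .setLabelCol("result")
    .setNumTrees(100)
    .setMaxDepth(30)

  // fit() (unlike calling train() directly) copies the configured Params onto
  // the model, so the saved metadata should reflect numTrees=100 and maxDepth=30.
  val model = rf.fit(training)
  model.write.overwrite().save("s3://bucket/rf-model")
}
{code}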



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23471) RandomForestClassificationModel save() - incorrect metadata

2018-02-23 Thread Keepun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Keepun updated SPARK-23471:
---
Description: 
RandomForestClassificationModel.load() does not work after save() 
{code:java}
RandomForestClassifier rf = new RandomForestClassifier()
.setFeaturesCol("features")
.setLabelCol("result")
.setNumTrees(100)
.setMaxDepth(30)
.setMinInstancesPerNode(1)
//.setCacheNodeIds(true)
.setMaxMemoryInMB(500)
.setSeed(System.currentTimeMillis() + System.nanoTime());
RandomForestClassificationModel rfmodel = rf.train(data);
   try {
  rfmodel.save(args[2] + "." + System.currentTimeMillis());
   } catch (IOException e) {
  LOG.error(e.getMessage(), e);
  e.printStackTrace();
   }
{code}
File metadata\part-0: 
{code:java}
{"class":"org.apache.spark.ml.classification.RandomForestClassificationModel",
"timestamp":1519136783983,"sparkVersion":"2.2.1","uid":"rfc_7c7e84ce7488",
"paramMap":{"featureSubsetStrategy":"auto","cacheNodeIds":false,"impurity":"gini",
"checkpointInterval":10,

"numTrees":20,"maxDepth":5,

"probabilityCol":"probability","labelCol":"label","featuresCol":"features",
"maxMemoryInMB":256,"minInstancesPerNode":1,"subsamplingRate":1.0,
"rawPredictionCol":"rawPrediction","predictionCol":"prediction","maxBins":32,
"minInfoGain":0.0,"seed":-491520797},"numFeatures":1354,"numClasses":2,

"numTrees":20}
{code}
should be:
{code:java}
"numTrees":100,"maxDepth":30,{code}
 

  was:
RandomForestClassificationMode.load() does not work after save():

 
{code:java}
RandomForestClassifier rf = new RandomForestClassifier()
.setFeaturesCol("features")
.setLabelCol("result")
.setNumTrees(100)
.setMaxDepth(30)
.setMinInstancesPerNode(1)
//.setCacheNodeIds(true)
.setMaxMemoryInMB(500)
.setSeed(System.currentTimeMillis() + System.nanoTime());
RandomForestClassificationModel rfmodel = rf.train(data);
   try {
  rfmodel.save(args[2] + "." + System.currentTimeMillis());
   } catch (IOException e) {
  LOG.error(e.getMessage(), e);
  e.printStackTrace();
   }
{code}
File metadata\part-0:

 

 
{code:java}
{"class":"org.apache.spark.ml.classification.RandomForestClassificationModel",
"timestamp":1519136783983,"sparkVersion":"2.2.1","uid":"rfc_7c7e84ce7488",
"paramMap":{"featureSubsetStrategy":"auto","cacheNodeIds":false,"impurity":"gini",
"checkpointInterval":10,

"numTrees":20,"maxDepth":5,

"probabilityCol":"probability","labelCol":"label","featuresCol":"features",
"maxMemoryInMB":256,"minInstancesPerNode":1,"subsamplingRate":1.0,
"rawPredictionCol":"rawPrediction","predictionCol":"prediction","maxBins":32,
"minInfoGain":0.0,"seed":-491520797},"numFeatures":1354,"numClasses":2,

"numTrees":20}
{code}
should be:
{code:java}
"numTrees":100,"maxDepth":30,{code}
 


> RandomForestClassificationModel save() - incorrect metadata
> ---
>
> Key: SPARK-23471
> URL: https://issues.apache.org/jira/browse/SPARK-23471
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.2.1
>Reporter: Keepun
>Priority: Major
>
> RandomForestClassificationModel.load() does not work after save() 
> {code:java}
> RandomForestClassifier rf = new RandomForestClassifier()
> .setFeaturesCol("features")
> .setLabelCol("result")
> .setNumTrees(100)
> .setMaxDepth(30)
> .setMinInstancesPerNode(1)
> //.setCacheNodeIds(true)
> .setMaxMemoryInMB(500)
> .setSeed(System.currentTimeMillis() + System.nanoTime());
> RandomForestClassificationModel rfmodel = rf.train(data);
>try {
>   rfmodel.save(args[2] + "." + System.currentTimeMillis());
>} catch (IOException e) {
>   LOG.error(e.getMessage(), e);
>   e.printStackTrace();
>}
> {code}
> File metadata\part-0: 
> {code:java}
> {"class":"org.apache.spark.ml.classification.RandomForestClassificationModel",
> "timestamp":1519136783983,"sparkVersion":"2.2.1","uid":"rfc_7c7e84ce7488",
> "paramMap":{"featureSubsetStrategy":"auto","cacheNodeIds":false,"impurity":"gini",
> "checkpointInterval":10,
> "numTrees":20,"maxDepth":5,
> "probabilityCol":"probability","labelCol":"label","featuresCol":"features",
> "maxMemoryInMB":256,"minInstancesPerNode":1,"subsamplingRate":1.0,
> "rawPredictionCol":"rawPrediction","predictionCol":"prediction","maxBins":32,
> "minInfoGain":0.0,"seed":-491520797},"numFeatures":1354,"numClasses":2,
> "numTrees":20}
> {code}
> should be:
> {code:java}
> "numTrees":100,"maxDepth":30,{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, 

[jira] [Comment Edited] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374778#comment-16374778
 ] 

Stavros Kontopoulos edited comment on SPARK-23485 at 2/23/18 6:36 PM:
--

How about locality preferences + a hardware problem, like the disk problem? I 
see code in Spark Kubernetes scheduler related to locality (not sure if it is 
completed). Will that problem be detected and will kubernetes scheduler 
consider the node as problematic? If so then I guess there is no need for 
blacklisting in such scenarios. If though, this cannot be detected and the task 
is failing but there is locality preference what will happen? Kubernetes should 
not just re-run things elsewhere just because there was a failure. The reason 
for a failure matters. Is that an app failure or something lower level. 

 


was (Author: skonto):
How about locality preferences + a hardware problem, like the disk problem? I 
see code in Spark Kubernetes scheduler related to locality (not sure if it is 
completed). Will that problem be detected and will kubernetes scheduler 
consider the node as problematic? If so then I guess there is no need for 
blacklisting.

 

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in {{KubernetesClusterSchedulerBackend}} so it 
> seems this is missing.  Thought of this while looking at SPARK-19755, a 
> similar issue on mesos.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Yinan Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374757#comment-16374757
 ] 

Yinan Li edited comment on SPARK-23485 at 2/23/18 6:22 PM:
---

It's not that I'm too confident in the capability of Kubernetes to detect node 
problems. I just don't see it as good practice to worry about node problems at 
the application level in a containerized environment running on a container 
orchestration system. For that reason, yes, I don't think Spark on Kubernetes 
should really need to worry about blacklisting nodes.


was (Author: liyinan926):
It's not that I'm too confident on the capability of Kubernetes to detect node 
problems. I just don't see it as a good practice of worrying about node 
problems at application level in a containerized environment running on a 
container orchestration system. Yes, I don't think Spark on Kubernetes should 
really need to worry about blacklisting nodes.

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in {{KubernetesClusterSchedulerBackend}} so it 
> seems this is missing.  Thought of this while looking at SPARK-19755, a 
> similar issue on mesos.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23500) Filters on named_structs could be pushed into scans

2018-02-23 Thread Henry Robinson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Robinson updated SPARK-23500:
---
Description: 
Simple filters on dataframes joined with {{joinWith()}} are missing an 
opportunity to get pushed into the scan because they're written in terms of 
{{named_struct}} that could be removed by the optimizer.

Given the following simple query over two dataframes:
{code:java}
scala> val df = spark.read.parquet("one_million")
df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> val df2 = spark.read.parquet("one_million")
df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 
30").explain
== Physical Plan ==
*(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
:- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
:  +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
struct, false].id))
   +- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
  +- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
 +- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: 
Parquet, Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
{code}

Using {{joinWith}} means that the filter is expressed over a {{named_struct}}. 
The filter does get pushed down to just above the scan, but not into the Parquet 
reader (note the empty PushedFilters). Since the wrapping-and-projection of 
{{named_struct(id...).id}} is a no-op at that point, it could be removed, and the 
filter could then be pushed all the way down to Parquet.


  was:
Simple filters on dataframes joined with {{joinWith()}} are missing an 
opportunity to get pushed into the scan because they're written in terms of 
{{named_struct}} that could be removed by the optimizer.

Given the following simple query over two dataframes:
{code:java}
scala> val df = spark.read.parquet("one_million")
df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> val df2 = spark.read.parquet("one_million")
df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 
30").explain
== Physical Plan ==
*(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
:- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
: +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
struct, false].id))
+- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
+- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
+- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
{code}

Using {{joinWith}} means that the filter is placed on a {{named_struct}}, and 
is then pushed down. When the filter is just above the scan, the 
wrapping-and-projection of {{named_struct(id...).id}} is a no-op and could be 
removed. Then the filter can be pushed down to Parquet.



> Filters on named_structs could be pushed into scans
> ---
>
> Key: SPARK-23500
> URL: https://issues.apache.org/jira/browse/SPARK-23500
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Henry Robinson
>Priority: Major
>
> Simple filters on dataframes joined with {{joinWith()}} are missing an 
> opportunity to get pushed into the scan because they're written in terms of 
> {{named_struct}} that could be removed by the optimizer.
> Given the following simple query over two dataframes:
> {code:java}
> scala> val df = spark.read.parquet("one_million")
> df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = spark.read.parquet("one_million")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 
> 30").explain
> == Physical Plan ==
> *(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
> :- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
> :  +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
> Location: 

[jira] [Assigned] (SPARK-20680) Spark-sql do not support for void column datatype of view

2018-02-23 Thread Marcelo Vanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin reassigned SPARK-20680:
--

Assignee: (was: Marcelo Vanzin)

> Spark-sql do not support for void column datatype of view
> -
>
> Key: SPARK-20680
> URL: https://issues.apache.org/jira/browse/SPARK-20680
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Lantao Jin
>Priority: Major
>
> Create a HIVE view:
> {quote}
> hive> create table bad as select 1 x, null z from dual;
> {quote}
> Because there's no type, Hive gives it the VOID type:
> {quote}
> hive> describe bad;
> OK
> x int 
> z void
> {quote}
> In Spark2.0.x, the behaviour to read this view is normal:
> {quote}
> spark-sql> describe bad;
> x   int NULL
> z   voidNULL
> Time taken: 4.431 seconds, Fetched 2 row(s)
> {quote}
> But in Spark2.1.x, it failed with SparkException: Cannot recognize hive type 
> string: void
> {quote}
> spark-sql> describe bad;
> 17/05/09 03:12:08 INFO execution.SparkSqlParser: Parsing command: describe bad
> 17/05/09 03:12:08 INFO parser.CatalystSqlParser: Parsing command: int
> 17/05/09 03:12:08 INFO parser.CatalystSqlParser: Parsing command: void
> 17/05/09 03:12:08 ERROR thriftserver.SparkSQLDriver: Failed in [describe bad]
> org.apache.spark.SparkException: Cannot recognize hive type string: void
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl.org$apache$spark$sql$hive$client$HiveClientImpl$$fromHiveColumn(HiveClientImpl.scala:789)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:365)
>   
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:365)
>   
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:365)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:361)
> Caused by: org.apache.spark.sql.catalyst.parser.ParseException:
> DataType void() is not supported.(line 1, pos 0)
> == SQL ==  
> void   
> ^^^
> ... 61 more
> org.apache.spark.SparkException: Cannot recognize hive type string: void
> {quote}
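
A common workaround, until Spark handles the VOID type, is to give the NULL 
column an explicit type when the table is created. A hedged sketch via Spark SQL 
(assuming a SparkSession named {{spark}} with Hive support enabled; the table 
name is a placeholder):

{code:java}
// Hedged workaround sketch: cast the NULL to a concrete type so Hive records
// string instead of void. `spark` is an existing Hive-enabled SparkSession.
spark.sql("CREATE TABLE good AS SELECT 1 AS x, CAST(NULL AS STRING) AS z")
spark.sql("DESCRIBE good").show()  // z should now show up as string, not void
{code}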



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-20680) Spark-sql do not support for void column datatype of view

2018-02-23 Thread Marcelo Vanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin reassigned SPARK-20680:
--

Assignee: Marcelo Vanzin

> Spark-sql do not support for void column datatype of view
> -
>
> Key: SPARK-20680
> URL: https://issues.apache.org/jira/browse/SPARK-20680
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Lantao Jin
>Assignee: Marcelo Vanzin
>Priority: Major
>
> Create a HIVE view:
> {quote}
> hive> create table bad as select 1 x, null z from dual;
> {quote}
> Because there's no type, Hive gives it the VOID type:
> {quote}
> hive> describe bad;
> OK
> x int 
> z void
> {quote}
> In Spark2.0.x, the behaviour to read this view is normal:
> {quote}
> spark-sql> describe bad;
> x   int NULL
> z   voidNULL
> Time taken: 4.431 seconds, Fetched 2 row(s)
> {quote}
> But in Spark2.1.x, it failed with SparkException: Cannot recognize hive type 
> string: void
> {quote}
> spark-sql> describe bad;
> 17/05/09 03:12:08 INFO execution.SparkSqlParser: Parsing command: describe bad
> 17/05/09 03:12:08 INFO parser.CatalystSqlParser: Parsing command: int
> 17/05/09 03:12:08 INFO parser.CatalystSqlParser: Parsing command: void
> 17/05/09 03:12:08 ERROR thriftserver.SparkSQLDriver: Failed in [describe bad]
> org.apache.spark.SparkException: Cannot recognize hive type string: void
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl.org$apache$spark$sql$hive$client$HiveClientImpl$$fromHiveColumn(HiveClientImpl.scala:789)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:365)
>   
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:365)
>   
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:365)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:361)
> Caused by: org.apache.spark.sql.catalyst.parser.ParseException:
> DataType void() is not supported.(line 1, pos 0)
> == SQL ==  
> void   
> ^^^
> ... 61 more
> org.apache.spark.SparkException: Cannot recognize hive type string: void
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374778#comment-16374778
 ] 

Stavros Kontopoulos commented on SPARK-23485:
-

How about locality preferences + a hardware problem, like the disk problem? I 
see code in Spark Kubernetes scheduler related to locality (not sure if it is 
completed). Will that problem be detected and will kubernetes scheduler 
consider the node as problematic? If so then I guess there is no need for 
blacklisting.

 

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in {{KubernetesClusterSchedulerBackend}} so it 
> seems this is missing.  Thought of this while looking at SPARK-19755, a 
> similar issue on mesos.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23501) Refactor AllStagesPage in order to avoid redundant code

2018-02-23 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23501:


Assignee: Apache Spark

> Refactor AllStagesPage in order to avoid redundant code
> ---
>
> Key: SPARK-23501
> URL: https://issues.apache.org/jira/browse/SPARK-23501
> Project: Spark
>  Issue Type: Task
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: Marco Gaido
>Assignee: Apache Spark
>Priority: Trivial
>
> AllStagesPage contains a lot of copy-pasted code for the different statuses 
> of the stages. This can be improved in order to make it easier to add new 
> stages statuses here, make changes and make less error prone these tasks.
> cc [~vanzin]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23501) Refactor AllStagesPage in order to avoid redundant code

2018-02-23 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23501:


Assignee: (was: Apache Spark)

> Refactor AllStagesPage in order to avoid redundant code
> ---
>
> Key: SPARK-23501
> URL: https://issues.apache.org/jira/browse/SPARK-23501
> Project: Spark
>  Issue Type: Task
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: Marco Gaido
>Priority: Trivial
>
> AllStagesPage contains a lot of copy-pasted code for the different statuses 
> of the stages. This can be improved in order to make it easier to add new 
> stages statuses here, make changes and make less error prone these tasks.
> cc [~vanzin]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23501) Refactor AllStagesPage in order to avoid redundant code

2018-02-23 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374775#comment-16374775
 ] 

Apache Spark commented on SPARK-23501:
--

User 'mgaido91' has created a pull request for this issue:
https://github.com/apache/spark/pull/20663

> Refactor AllStagesPage in order to avoid redundant code
> ---
>
> Key: SPARK-23501
> URL: https://issues.apache.org/jira/browse/SPARK-23501
> Project: Spark
>  Issue Type: Task
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: Marco Gaido
>Priority: Trivial
>
> AllStagesPage contains a lot of copy-pasted code for the different statuses 
> of the stages. This can be improved in order to make it easier to add new 
> stages statuses here, make changes and make less error prone these tasks.
> cc [~vanzin]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23499) Mesos Cluster Dispatcher should support priority queues to submit drivers

2018-02-23 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23499:


Assignee: (was: Apache Spark)

> Mesos Cluster Dispatcher should support priority queues to submit drivers
> -
>
> Key: SPARK-23499
> URL: https://issues.apache.org/jira/browse/SPARK-23499
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.4.0
>Reporter: Pascal GILLET
>Priority: Major
> Fix For: 2.4.0
>
>
> As for Yarn, Mesos users should be able to specify priority queues to define 
> a workload management policy for queued drivers in the Mesos Cluster 
> Dispatcher.
> Submitted drivers are *currently* kept in order of their submission: the 
> first driver added to the queue will be the first one to be executed (FIFO).
> Each driver could have a "priority" associated with it. A driver with high 
> priority is served (Mesos resources) before a driver with low priority. If 
> two drivers have the same priority, they are served according to their submit 
> date in the queue.
> To set up such priority queues, the following changes are proposed:
>  * The Mesos Cluster Dispatcher can optionally be configured with the 
> _spark.mesos.dispatcher.queue.[QueueName]_ property. This property takes a 
> float as value. This adds a new queue named _QueueName_ for submitted drivers 
> with the specified priority.
>  Higher numbers indicate higher priority.
>  The user can then specify multiple queues.
>  * A driver can be submitted to a specific queue with 
> _spark.mesos.dispatcher.queue_. This property takes the name of a queue 
> previously declared in the dispatcher as value.
> By default, the dispatcher has a single "default" queue with 0.0 priority 
> (cannot be overridden). If none of the properties above are specified, the 
> behavior is the same as the current one (i.e. simple FIFO).
> Additionaly, it is possible to implement a consistent and overall workload 
> management policy throughout the lifecycle of drivers by mapping these 
> priority queues to weighted Mesos roles if any (i.e. from the QUEUED state in 
> the dispatcher to the final states in the Mesos cluster), and by specifying a 
> _spark.mesos.role_ along with a _spark.mesos.dispatcher.queue_ when 
> submitting an application.
> For example, with the URGENT Mesos role:
>  # Conf on the dispatcher side
>  spark.mesos.dispatcher.queue.URGENT=1.0
>  # Conf on the driver side
>  spark.mesos.dispatcher.queue=URGENT
>  spark.mesos.role=URGENT



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23499) Mesos Cluster Dispatcher should support priority queues to submit drivers

2018-02-23 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374768#comment-16374768
 ] 

Apache Spark commented on SPARK-23499:
--

User 'pgillet' has created a pull request for this issue:
https://github.com/apache/spark/pull/20665

> Mesos Cluster Dispatcher should support priority queues to submit drivers
> -
>
> Key: SPARK-23499
> URL: https://issues.apache.org/jira/browse/SPARK-23499
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.4.0
>Reporter: Pascal GILLET
>Priority: Major
> Fix For: 2.4.0
>
>
> As for Yarn, Mesos users should be able to specify priority queues to define 
> a workload management policy for queued drivers in the Mesos Cluster 
> Dispatcher.
> Submitted drivers are *currently* kept in order of their submission: the 
> first driver added to the queue will be the first one to be executed (FIFO).
> Each driver could have a "priority" associated with it. A driver with high 
> priority is served (Mesos resources) before a driver with low priority. If 
> two drivers have the same priority, they are served according to their submit 
> date in the queue.
> To set up such priority queues, the following changes are proposed:
>  * The Mesos Cluster Dispatcher can optionally be configured with the 
> _spark.mesos.dispatcher.queue.[QueueName]_ property. This property takes a 
> float as value. This adds a new queue named _QueueName_ for submitted drivers 
> with the specified priority.
>  Higher numbers indicate higher priority.
>  The user can then specify multiple queues.
>  * A driver can be submitted to a specific queue with 
> _spark.mesos.dispatcher.queue_. This property takes the name of a queue 
> previously declared in the dispatcher as value.
> By default, the dispatcher has a single "default" queue with 0.0 priority 
> (cannot be overridden). If none of the properties above are specified, the 
> behavior is the same as the current one (i.e. simple FIFO).
> Additionaly, it is possible to implement a consistent and overall workload 
> management policy throughout the lifecycle of drivers by mapping these 
> priority queues to weighted Mesos roles if any (i.e. from the QUEUED state in 
> the dispatcher to the final states in the Mesos cluster), and by specifying a 
> _spark.mesos.role_ along with a _spark.mesos.dispatcher.queue_ when 
> submitting an application.
> For example, with the URGENT Mesos role:
>  # Conf on the dispatcher side
>  spark.mesos.dispatcher.queue.URGENT=1.0
>  # Conf on the driver side
>  spark.mesos.dispatcher.queue=URGENT
>  spark.mesos.role=URGENT



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23501) Refactor AllStagesPage in order to avoid redundant code

2018-02-23 Thread Marco Gaido (JIRA)
Marco Gaido created SPARK-23501:
---

 Summary: Refactor AllStagesPage in order to avoid redundant code
 Key: SPARK-23501
 URL: https://issues.apache.org/jira/browse/SPARK-23501
 Project: Spark
  Issue Type: Task
  Components: Web UI
Affects Versions: 2.4.0
Reporter: Marco Gaido


AllStagesPage contains a lot of copy-pasted code for the different statuses of 
the stages. This can be improved in order to make it easier to add new stage 
statuses, make changes, and make these tasks less error-prone.

cc [~vanzin]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23499) Mesos Cluster Dispatcher should support priority queues to submit drivers

2018-02-23 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23499:


Assignee: Apache Spark

> Mesos Cluster Dispatcher should support priority queues to submit drivers
> -
>
> Key: SPARK-23499
> URL: https://issues.apache.org/jira/browse/SPARK-23499
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.4.0
>Reporter: Pascal GILLET
>Assignee: Apache Spark
>Priority: Major
> Fix For: 2.4.0
>
>
> As for Yarn, Mesos users should be able to specify priority queues to define 
> a workload management policy for queued drivers in the Mesos Cluster 
> Dispatcher.
> Submitted drivers are *currently* kept in order of their submission: the 
> first driver added to the queue will be the first one to be executed (FIFO).
> Each driver could have a "priority" associated with it. A driver with high 
> priority is served (Mesos resources) before a driver with low priority. If 
> two drivers have the same priority, they are served according to their submit 
> date in the queue.
> To set up such priority queues, the following changes are proposed:
>  * The Mesos Cluster Dispatcher can optionally be configured with the 
> _spark.mesos.dispatcher.queue.[QueueName]_ property. This property takes a 
> float as value. This adds a new queue named _QueueName_ for submitted drivers 
> with the specified priority.
>  Higher numbers indicate higher priority.
>  The user can then specify multiple queues.
>  * A driver can be submitted to a specific queue with 
> _spark.mesos.dispatcher.queue_. This property takes the name of a queue 
> previously declared in the dispatcher as value.
> By default, the dispatcher has a single "default" queue with 0.0 priority 
> (cannot be overridden). If none of the properties above are specified, the 
> behavior is the same as the current one (i.e. simple FIFO).
> Additionaly, it is possible to implement a consistent and overall workload 
> management policy throughout the lifecycle of drivers by mapping these 
> priority queues to weighted Mesos roles if any (i.e. from the QUEUED state in 
> the dispatcher to the final states in the Mesos cluster), and by specifying a 
> _spark.mesos.role_ along with a _spark.mesos.dispatcher.queue_ when 
> submitting an application.
> For example, with the URGENT Mesos role:
>  # Conf on the dispatcher side
>  spark.mesos.dispatcher.queue.URGENT=1.0
>  # Conf on the driver side
>  spark.mesos.dispatcher.queue=URGENT
>  spark.mesos.role=URGENT



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Yinan Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374757#comment-16374757
 ] 

Yinan Li commented on SPARK-23485:
--

It's not that I'm too confident on the capability of Kubernetes to detect node 
problems. I just don't see it as a good practice of worrying about node 
problems at application level in a containerized environment running on a 
container orchestration system. Yes, I don't think Spark on Kubernetes should 
really need to worry about blacklisting nodes.

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in {{KubernetesClusterSchedulerBackend}} so it 
> seems this is missing.  Thought of this while looking at SPARK-19755, a 
> similar issue on mesos.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374748#comment-16374748
 ] 

Imran Rashid commented on SPARK-23485:
--

OK, the missing jar was a bad example on Kubernetes ... I still wouldn't be 
surprised if there is some app-specific failure mode we're failing to take into 
account.

I think you are too confident in Kubernetes' ability to detect problems with 
nodes -- I don't know what it does, but I don't think it is possible for it to 
handle this.  It would be great if we really could rely on the separation of 
concerns you want; in practice that just doesn't work, because the app has more 
information. 
 It almost sounds like you think Spark should not even use any internal 
blacklisting with Kubernetes -- from experience with large non-Kubernetes 
deployments, I think that is a bad idea.

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in {{KubernetesClusterSchedulerBackend}} so it 
> seems this is missing.  Thought of this while looking at SPARK-19755, a 
> similar issue on mesos.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Anirudh Ramanathan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374745#comment-16374745
 ] 

Anirudh Ramanathan edited comment on SPARK-23485 at 2/23/18 6:00 PM:
-

While mostly I think that K8s would be better suited to make the decision to 
blacklist nodes, I think we will see that there are causes to consider nodes 
problematic beyond just the kubelet health checks, so, using Spark's 
blacklisting sounds like a good idea to me. 

Tainting nodes isn't the right solution given it's one Spark application's 
notion of a blacklist and we don't want it to be applied at a cluster level. We 
could however, use [node 
anti-affinity|https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity]
 to communicate said blacklist and ensure that certain nodes are avoided by 
executors of that application.


was (Author: foxish):
While mostly I think that K8s would be better suited to make the decision to 
blacklist nodes, I think we will see that there are causes to consider nodes 
problematic beyond just the kubelet health checks, so, using Spark's 
blacklisting sounds like a good idea to me. 

Tainting nodes aren't the right solution given it's one Spark application's 
notion of a blacklist and we don't want it to be applied at a cluster level. We 
could however, use [node 
anti-affinity|https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity]
 to communicate said blacklist and ensure that certain nodes are avoided by 
executors of that application.

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in {{KubernetesClusterSchedulerBackend}} so it 
> seems this is missing.  Thought of this while looking at SPARK-19755, a 
> similar issue on mesos.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Anirudh Ramanathan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374745#comment-16374745
 ] 

Anirudh Ramanathan commented on SPARK-23485:


While mostly I think that K8s would be better suited to make the decision to 
blacklist nodes, I think we will see that there are causes to consider nodes 
problematic beyond just the kubelet health checks, so, using Spark's 
blacklisting sounds like a good idea to me. 

Tainting nodes aren't the right solution given it's one Spark application's 
notion of a blacklist and we don't want it to be applied at a cluster level. We 
could however, use [node 
anti-affinity|https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity]
 to communicate said blacklist and ensure that certain nodes are avoided by 
executors of that application.

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in {{KubernetesClusterSchedulerBackend}} so it 
> seems this is missing.  Thought of this while looking at SPARK-19755, a 
> similar issue on mesos.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23500) Filters on named_structs could be pushed into scans

2018-02-23 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-23500:
--

 Summary: Filters on named_structs could be pushed into scans
 Key: SPARK-23500
 URL: https://issues.apache.org/jira/browse/SPARK-23500
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Henry Robinson


Simple filters on dataframes joined with {{joinWith()}} are missing an 
opportunity to get pushed into the scan because they're written in terms of 
{{named_struct}} that could be removed by the optimizer.

Given the following simple query over two dataframes:
{code:java}
scala> val df = spark.read.parquet("one_million")
df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> val df2 = spark.read.parquet("one_million")
df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 
30").explain
== Physical Plan ==
*(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
:- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
: +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
struct, false].id))
+- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
+- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
+- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
{code}

Using {{joinWith}} means that the filter is expressed over a {{named_struct}}. 
The filter does get pushed down to just above the scan, but not into the Parquet 
reader (note the empty PushedFilters). Since the wrapping-and-projection of 
{{named_struct(id...).id}} is a no-op at that point, it could be removed, and the 
filter could then be pushed all the way down to Parquet.
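
Until the optimizer strips the no-op wrapper, one workaround sketch is to apply 
the filter to the underlying dataframe before {{joinWith}}, so the predicate is 
planned against plain columns and can be pushed into the Parquet scan. For an 
inner join this should be equivalent to filtering after the join, but that is 
worth verifying for the query at hand; {{spark}} is an existing SparkSession and 
the path follows the example above.

{code:java}
// Hedged workaround sketch: filter df2 before joinWith so the predicate is
// expressed over plain columns (and can be pushed into the Parquet scan) rather
// than over the named_struct produced by joinWith.
val df  = spark.read.parquet("one_million")
val df2 = spark.read.parquet("one_million")

val joined = df.joinWith(df2.filter(df2.col("id") > 30), df2.col("id") === df.col("id2"))
joined.explain()  // the scan of df2 should now show a pushed GreaterThan filter
{code}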




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23499) Mesos Cluster Dispatcher should support priority queues to submit drivers

2018-02-23 Thread Pascal GILLET (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pascal GILLET updated SPARK-23499:
--
Description: 
As for Yarn, Mesos users should be able to specify priority queues to define a 
workload management policy for queued drivers in the Mesos Cluster Dispatcher.

Submitted drivers are *currently* kept in order of their submission: the first 
driver added to the queue will be the first one to be executed (FIFO).

Each driver could have a "priority" associated with it. A driver with high 
priority is served (Mesos resources) before a driver with low priority. If two 
drivers have the same priority, they are served according to their submit date 
in the queue.

To set up such priority queues, the following changes are proposed:
 * The Mesos Cluster Dispatcher can optionally be configured with the 
_spark.mesos.dispatcher.queue.[QueueName]_ property. This property takes a 
float as value. This adds a new queue named _QueueName_ for submitted drivers 
with the specified priority.
 Higher numbers indicate higher priority.
 The user can then specify multiple queues.
 * A driver can be submitted to a specific queue with 
_spark.mesos.dispatcher.queue_. This property takes the name of a queue 
previously declared in the dispatcher as value.

By default, the dispatcher has a single "default" queue with 0.0 priority 
(cannot be overridden). If none of the properties above are specified, the 
behavior is the same as the current one (i.e. simple FIFO).

Additionally, it is possible to implement a consistent and overall workload 
management policy throughout the lifecycle of drivers by mapping these priority 
queues to weighted Mesos roles if any (i.e. from the QUEUED state in the 
dispatcher to the final states in the Mesos cluster), and by specifying a 
_spark.mesos.role_ along with a _spark.mesos.dispatcher.queue_ when submitting 
an application.

For example, with the URGENT Mesos role:
 # Conf on the dispatcher side
 spark.mesos.dispatcher.queue.URGENT=1.0

 # Conf on the driver side
 spark.mesos.dispatcher.queue=URGENT
 spark.mesos.role=URGENT

  was:
As for Yarn, Mesos users should be able to specify priority queues to define a 
workload management policy for queued drivers in the Mesos Cluster Dispatcher.

Submitted drivers are *currently* kept in order of their submission: the first 
driver added to the queue will be the first one to be executed (FIFO).

Each driver could have a "priority" associated with it. A driver with high 
priority is served (Mesos resources) before a driver with low priority. If two 
drivers have the same priority, they are served according to their submit date 
in the queue.

To set up such priority queues, the following changes are proposed:
 * The Mesos Cluster Dispatcher can optionally be configured with the 
_spark.mesos.dispatcher.queue.[QueueName]_ property. This property takes a 
float as value. This adds a new queue named _QueueName_ for submitted drivers 
with the specified priority.
Higher numbers indicate higher priority.
The user can then specify multiple queues.
 * A driver can be submitted to a specific queue with 
_spark.mesos.dispatcher.queue_. This property takes the name of a queue 
previously declared in the dispatcher as value.

By default, the dispatcher has a single "default" queue with 0.0 priority 
(cannot be overridden). If none of the properties above are specified, the 
behavior is the same as the current one (i.e. simple FIFO).


Additionaly, it is possible to implement a consistent and overall workload 
management policy throughout the lifecycle of drivers by mapping these priority 
queues to weighted Mesos roles if any (i.e. from the QUEUED state in the 
dispatcher to the final states in the Mesos cluster), and by specifying a 
spark.mesos.role along with a spark.mesos.dispatcher.queue when submitting an 
application.


For example, with the URGENT Mesos role:
# Conf on the dispatcher side
spark.mesos.dispatcher.queue.URGENT=1.0

# Conf on the driver side
spark.mesos.dispatcher.queue=URGENT
spark.mesos.role=URGENT


> Mesos Cluster Dispatcher should support priority queues to submit drivers
> -
>
> Key: SPARK-23499
> URL: https://issues.apache.org/jira/browse/SPARK-23499
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.4.0
>Reporter: Pascal GILLET
>Priority: Major
> Fix For: 2.4.0
>
>
> As for Yarn, Mesos users should be able to specify priority queues to define 
> a workload management policy for queued drivers in the Mesos Cluster 
> Dispatcher.
> Submitted drivers are *currently* kept in order of their submission: the 
> first driver added to the queue will be the first one to be executed (FIFO).
> Each driver 

[jira] [Created] (SPARK-23499) Mesos Cluster Dispatcher should support priority queues to submit drivers

2018-02-23 Thread Pascal GILLET (JIRA)
Pascal GILLET created SPARK-23499:
-

 Summary: Mesos Cluster Dispatcher should support priority queues 
to submit drivers
 Key: SPARK-23499
 URL: https://issues.apache.org/jira/browse/SPARK-23499
 Project: Spark
  Issue Type: Improvement
  Components: Mesos
Affects Versions: 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.4.0
Reporter: Pascal GILLET
 Fix For: 2.4.0


As for Yarn, Mesos users should be able to specify priority queues to define a 
workload management policy for queued drivers in the Mesos Cluster Dispatcher.

Submitted drivers are *currently* kept in order of their submission: the first 
driver added to the queue will be the first one to be executed (FIFO).

Each driver could have a "priority" associated with it. A driver with high 
priority is served (Mesos resources) before a driver with low priority. If two 
drivers have the same priority, they are served according to their submit date 
in the queue.

To set up such priority queues, the following changes are proposed:
 * The Mesos Cluster Dispatcher can optionally be configured with the 
_spark.mesos.dispatcher.queue.[QueueName]_ property. This property takes a 
float as value. This adds a new queue named _QueueName_ for submitted drivers 
with the specified priority.
Higher numbers indicate higher priority.
The user can then specify multiple queues.
 * A driver can be submitted to a specific queue with 
_spark.mesos.dispatcher.queue_. This property takes the name of a queue 
previously declared in the dispatcher as value.

By default, the dispatcher has a single "default" queue with 0.0 priority 
(cannot be overridden). If none of the properties above are specified, the 
behavior is the same as the current one (i.e. simple FIFO).


Additionally, it is possible to implement a consistent and overall workload 
management policy throughout the lifecycle of drivers by mapping these priority 
queues to weighted Mesos roles if any (i.e. from the QUEUED state in the 
dispatcher to the final states in the Mesos cluster), and by specifying a 
spark.mesos.role along with a spark.mesos.dispatcher.queue when submitting an 
application.


For example, with the URGENT Mesos role:
# Conf on the dispatcher side
spark.mesos.dispatcher.queue.URGENT=1.0

# Conf on the driver side
spark.mesos.dispatcher.queue=URGENT
spark.mesos.role=URGENT



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23498) Accuracy problem in comparison with string and integer

2018-02-23 Thread Kevin Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Zhang updated SPARK-23498:

Description: 
When comparing a string column with an integer value, Spark SQL automatically
casts the string operand to int, so the following SQL returns true in Hive but
false in Spark:

 
{code:java}
select '1000.1'>1000
{code}
 

From the physical plan we can see that the string operand was cast to int, which
causes the accuracy loss:
{code:java}
*Project [false AS (CAST(1000.1 AS INT) > 1000)#4]

+- Scan OneRowRelation[]
{code}
To solve this, casting both operands of a binary comparison to a wider common
type such as double should be safe.

  was:
While comparing a string column with integer value, spark sql will 
automatically cast the string operant to int, the following sql will return 
true in hive but false in spark

 
{code:java}
select '1000.1'>1000
{code}
 

 from the physical plan we can see the string operant was cast to int which 
caused the accuracy loss
{code:java}
*Project [false AS (CAST(1000.1 AS INT) > 1000)#4]

+- Scan OneRowRelation[]
{code}
Similar to SPARK-22469, I think it's safe to use double a common type to cast 
both side of operants to.


> Accuracy problem in comparison with string and integer
> --
>
> Key: SPARK-23498
> URL: https://issues.apache.org/jira/browse/SPARK-23498
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.2.1, 2.3.0
>Reporter: Kevin Zhang
>Priority: Major
>
> When comparing a string column with an integer value, Spark SQL automatically
> casts the string operand to int, so the following SQL returns true in Hive but
> false in Spark:
>  
> {code:java}
> select '1000.1'>1000
> {code}
>  
> From the physical plan we can see that the string operand was cast to int,
> which causes the accuracy loss:
> {code:java}
> *Project [false AS (CAST(1000.1 AS INT) > 1000)#4]
> +- Scan OneRowRelation[]
> {code}
> To solve this, casting both operands of a binary comparison to a wider common
> type such as double should be safe.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23498) Accuracy problem in comparison with string and integer

2018-02-23 Thread Kevin Zhang (JIRA)
Kevin Zhang created SPARK-23498:
---

 Summary: Accuracy problem in comparison with string and integer
 Key: SPARK-23498
 URL: https://issues.apache.org/jira/browse/SPARK-23498
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.1, 2.2.0, 2.3.0
Reporter: Kevin Zhang


When comparing a string column with an integer value, Spark SQL automatically
casts the string operand to int, so the following SQL returns true in Hive but
false in Spark:

 
{code:java}
select '1000.1'>1000
{code}
 

From the physical plan we can see that the string operand was cast to int, which
causes the accuracy loss:
{code:java}
*Project [false AS (CAST(1000.1 AS INT) > 1000)#4]

+- Scan OneRowRelation[]
{code}
Similar to SPARK-22469, I think it's safe to use double as the common type to
cast both operands to.
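
For reference, a small self-contained Scala sketch of the reported behaviour and
of the explicit-cast workaround that can be used until the coercion rule is
widened (the second query returns true since 1000.1 > 1000.0):

{code:scala}
import org.apache.spark.sql.SparkSession

object CastComparisonDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cast-demo").getOrCreate()

    // The string operand is coerced to int, losing the fractional part; per this
    // report the comparison then yields false instead of true.
    spark.sql("SELECT '1000.1' > 1000").show()

    // Workaround: cast both operands to double explicitly to avoid the lossy coercion.
    spark.sql("SELECT CAST('1000.1' AS DOUBLE) > CAST(1000 AS DOUBLE)").show()

    spark.stop()
  }
}
{code}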



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Yinan Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374708#comment-16374708
 ] 

Yinan Li commented on SPARK-23485:
--

In the YARN case, yes, it's possible that a node is missing a jar commonly
needed by applications. In Kubernetes mode this will never be the case, because
either all containers have a particular jar locally or none of them has it. An
image missing a dependency is problematic by itself; this consistency is one of
the benefits of being containerized. As for node problems, detecting them and
avoiding scheduling pods onto problematic nodes are concerns of the kubelets and
the scheduler. Applications should not need to worry about whether nodes are
healthy. Node problems happening at runtime cause pods to be evicted from the
problematic nodes and rescheduled somewhere else. Making applications
responsible for keeping track of problematic nodes and maintaining a blacklist
means unnecessarily jumping into the business of the kubelets and the scheduler.

 

[~foxish]

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in {{KubernetesClusterSchedulerBackend}} so it 
> seems this is missing.  Thought of this while looking at SPARK-19755, a 
> similar issue on mesos.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23485) Kubernetes should support node blacklist

2018-02-23 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374599#comment-16374599
 ] 

Imran Rashid commented on SPARK-23485:
--

Yeah, I don't think it's safe to assume that it's Kubernetes' responsibility to
entirely figure out the equivalent of a Spark application's internal blacklist.
You can't guarantee that Kubernetes will detect hardware issues, and the issue
might also be specific to the Spark application (e.g. a missing jar).
YARN has some basic detection of bad nodes as well, but we observed cases in
production where, without Spark's blacklisting, one bad disk would effectively
take out an entire application on a large cluster, as many task failures can
pile up very quickly.

That said, the existing blacklist implementation in Spark already handles that
case, even without the extra handling I'm proposing here. The Spark app would
still have its own node blacklist, and would avoid scheduling tasks on that
node.

However, this is suboptimal because Spark isn't really getting as many
resources as it should. E.g., it would request 10 executors and Kubernetes
would hand it 10, but Spark can really only use 8 of them because 2 live on a
node that is blacklisted.

I don't think this can be directly handled with taints, if I understand
correctly. I assume applying a taint is an admin-level operation? That would
mean a Spark app couldn't dynamically apply a taint when it discovers a problem
on a node (and really, it probably shouldn't be able to, as the cluster
shouldn't trust an arbitrary user). Furthermore, taints don't allow the
blacklist to be application specific: blacklisting is really just a heuristic,
and you probably do not want it applied across applications. It's not clear
what you'd do with multiple apps each with their own blacklist, as nodes enter
and leave each app's blacklist at different times.
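
Purely as an illustration of that resource gap (this is hypothetical code, not
anything that exists in KubernetesClusterSchedulerBackend), a backend that knew
the scheduler's node blacklist could at least account for executors that landed
on blacklisted nodes:

{code:scala}
// Hypothetical helper: given executors grouped by the node they run on and the
// application's current node blacklist, return only the executors Spark can use.
def usableExecutors(
    executorsByNode: Map[String, Seq[String]],
    nodeBlacklist: Set[String]): Seq[String] = {
  executorsByNode
    .filter { case (node, _) => !nodeBlacklist.contains(node) }
    .values
    .flatten
    .toSeq
}

// 10 executors granted, 2 of them on a blacklisted node => only 8 are usable.
val granted = Map(
  "nodeA" -> Seq("exec-1", "exec-2", "exec-3", "exec-4"),
  "nodeB" -> Seq("exec-5", "exec-6", "exec-7", "exec-8"),
  "nodeC" -> Seq("exec-9", "exec-10"))
assert(usableExecutors(granted, nodeBlacklist = Set("nodeC")).size == 8)
{code}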

> Kubernetes should support node blacklist
> 
>
> Key: SPARK-23485
> URL: https://issues.apache.org/jira/browse/SPARK-23485
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Scheduler
>Affects Versions: 2.3.0
>Reporter: Imran Rashid
>Priority: Major
>
> Spark's BlacklistTracker maintains a list of "bad nodes" which it will not 
> use for running tasks (eg., because of bad hardware).  When running in yarn, 
> this blacklist is used to avoid ever allocating resources on blacklisted 
> nodes: 
> https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
> I'm just beginning to poke around the kubernetes code, so apologies if this 
> is incorrect -- but I didn't see any references to 
> {{scheduler.nodeBlacklist()}} in {{KubernetesClusterSchedulerBackend}} so it 
> seems this is missing.  Thought of this while looking at SPARK-19755, a 
> similar issue on mesos.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23497) Sparklyr Applications doesn't disconnect spark driver in client mode

2018-02-23 Thread bharath kumar (JIRA)
bharath kumar created SPARK-23497:
-

 Summary: Sparklyr Applications doesn't disconnect spark driver in 
client mode
 Key: SPARK-23497
 URL: https://issues.apache.org/jira/browse/SPARK-23497
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core, YARN
Affects Versions: 2.1.0
Reporter: bharath kumar


Hello,

When we use Sparklyr to connect to the YARN cluster manager in client or
cluster mode, the Spark driver does not disconnect unless spark_disconnect(sc)
is explicitly called in the code.

Would it make sense to add a timeout so that the driver exits after a certain
amount of time, in client or cluster mode? I think this only happens with
connections from Sparklyr to YARN. Sometimes the driver stays there for weeks
and holds on to a minimum of resources.

*More Details:*

Yarn: 2.7.0

Spark: 2.1.0

R version: Microsoft R Open 3.4.2

RStudio version: rstudio-server-1.1.414-1.x86_64

yarn application -status application_id

18/01/22 09:08:45 INFO client.MapRZKBasedRMFailoverProxyProvider: Updated RM 
address to resourcemanager.com/resourcemanager:8032

 

Application Report : 

    Application-Id : application_id

    Application-Name : sparklyr

    Application-Type : SPARK

    User : userid

    Queue : root.queuename

    Start-Time : 1516245523965

    Finish-Time : 0

    Progress : 0%

    State : RUNNING

    Final-State : UNDEFINED

    Tracking-URL : N/A

    RPC Port : -1

    AM Host : N/A

    Aggregate Resource Allocation :266468 MB-seconds, 59 vcore-seconds

    Diagnostics : N/A

 

[http://spark.rstudio.com/]

 

I can provide more details if required

 

Thanks,

Bharath



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23496) Locality of coalesced partitions can be severely skewed by the order of input partitions

2018-02-23 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374530#comment-16374530
 ] 

Marco Gaido commented on SPARK-23496:
-

[~ala.luszczak] thanks for your answer. Honestly, I don't see much value in a
quick fix that we revisit later. Since this won't make it into Spark 2.3 and the
next release won't be out very soon, I think we have time to design a final
solution now. Anyway, I do agree with the objections you raised, i.e. that there
are so many factors to take into account that it is hard to define a method
which works properly in all conditions. So the proposed fix might be OK, but I
would be happy to also hear opinions from other people in the community on this
topic.

> Locality of coalesced partitions can be severely skewed by the order of input 
> partitions
> 
>
> Key: SPARK-23496
> URL: https://issues.apache.org/jira/browse/SPARK-23496
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Ala Luszczak
>Priority: Major
>
> Example:
> Consider RDD "R" with 100 partitions, half of which have locality preference 
> "hostA" and half have "hostB".
>  * Assume odd-numbered input partitions of R prefer "hostA" and even-numbered 
> prefer "hostB". Then R.coalesce(50) will have 25 partitions with preference 
> "hostA" and 25 with "hostB" (even distribution).
>  * Assume partitions with index 0-49 of R prefer "hostA" and partitions with 
> index 50-99 prefer "hostB". Then R.coalesce(50) will have 49 partitions with 
> "hostA" and 1 with "hostB" (extremely skewed distribution).
>  
> The algorithm in {{DefaultPartitionCoalescer.setupGroups}} is responsible for 
> picking preferred locations for coalesced partitions. It analyzes the 
> preferred locations of input partitions. It starts by trying to create one 
> partition for each unique location in the input. However, if the requested
> number of coalesced partitions is higher than the number of unique locations,
> it has to pick duplicate locations.
> Currently, the duplicate locations are picked by iterating over the input 
> partitions in order, and copying their preferred locations to coalesced 
> partitions. If the input partitions are clustered by location, this can 
> result in severe skew.
> Instead of iterating over the list of input partitions in order, we should 
> pick them at random.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23496) Locality of coalesced partitions can be severely skewed by the order of input partitions

2018-02-23 Thread Ala Luszczak (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374506#comment-16374506
 ] 

Ala Luszczak commented on SPARK-23496:
--

I agree that this solution merely makes the problem unlikely to occur, instead
of really solving it.

But the code in {{DefaultPartitionCoalescer.setupGroups()}} ([see
comment|https://github.com/ala/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala#L234-L240])
is deliberately written to be fast (O(n log n) with respect to the number of
coalesced partitions, which is assumed to be an order of magnitude smaller than
the number of input partitions), but not necessarily accurate. The same applies
to the other algorithms there.

Enforcing an even data distribution is not trivial. For example:
 * We merely look at the number of partitions, not at the number of rows in
each of the partitions. There might be severe skew across the partitions to
begin with.
 * It's not clear how to treat partitions with multiple preferred locations.
 * It's not clear whether it's more important for every input location to find
some matching coalesced partition, or to keep the partition sizes even.
 * It's not clear how best to deal with a mix of partitions with and without
locality preferences.

I think it's better to have a very simple fix that works well the vast majority
of the time now, and maybe open a follow-up ticket for revisiting the design of
{{DefaultPartitionCoalescer}} later.

> Locality of coalesced partitions can be severely skewed by the order of input 
> partitions
> 
>
> Key: SPARK-23496
> URL: https://issues.apache.org/jira/browse/SPARK-23496
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Ala Luszczak
>Priority: Major
>
> Example:
> Consider RDD "R" with 100 partitions, half of which have locality preference 
> "hostA" and half have "hostB".
>  * Assume odd-numbered input partitions of R prefer "hostA" and even-numbered 
> prefer "hostB". Then R.coalesce(50) will have 25 partitions with preference 
> "hostA" and 25 with "hostB" (even distribution).
>  * Assume partitions with index 0-49 of R prefer "hostA" and partitions with 
> index 50-99 prefer "hostB". Then R.coalesce(50) will have 49 partitions with 
> "hostA" and 1 with "hostB" (extremely skewed distribution).
>  
> The algorithm in {{DefaultPartitionCoalescer.setupGroups}} is responsible for 
> picking preferred locations for coalesced partitions. It analyzes the 
> preferred locations of input partitions. It starts by trying to create one 
> partition for each unique location in the input. However, if the requested
> number of coalesced partitions is higher than the number of unique locations,
> it has to pick duplicate locations.
> Currently, the duplicate locations are picked by iterating over the input 
> partitions in order, and copying their preferred locations to coalesced 
> partitions. If the input partitions are clustered by location, this can 
> result in severe skew.
> Instead of iterating over the list of input partitions in order, we should 
> pick them at random.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23494) Expose InferSchema's functionalities to the outside

2018-02-23 Thread David Courtinot (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Courtinot updated SPARK-23494:

Description: 
I'm proposing that InferSchema's internals (infer the schema of each record, 
merge two schemata, and canonicalize the result) be exposed to the outside.

*Use-case*

My team continuously produces large amounts of JSON data. The schema is, and
must be, very dynamic: fields can appear and disappear from one day to the next,
most fields are nullable, some fields occur only rarely, etc.

In another job, we download this data, sample it and infer the schema using
Dataset.schema(). From there, we convert the data to Parquet and upload it
somewhere for later querying. This approach has proved problematic:
 * rare fields can be absent from a sample, and therefore absent from the
schema. This results in exceptions when trying to query those fields. We have
had to implement cumbersome fixes for this involving a manually curated set of
required fields.
 * this is expensive. Going through a sample of the data to infer the schema is
still a very costly operation for us. Caching the JSON RDD to disk (it doesn't
fit in memory) turned out to be at least as slow as traversing the sample first
and then the whole data set.

*Proposition*

InferSchema is essentially a fold operator. This means a Spark accumulator can
easily be built on top of it in order to calculate a schema alongside an RDD
calculation. In the above use-case, this has several advantages:
 * the schema is inferred on the entire data set, and therefore contains all
possible fields no matter how low their frequency is.
 * the computational overhead is negligible, since it happens at the same time
as writing the data to an external store rather than by evaluating the RDD for
the sole purpose of schema inference.
 * after writing the schema to an external store, we can load the JSON data
into a Dataset without ever paying the inference cost again (just the conversion
from JSON to Row). We keep the advantages and flexibility of JSON while also
benefiting from the powerful features and optimizations available on Datasets
or Parquet itself.

With such a feature, users can decide to use their JSON (or other) data as
structured data whenever they want, even though the actual schema may vary every
ten minutes, as long as they record the schema of each portion of data.
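
To make the "accumulator on top of a fold" idea concrete, here is a hypothetical
Scala sketch. The inferRecord and mergeSchemas parameters stand in for the
InferSchema internals this ticket asks to expose; nothing like this accumulator
exists in Spark today.

{code:scala}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.AccumulatorV2

// Hypothetical accumulator that folds per-record schemata into one global schema.
// `inferRecord` and `mergeSchemas` are placeholders for the per-record inference
// and schema-merge functions that this ticket proposes to expose.
class SchemaAccumulator(
    inferRecord: String => StructType,
    mergeSchemas: (StructType, StructType) => StructType)
  extends AccumulatorV2[String, StructType] {

  private var current: StructType = new StructType()

  override def isZero: Boolean = current.isEmpty

  override def copy(): SchemaAccumulator = {
    val acc = new SchemaAccumulator(inferRecord, mergeSchemas)
    acc.current = current
    acc
  }

  override def reset(): Unit = { current = new StructType() }

  // Called once per JSON record while the RDD is evaluated (e.g. while it is written out).
  override def add(record: String): Unit = {
    current = mergeSchemas(current, inferRecord(record))
  }

  override def merge(other: AccumulatorV2[String, StructType]): Unit = {
    current = mergeSchemas(current, other.value)
  }

  override def value: StructType = current
}
{code}

Registered with sc.register(...) and fed via add() from the same action that
writes the data out, such an accumulator would yield the full-data schema
without a second pass over the data.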

  was:
I'm proposing that InferSchema's internals (infer the schema of each record, 
merge two schemata, and canonicalize the result) be exposed to the outside.

*Use-case*

My team continuously produces large amounts of JSON data. The schema is and 
must be very dynamic: fields can appear and go from one day to another, most 
fields are nullable, some fields have small frequency etc.

In another job, we donwload this data, sample it, infer the schema using 
Dataset.schema(). From there, we output the data in Parquet for later querying. 
This approach has proved problematic:
 *  rare fields can be absent from a sample, and therefore absent from the 
schema. This results on exceptions when trying to query those fields. We have 
had to implement cumbersome fixes for this involving a manually curated set of 
required fields.
 * this is expensive. Going through a sample of the data to infer the schema is 
still a very costly operation for us. Caching the JSON RDD to disk (doesn't fit 
in memory) revealed to be at least as slow as traversing the sample first, and 
the whole data next.

*Proposition*

InferSchema is essentially a fold operator. This means a Spark accumulator can 
easily be built on top of it in order to calculate a schema alongside an RDD 
calculation. In the above use-case, it has two main advantages:
 * the schema is inferred on the entire data, therefore contains all possible 
fields
 * the computational overhead is negligible since it happens at the same time 
as writing the data to an external store rather than by evaluating the RDD for 
the sole purpose of schema inference.
 * after writing the manifest to an external store, we can load the JSON data 
in a Dataset without ever paying the infer cost again (just the conversion from 
JSON to Row).

With such feature, users can decide to use their JSON (or whatever else) data 
as structured data whenever they want to even though the actual schema may vary 
every ten minutes as long as they record the schema of each portion of data.


> Expose InferSchema's functionalities to the outside
> ---
>
> Key: SPARK-23494
> URL: https://issues.apache.org/jira/browse/SPARK-23494
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, SQL
>Affects Versions: 2.2.1
>Reporter: David Courtinot
>Priority: Major
>
> I'm proposing that InferSchema's internals (infer the schema of each 

[jira] [Updated] (SPARK-23494) Expose InferSchema's functionalities to the outside

2018-02-23 Thread David Courtinot (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Courtinot updated SPARK-23494:

Description: 
I'm proposing that InferSchema's internals (infer the schema of each record, 
merge two schemata, and canonicalize the result) be exposed to the outside.

*Use-case*

My team continuously produces large amounts of JSON data. The schema is and 
must be very dynamic: fields can appear and go from one day to another, most 
fields are nullable, some fields have small frequency etc.

In another job, we download this data, sample it, and infer the schema using
Dataset.schema(). From there, we output the data in Parquet for later querying.
This approach has proved problematic:
 * rare fields can be absent from a sample, and therefore absent from the
schema. This results in exceptions when trying to query those fields. We have
had to implement cumbersome fixes for this involving a manually curated set of
required fields.
 * this is expensive. Going through a sample of the data to infer the schema is
still a very costly operation for us. Caching the JSON RDD to disk (it doesn't
fit in memory) turned out to be at least as slow as traversing the sample first
and then the whole data set.

*Proposition*

InferSchema is essentially a fold operator. This means a Spark accumulator can 
easily be built on top of it in order to calculate a schema alongside an RDD 
calculation. In the above use-case, it has two main advantages:
 * the schema is inferred on the entire data, therefore contains all possible 
fields
 * the computational overhead is negligible since it happens at the same time 
as writing the data to an external store rather than by evaluating the RDD for 
the sole purpose of schema inference.
 * after writing the manifest to an external store, we can load the JSON data 
in a Dataset without ever paying the infer cost again (just the conversion from 
JSON to Row).

With such a feature, users can decide to use their JSON (or other) data as
structured data whenever they want, even though the actual schema may vary every
ten minutes, as long as they record the schema of each portion of data.

  was:
I'm proposing that InferSchema's internals (infer the schema of each record, 
merge two schemata, and canonicalize the result) to be exposed to the outside.

*Use-case*

My team continuously produces large amounts of JSON data. The schema is and 
must be very dynamic: fields can appear and go from one day to another, most 
fields are nullable, some fields have small frequency etc.

We then consume this data, sample it, infer the schema using Dataset.schema(). 
From there, we output the data in Parquet for later querying. This approach has 
proved problematic:
 *  rare fields can be absent from a sample, and therefore absent from the 
schema. This results on exceptions when trying to query those fields. We have 
had to implement cumbersome fixes for this involving a manually curated set of 
required fields.
 * this is expensive. Going through a sample of the data to infer the schema is 
still a very costly operation for us. Caching the JSON RDD to disk (doesn't fit 
in memory) revealed at least as slow as traversing the sample first, and the 
whole data next.

*Proposition*

InferSchema is essentially a fold operator. This means a Spark accumulator can 
easily be built on top of it in order to calculate a schema alongside an RDD 
calculation. In the above use-case, it has two main advantages:
 * the schema is inferred on the entire data, therefore contains all possible 
fields
 * the computational overhead is negligible since it happens at the same time 
as writing the data to an external store rather than by evaluating the RDD for 
the sole purpose of schema inference.
 * after writing the manifest to an external store, we can load the JSON data 
in a Dataset without ever paying the infer cost again (just the conversion from 
JSON to Row).

With such feature, users can decide to use their JSON (or whatever else) data 
as structured data whenever they want to even though the actual schema may vary 
every ten minutes as long as they record the schema of each portion of data.


> Expose InferSchema's functionalities to the outside
> ---
>
> Key: SPARK-23494
> URL: https://issues.apache.org/jira/browse/SPARK-23494
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, SQL
>Affects Versions: 2.2.1
>Reporter: David Courtinot
>Priority: Major
>
> I'm proposing that InferSchema's internals (infer the schema of each record, 
> merge two schemata, and canonicalize the result) be exposed to the outside.
> *Use-case*
> My team continuously produces large amounts of JSON data. The schema is and 
> must be very dynamic: fields can appear and go from one day to 

[jira] [Commented] (SPARK-23495) Creating a json file using a dataframe Generates an issue

2018-02-23 Thread AIT OUFKIR (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374490#comment-16374490
 ] 

AIT OUFKIR commented on SPARK-23495:


After several checks, I noticed that the issue comes from the collect().

> Creating a json file using a dataframe Generates an issue
> -
>
> Key: SPARK-23495
> URL: https://issues.apache.org/jira/browse/SPARK-23495
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.1.0
>Reporter: AIT OUFKIR
>Priority: Major
> Fix For: 2.1.0
>
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> Issue happen when trying to create json file using a dataframe (see code 
> below)
> from pyspark.sql import SQLContext
>  a = ["a1","a2"]
>  b = ["b1","b2","b3"]
>  c = ["c1","c2","c3", "c4"]
>  d = \{'d1':1, 'd2':2}
>  e = \{'e1':1, 'e2':2, 'e3':3}
>  f = ['f1','f2','f3']
>  g = ['g1','g2','g3','g4']
> metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d, fasi=f, 
> gasi=g{color:#ff}, easi=e{color})
>  md = sqlContext.createDataFrame([metadata_dump]).collect()
>  metadata = sqlContext.createDataFrame(md,['asi', 'basi', 
> 'casi','dasi','fasi', 'gasi', 'easi'])
> metadata_path = "/folder/fileNameErr"
>  metadata.write.mode('overwrite').json(metadata_path)
> {"{color:#14892c}asi":["a1","a2"],"basi":["b1","b2","b3"],"casi":["c1","c2","c3","c4"],"dasi":\{"d1":1,"d2":2{color}},"fasi":\{"e1":1,"e2":2,"e3":3},"gasi":["f1","f2","f3"],"easi":["g1","g2","g3","g4{color}"]}
>  
> when switching the dictionary e
>  
> metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d{color:#ff}*, 
> easi=e*{color}, fasi=f, gasi=g)
>  md = sqlContext.createDataFrame([metadata_dump]).collect()
>  metadata = sqlContext.createDataFrame(md,['asi', 'basi', 'casi','dasi', 
> {color:#ff}*'easi',*{color}'fasi', 'gasi'])
>  metadata_path = "/folder/fileNameCorr"
>  metadata.write.mode('overwrite').json(metadata_path)
> {color:#14892c}{"asi":["a1","a2"],"basi":["b1","b2","b3"],"casi":["c1","c2","c3","c4"],"dasi":\\{"d1":1,"d2":2},"easi":\{"e1":1,"e2":2,"e3":3},"fasi":["f1","f2","f3"],"gasi":["g1","g2","g3","g4"]}{color}
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23494) Expose InferSchema's functionalities to the outside

2018-02-23 Thread David Courtinot (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Courtinot updated SPARK-23494:

Description: 
I'm proposing that InferSchema's internals (infer the schema of each record, 
merge two schemata, and canonicalize the result) to be exposed to the outside.

*Use-case*

My team continuously produces large amounts of JSON data. The schema is and 
must be very dynamic: fields can appear and go from one day to another, most 
fields are nullable, some fields have small frequency etc.

We then consume this data, sample it, and infer the schema using
Dataset.schema(). From there, we output the data in Parquet for later querying.
This approach has proved problematic:
 * rare fields can be absent from a sample, and therefore absent from the
schema. This results in exceptions when trying to query those fields. We have
had to implement cumbersome fixes for this involving a manually curated set of
required fields.
 * this is expensive. Going through a sample of the data to infer the schema is
still a very costly operation for us. Caching the JSON RDD to disk (it doesn't
fit in memory) turned out to be at least as slow as traversing the sample first
and then the whole data set.

*Proposition*

InferSchema is essentially a fold operator. This means a Spark accumulator can 
easily be built on top of it in order to calculate a schema alongside an RDD 
calculation. In the above use-case, it has two main advantages:
 * the schema is inferred on the entire data, therefore contains all possible 
fields
 * the computational overhead is negligible since it happens at the same time 
as writing the data to an external store rather than by evaluating the RDD for 
the sole purpose of schema inference.
 * after writing the manifest to an external store, we can load the JSON data 
in a Dataset without ever paying the infer cost again (just the conversion from 
JSON to Row).

With such a feature, users can decide to use their JSON (or other) data as
structured data whenever they want, even though the actual schema may vary every
ten minutes, as long as they record the schema of each portion of data.

  was:
I'm proposing that InferSchema's internals (infer the schema of each record, 
merge two schemata, and canonicalize the result) to be exposed to the outside.

*Use-case*

We continuously produce large amounts of JSON data. The schema is and must be 
very dynamic: fields can appear and go from one day to another, most fields are 
nullable, some fields have small frequency etc.

We then consume this data, sample it, infer the schema using Dataset.schema(). 
From there, we output the data in Parquet for later querying. This approach has 
proved problematic:
 *  rare fields can be absent from a sample, and therefore absent from the 
schema. This results on exceptions when trying to query those fields. We have 
had to implement cumbersome fixes for this involving a manually curated set of 
required fields.
 * this is expensive. Going through a sample of the data to infer the schema is 
still a very costly operation for us. Caching the JSON RDD to disk (doesn't fit 
in memory) revealed at least as slow as traversing the sample first, and the 
whole data next.

*Proposition*

InferSchema is essentially a fold operator. This means a Spark accumulator can 
easily be built on top of it in order to calculate a schema alongside an RDD 
calculation. In the above use-case, it has two main advantages:
 * the schema is inferred on the entire data, therefore contains all possible 
fields
 * the computational overhead is negligible since it happens at the same time 
as writing the data to an external store rather than by evaluating the RDD for 
the sole purpose of schema inference.
 * after writing the manifest to an external store, we can load the JSON data 
in a Dataset without ever paying the infer cost again (just the conversion from 
JSON to Row).

With such feature, users can decide to use their JSON (or whatever else) data 
as structured data whenever they want to even though the actual schema may vary 
every ten minutes as long as they record the schema of each portion of data.


> Expose InferSchema's functionalities to the outside
> ---
>
> Key: SPARK-23494
> URL: https://issues.apache.org/jira/browse/SPARK-23494
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, SQL
>Affects Versions: 2.2.1
>Reporter: David Courtinot
>Priority: Major
>
> I'm proposing that InferSchema's internals (infer the schema of each record, 
> merge two schemata, and canonicalize the result) to be exposed to the outside.
> *Use-case*
> My team continuously produces large amounts of JSON data. The schema is and 
> must be very dynamic: fields can appear and go from one day to another, most 
> 

[jira] [Commented] (SPARK-23496) Locality of coalesced partitions can be severely skewed by the order of input partitions

2018-02-23 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374439#comment-16374439
 ] 

Marco Gaido commented on SPARK-23496:
-

I read that the proposed solution is to pick partitions at random instead of
iterating in order. This seems to me not a solution but a workaround: it doesn't
solve the problem, it just makes it unlikely.

What about a solution which does solve the problem, i.e. enforcing an even
distribution according to the incoming data distribution? I mean, what about
building a sort of inverted index with the preferred location as key and the
partitions as values, and picking from each value list a fraction corresponding
to the coalescing ratio?
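
A rough Scala sketch of that inverted-index idea, illustrative only and detached
from the real DefaultPartitionCoalescer types (single preferred location per
partition, proportional allocation):

{code:scala}
// Input: (input partition index, preferred location). Output: the preferred
// location assigned to each of `numCoalesced` output groups, allocated in
// proportion to how many input partitions prefer each location.
def assignGroupLocations(prefs: Seq[(Int, String)], numCoalesced: Int): Seq[String] = {
  val total = prefs.size.toDouble
  // Inverted index: location -> input partitions that prefer it.
  val byLocation: Map[String, Seq[Int]] =
    prefs.groupBy(_._2).map { case (loc, ps) => loc -> ps.map(_._1) }
  byLocation.toSeq.flatMap { case (location, parts) =>
    // Each location receives a share of output groups matching its share of inputs.
    val share = math.max(1, math.round(numCoalesced * parts.size / total).toInt)
    Seq.fill(share)(location)
  }.take(numCoalesced)
}

// 50 partitions preferring hostA followed by 50 preferring hostB, coalesced to 50
// groups: each host gets ~25 groups regardless of the input order.
val prefs = (0 until 50).map(i => i -> "hostA") ++ (50 until 100).map(i => i -> "hostB")
val locations = assignGroupLocations(prefs, numCoalesced = 50)
println(locations.groupBy(identity).map { case (host, hs) => host -> hs.size })
{code}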

> Locality of coalesced partitions can be severely skewed by the order of input 
> partitions
> 
>
> Key: SPARK-23496
> URL: https://issues.apache.org/jira/browse/SPARK-23496
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Ala Luszczak
>Priority: Major
>
> Example:
> Consider RDD "R" with 100 partitions, half of which have locality preference 
> "hostA" and half have "hostB".
>  * Assume odd-numbered input partitions of R prefer "hostA" and even-numbered 
> prefer "hostB". Then R.coalesce(50) will have 25 partitions with preference 
> "hostA" and 25 with "hostB" (even distribution).
>  * Assume partitions with index 0-49 of R prefer "hostA" and partitions with 
> index 50-99 prefer "hostB". Then R.coalesce(50) will have 49 partitions with 
> "hostA" and 1 with "hostB" (extremely skewed distribution).
>  
> The algorithm in {{DefaultPartitionCoalescer.setupGroups}} is responsible for 
> picking preferred locations for coalesced partitions. It analyzes the 
> preferred locations of input partitions. It starts by trying to create one 
> partition for each unique location in the input. However, if the requested
> number of coalesced partitions is higher than the number of unique locations,
> it has to pick duplicate locations.
> Currently, the duplicate locations are picked by iterating over the input 
> partitions in order, and copying their preferred locations to coalesced 
> partitions. If the input partitions are clustered by location, this can 
> result in severe skew.
> Instead of iterating over the list of input partitions in order, we should 
> pick them at random.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23496) Locality of coalesced partitions can be severely skewed by the order of input partitions

2018-02-23 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23496:


Assignee: (was: Apache Spark)

> Locality of coalesced partitions can be severely skewed by the order of input 
> partitions
> 
>
> Key: SPARK-23496
> URL: https://issues.apache.org/jira/browse/SPARK-23496
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Ala Luszczak
>Priority: Major
>
> Example:
> Consider RDD "R" with 100 partitions, half of which have locality preference 
> "hostA" and half have "hostB".
>  * Assume odd-numbered input partitions of R prefer "hostA" and even-numbered 
> prefer "hostB". Then R.coalesce(50) will have 25 partitions with preference 
> "hostA" and 25 with "hostB" (even distribution).
>  * Assume partitions with index 0-49 of R prefer "hostA" and partitions with 
> index 50-99 prefer "hostB". Then R.coalesce(50) will have 49 partitions with 
> "hostA" and 1 with "hostB" (extremely skewed distribution).
>  
> The algorithm in {{DefaultPartitionCoalescer.setupGroups}} is responsible for 
> picking preferred locations for coalesced partitions. It analyzes the 
> preferred locations of input partitions. It starts by trying to create one 
> partition for each unique location in the input. However, if the requested
> number of coalesced partitions is higher than the number of unique locations,
> it has to pick duplicate locations.
> Currently, the duplicate locations are picked by iterating over the input 
> partitions in order, and copying their preferred locations to coalesced 
> partitions. If the input partitions are clustered by location, this can 
> result in severe skew.
> Instead of iterating over the list of input partitions in order, we should 
> pick them at random.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23496) Locality of coalesced partitions can be severely skewed by the order of input partitions

2018-02-23 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374429#comment-16374429
 ] 

Apache Spark commented on SPARK-23496:
--

User 'ala' has created a pull request for this issue:
https://github.com/apache/spark/pull/20664

> Locality of coalesced partitions can be severely skewed by the order of input 
> partitions
> 
>
> Key: SPARK-23496
> URL: https://issues.apache.org/jira/browse/SPARK-23496
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Ala Luszczak
>Priority: Major
>
> Example:
> Consider RDD "R" with 100 partitions, half of which have locality preference 
> "hostA" and half have "hostB".
>  * Assume odd-numbered input partitions of R prefer "hostA" and even-numbered 
> prefer "hostB". Then R.coalesce(50) will have 25 partitions with preference 
> "hostA" and 25 with "hostB" (even distribution).
>  * Assume partitions with index 0-49 of R prefer "hostA" and partitions with 
> index 50-99 prefer "hostB". Then R.coalesce(50) will have 49 partitions with 
> "hostA" and 1 with "hostB" (extremely skewed distribution).
>  
> The algorithm in {{DefaultPartitionCoalescer.setupGroups}} is responsible for 
> picking preferred locations for coalesced partitions. It analyzes the 
> preferred locations of input partitions. It starts by trying to create one 
> partition for each unique location in the input. However, if the requested
> number of coalesced partitions is higher than the number of unique locations,
> it has to pick duplicate locations.
> Currently, the duplicate locations are picked by iterating over the input 
> partitions in order, and copying their preferred locations to coalesced 
> partitions. If the input partitions are clustered by location, this can 
> result in severe skew.
> Instead of iterating over the list of input partitions in order, we should 
> pick them at random.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23496) Locality of coalesced partitions can be severely skewed by the order of input partitions

2018-02-23 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23496:


Assignee: Apache Spark

> Locality of coalesced partitions can be severely skewed by the order of input 
> partitions
> 
>
> Key: SPARK-23496
> URL: https://issues.apache.org/jira/browse/SPARK-23496
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Ala Luszczak
>Assignee: Apache Spark
>Priority: Major
>
> Example:
> Consider RDD "R" with 100 partitions, half of which have locality preference 
> "hostA" and half have "hostB".
>  * Assume odd-numbered input partitions of R prefer "hostA" and even-numbered 
> prefer "hostB". Then R.coalesce(50) will have 25 partitions with preference 
> "hostA" and 25 with "hostB" (even distribution).
>  * Assume partitions with index 0-49 of R prefer "hostA" and partitions with 
> index 50-99 prefer "hostB". Then R.coalesce(50) will have 49 partitions with 
> "hostA" and 1 with "hostB" (extremely skewed distribution).
>  
> The algorithm in {{DefaultPartitionCoalescer.setupGroups}} is responsible for 
> picking preferred locations for coalesced partitions. It analyzes the 
> preferred locations of input partitions. It starts by trying to create one 
> partition for each unique location in the input. However, if the requested
> number of coalesced partitions is higher than the number of unique locations,
> it has to pick duplicate locations.
> Currently, the duplicate locations are picked by iterating over the input 
> partitions in order, and copying their preferred locations to coalesced 
> partitions. If the input partitions are clustered by location, this can 
> result in severe skew.
> Instead of iterating over the list of input partitions in order, we should 
> pick them at random.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23495) Creating a json file using a dataframe Generates an issue

2018-02-23 Thread AIT OUFKIR (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

AIT OUFKIR updated SPARK-23495:
---
Description: 
The issue happens when trying to create a JSON file using a DataFrame (see the code below).

from pyspark.sql import SQLContext
 a = ["a1","a2"]
 b = ["b1","b2","b3"]
 c = ["c1","c2","c3", "c4"]
 d = \{'d1':1, 'd2':2}
 e = \{'e1':1, 'e2':2, 'e3':3}
 f = ['f1','f2','f3']
 g = ['g1','g2','g3','g4']

metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d, fasi=f, 
gasi=g{color:#ff}, easi=e{color})
 md = sqlContext.createDataFrame([metadata_dump]).collect()
 metadata = sqlContext.createDataFrame(md,['asi', 'basi', 'casi','dasi','fasi', 
'gasi', 'easi'])

metadata_path = "/folder/fileNameErr"
 metadata.write.mode('overwrite').json(metadata_path)

{"{color:#14892c}asi":["a1","a2"],"basi":["b1","b2","b3"],"casi":["c1","c2","c3","c4"],"dasi":\{"d1":1,"d2":2{color}},"fasi":\{"e1":1,"e2":2,"e3":3},"gasi":["f1","f2","f3"],"easi":["g1","g2","g3","g4{color}"]}

 

when switching the dictionary e

 

metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d{color:#ff}*, 
easi=e*{color}, fasi=f, gasi=g)
 md = sqlContext.createDataFrame([metadata_dump]).collect()
 metadata = sqlContext.createDataFrame(md,['asi', 'basi', 'casi','dasi', 
{color:#ff}*'easi',*{color}'fasi', 'gasi'])
 metadata_path = "/folder/fileNameCorr"
 metadata.write.mode('overwrite').json(metadata_path)

{color:#14892c}{"asi":["a1","a2"],"basi":["b1","b2","b3"],"casi":["c1","c2","c3","c4"],"dasi":\\{"d1":1,"d2":2},"easi":\{"e1":1,"e2":2,"e3":3},"fasi":["f1","f2","f3"],"gasi":["g1","g2","g3","g4"]}{color}

 

 

 

 

  was:
Issue happen when trying to create json file using a dataframe (see code below)

from pyspark.sql import SQLContext
a = ["a1","a2"]
b = ["b1","b2","b3"]
c = ["c1","c2","c3", "c4"]
d = \{'d1':1, 'd2':2}
e = \{'e1':1, 'e2':2, 'e3':3}
f = ['f1','f2','f3']
g = ['g1','g2','g3','g4']

metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d, fasi=f, 
gasi=g*{color:#FF}, easi=e{color}*)
md = sqlContext.createDataFrame([metadata_dump]).collect()
metadata = sqlContext.createDataFrame(md,['asi', 'basi', 'casi','dasi','fasi', 
'gasi', 'easi'])

metadata_path = "/folder/fileNameErr"
metadata.write.mode('overwrite').json(metadata_path)

{"{color:#14892c}asi":["a1","a2"],"basi":["b1","b2","b3"],"casi":["c1","c2","c3","c4"],"dasi":{"d1":1,"d2":2{color}},{color:#FF}"fasi":\{"e1":1,"e2":2,"e3":3},"gasi":["f1","f2","f3"],"easi":["g1","g2","g3","g4{color}"]}

 

when switching the dictionary e

 

metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d{color:#FF}*, 
easi=e*{color}, fasi=f, gasi=g)
md = sqlContext.createDataFrame([metadata_dump]).collect()
metadata = sqlContext.createDataFrame(md,['asi', 'basi', 'casi','dasi', 
{color:#FF}*'easi',*{color}'fasi', 'gasi'])
metadata_path = "/folder/fileNameCorr"
metadata.write.mode('overwrite').json(metadata_path)

{color:#14892c}{"asi":["a1","a2"],"basi":["b1","b2","b3"],"casi":["c1","c2","c3","c4"],"dasi":\{"d1":1,"d2":2},"easi":\{"e1":1,"e2":2,"e3":3},"fasi":["f1","f2","f3"],"gasi":["g1","g2","g3","g4"]}{color}

 

 

 

 


> Creating a json file using a dataframe Generates an issue
> -
>
> Key: SPARK-23495
> URL: https://issues.apache.org/jira/browse/SPARK-23495
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.1.0
>Reporter: AIT OUFKIR
>Priority: Major
> Fix For: 2.1.0
>
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> Issue happen when trying to create json file using a dataframe (see code 
> below)
> from pyspark.sql import SQLContext
>  a = ["a1","a2"]
>  b = ["b1","b2","b3"]
>  c = ["c1","c2","c3", "c4"]
>  d = \{'d1':1, 'd2':2}
>  e = \{'e1':1, 'e2':2, 'e3':3}
>  f = ['f1','f2','f3']
>  g = ['g1','g2','g3','g4']
> metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d, fasi=f, 
> gasi=g{color:#ff}, easi=e{color})
>  md = sqlContext.createDataFrame([metadata_dump]).collect()
>  metadata = sqlContext.createDataFrame(md,['asi', 'basi', 
> 'casi','dasi','fasi', 'gasi', 'easi'])
> metadata_path = "/folder/fileNameErr"
>  metadata.write.mode('overwrite').json(metadata_path)
> {"{color:#14892c}asi":["a1","a2"],"basi":["b1","b2","b3"],"casi":["c1","c2","c3","c4"],"dasi":\{"d1":1,"d2":2{color}},"fasi":\{"e1":1,"e2":2,"e3":3},"gasi":["f1","f2","f3"],"easi":["g1","g2","g3","g4{color}"]}
>  
> when switching the dictionary e
>  
> metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d{color:#ff}*, 
> easi=e*{color}, fasi=f, gasi=g)
>  md = sqlContext.createDataFrame([metadata_dump]).collect()
>  metadata = sqlContext.createDataFrame(md,['asi', 'basi', 'casi','dasi', 
> {color:#ff}*'easi',*{color}'fasi', 'gasi'])
>  metadata_path = "/folder/fileNameCorr"
>  

[jira] [Created] (SPARK-23496) Locality of coalesced partitions can be severely skewed by the order of input partitions

2018-02-23 Thread Ala Luszczak (JIRA)
Ala Luszczak created SPARK-23496:


 Summary: Locality of coalesced partitions can be severely skewed 
by the order of input partitions
 Key: SPARK-23496
 URL: https://issues.apache.org/jira/browse/SPARK-23496
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Ala Luszczak


Example:

Consider RDD "R" with 100 partitions, half of which have locality preference 
"hostA" and half have "hostB".
 * Assume odd-numbered input partitions of R prefer "hostA" and even-numbered 
prefer "hostB". Then R.coalesce(50) will have 25 partitions with preference 
"hostA" and 25 with "hostB" (even distribution).
 * Assume partitions with index 0-49 of R prefer "hostA" and partitions with 
index 50-99 prefer "hostB". Then R.coalesce(50) will have 49 partitions with 
"hostA" and 1 with "hostB" (extremely skewed distribution).

 

The algorithm in {{DefaultPartitionCoalescer.setupGroups}} is responsible for 
picking preferred locations for coalesced partitions. It analyzes the preferred 
locations of input partitions. It starts by trying to create one partition for 
each unique location in the input. However, if the requested number of
coalesced partitions is higher than the number of unique locations, it has to
pick duplicate locations.

Currently, the duplicate locations are picked by iterating over the input 
partitions in order, and copying their preferred locations to coalesced 
partitions. If the input partitions are clustered by location, this can result 
in severe skew.

Instead of iterating over the list of input partitions in order, we should pick 
them at random.
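
For illustration, the proposed change boils down to visiting the input
partitions in a shuffled order rather than 0..n-1 (names below are illustrative;
the real code lives in DefaultPartitionCoalescer.setupGroups):

{code:scala}
import scala.util.Random

// Visit input partition indices in random order when assigning duplicate
// preferred locations, so inputs clustered by location no longer skew the result.
val numInputPartitions = 100
val visitOrder: Seq[Int] = Random.shuffle((0 until numInputPartitions).toList)
{code}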



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23495) Creating a json file using a dataframe Generates an issue

2018-02-23 Thread AIT OUFKIR (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

AIT OUFKIR updated SPARK-23495:
---
 Flags: Important
Remaining Estimate: 4h
 Original Estimate: 4h

This issue can create major inconsistencies in data.

> Creating a json file using a dataframe Generates an issue
> -
>
> Key: SPARK-23495
> URL: https://issues.apache.org/jira/browse/SPARK-23495
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.1.0
>Reporter: AIT OUFKIR
>Priority: Major
> Fix For: 2.1.0
>
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> Issue happen when trying to create json file using a dataframe (see code 
> below)
> from pyspark.sql import SQLContext
> a = ["a1","a2"]
> b = ["b1","b2","b3"]
> c = ["c1","c2","c3", "c4"]
> d = \{'d1':1, 'd2':2}
> e = \{'e1':1, 'e2':2, 'e3':3}
> f = ['f1','f2','f3']
> g = ['g1','g2','g3','g4']
> metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d, fasi=f, 
> gasi=g*{color:#FF}, easi=e{color}*)
> md = sqlContext.createDataFrame([metadata_dump]).collect()
> metadata = sqlContext.createDataFrame(md,['asi', 'basi', 
> 'casi','dasi','fasi', 'gasi', 'easi'])
> metadata_path = "/folder/fileNameErr"
> metadata.write.mode('overwrite').json(metadata_path)
> {"{color:#14892c}asi":["a1","a2"],"basi":["b1","b2","b3"],"casi":["c1","c2","c3","c4"],"dasi":{"d1":1,"d2":2{color}},{color:#FF}"fasi":\{"e1":1,"e2":2,"e3":3},"gasi":["f1","f2","f3"],"easi":["g1","g2","g3","g4{color}"]}
>  
> when switching the dictionary e
>  
> metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d{color:#FF}*, 
> easi=e*{color}, fasi=f, gasi=g)
> md = sqlContext.createDataFrame([metadata_dump]).collect()
> metadata = sqlContext.createDataFrame(md,['asi', 'basi', 'casi','dasi', 
> {color:#FF}*'easi',*{color}'fasi', 'gasi'])
> metadata_path = "/folder/fileNameCorr"
> metadata.write.mode('overwrite').json(metadata_path)
> {color:#14892c}{"asi":["a1","a2"],"basi":["b1","b2","b3"],"casi":["c1","c2","c3","c4"],"dasi":\{"d1":1,"d2":2},"easi":\{"e1":1,"e2":2,"e3":3},"fasi":["f1","f2","f3"],"gasi":["g1","g2","g3","g4"]}{color}
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23495) Creating a json file using a dataframe Generates an issue

2018-02-23 Thread AIT OUFKIR (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

AIT OUFKIR updated SPARK-23495:
---
Description: 
The issue happens when trying to create a JSON file using a DataFrame (see the code below).

from pyspark.sql import SQLContext
a = ["a1","a2"]
b = ["b1","b2","b3"]
c = ["c1","c2","c3", "c4"]
d = \{'d1':1, 'd2':2}
e = \{'e1':1, 'e2':2, 'e3':3}
f = ['f1','f2','f3']
g = ['g1','g2','g3','g4']

metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d, fasi=f, 
gasi=g*{color:#FF}, easi=e{color}*)
md = sqlContext.createDataFrame([metadata_dump]).collect()
metadata = sqlContext.createDataFrame(md,['asi', 'basi', 'casi','dasi','fasi', 
'gasi', 'easi'])

metadata_path = "/folder/fileNameErr"
metadata.write.mode('overwrite').json(metadata_path)

{"{color:#14892c}asi":["a1","a2"],"basi":["b1","b2","b3"],"casi":["c1","c2","c3","c4"],"dasi":{"d1":1,"d2":2{color}},{color:#FF}"fasi":\{"e1":1,"e2":2,"e3":3},"gasi":["f1","f2","f3"],"easi":["g1","g2","g3","g4{color}"]}

 

when switching the dictionary e

 

metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d{color:#FF}*, 
easi=e*{color}, fasi=f, gasi=g)
md = sqlContext.createDataFrame([metadata_dump]).collect()
metadata = sqlContext.createDataFrame(md,['asi', 'basi', 'casi','dasi', 
{color:#FF}*'easi',*{color}'fasi', 'gasi'])
metadata_path = "/folder/fileNameCorr"
metadata.write.mode('overwrite').json(metadata_path)

{color:#14892c}{"asi":["a1","a2"],"basi":["b1","b2","b3"],"casi":["c1","c2","c3","c4"],"dasi":\{"d1":1,"d2":2},"easi":\{"e1":1,"e2":2,"e3":3},"fasi":["f1","f2","f3"],"gasi":["g1","g2","g3","g4"]}{color}

 

 

 

 

  was:
Issue happen when trying to create json file using a dataframe (see code below)

catis = ["CAT1","CAT2"]
constis = ["CONST1","CONST2","CONST3"]
datis = ["DAT1","DATE2","DATE3"]
dictis = \{'A':1, 'B':2}
dummis = ['dum1','dumm2','dumm3']
fifis = \{'fifi1':1, 'fifi2':2, 'fifi3':3}
khikhis = ['khikhi1','khikhi12','khikhi3','khikhi4']

metadata_dump = dict(cati=catis, consti=constis, dati=datis, dicti=dictis, 
khikhi=khikhis, dummi=dummis, fifi=fifis)
md = sqlContext.createDataFrame([metadata_dump]).collect()
metadata = sqlContext.createDataFrame(md,['cati', 'consti', 'dati', 
'dicti','khikhi', 'dummi', 'fifi'])

metadata_path = "/mypath"
metadata.write.mode('overwrite').json(metadata_path)

This gives the following Results :

{"cati":["CAT1","CAT2"]
,"consti":["CONST1","CONST2","CONST3"]
,"dati":["DAT1","DATE2","DATE3"]
,"dicti":\{"A":1,"B":2}
,"khikhi":["dum1","dumm2","dumm3"]
,"dummi":\{"fifi2":2,"fifi3":3,"fifi1":1}
,"fifi":["khikhi1","khikhi12","khikhi3","khikhi4"]}

Which is wrong

 

When I try switching the fifis dict and not putting it at the end of the dict 
metadata_dump then I get the correct results :

 {
"cati":["CAT1","CAT2"]
,"consti":["CONST1","CONST2","CONST3"]
,"dati":["DAT1","DATE2","DATE3"]
,"dicti":\{"A":1,"B":2}
,"dummi":["dum1","dumm2","dumm3"]
,"fifi":\{"fifi2":2,"fifi3":3,"fifi1":1}
,"khikhi":["khikhi1","khikhi12","khikhi3","khikhi4"]
}

 


> Creating a json file using a dataframe Generates an issue
> -
>
> Key: SPARK-23495
> URL: https://issues.apache.org/jira/browse/SPARK-23495
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.1.0
>Reporter: AIT OUFKIR
>Priority: Major
> Fix For: 2.1.0
>
>
> Issue happen when trying to create json file using a dataframe (see code 
> below)
> from pyspark.sql import SQLContext
> a = ["a1","a2"]
> b = ["b1","b2","b3"]
> c = ["c1","c2","c3", "c4"]
> d = \{'d1':1, 'd2':2}
> e = \{'e1':1, 'e2':2, 'e3':3}
> f = ['f1','f2','f3']
> g = ['g1','g2','g3','g4']
> metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d, fasi=f, 
> gasi=g*{color:#FF}, easi=e{color}*)
> md = sqlContext.createDataFrame([metadata_dump]).collect()
> metadata = sqlContext.createDataFrame(md,['asi', 'basi', 
> 'casi','dasi','fasi', 'gasi', 'easi'])
> metadata_path = "/folder/fileNameErr"
> metadata.write.mode('overwrite').json(metadata_path)
> {"{color:#14892c}asi":["a1","a2"],"basi":["b1","b2","b3"],"casi":["c1","c2","c3","c4"],"dasi":{"d1":1,"d2":2{color}},{color:#FF}"fasi":\{"e1":1,"e2":2,"e3":3},"gasi":["f1","f2","f3"],"easi":["g1","g2","g3","g4{color}"]}
>  
> when switching the dictionary e
>  
> metadata_dump = dict(asi=a, basi=b, casi = c, dasi=d{color:#FF}*, 
> easi=e*{color}, fasi=f, gasi=g)
> md = sqlContext.createDataFrame([metadata_dump]).collect()
> metadata = sqlContext.createDataFrame(md,['asi', 'basi', 'casi','dasi', 
> {color:#FF}*'easi',*{color}'fasi', 'gasi'])
> metadata_path = "/folder/fileNameCorr"
> metadata.write.mode('overwrite').json(metadata_path)
> 

[jira] [Created] (SPARK-23495) Creating a json file using a dataframe creates an issue

2018-02-23 Thread AIT OUFKIR (JIRA)
AIT OUFKIR created SPARK-23495:
--

 Summary: Creating a json file using a dataframe creates an issue
 Key: SPARK-23495
 URL: https://issues.apache.org/jira/browse/SPARK-23495
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.1.0
Reporter: AIT OUFKIR
 Fix For: 2.1.0


Issue happen when trying to create json file using a dataframe (see code below)

catis = ["CAT1","CAT2"]
constis = ["CONST1","CONST2","CONST3"]
datis = ["DAT1","DATE2","DATE3"]
dictis = \{'A':1, 'B':2}
dummis = ['dum1','dumm2','dumm3']
fifis = \{'fifi1':1, 'fifi2':2, 'fifi3':3}
khikhis = ['khikhi1','khikhi12','khikhi3','khikhi4']

metadata_dump = dict(cati=catis, consti=constis, dati=datis, dicti=dictis, 
khikhi=khikhis, dummi=dummis, fifi=fifis)
md = sqlContext.createDataFrame([metadata_dump]).collect()
metadata = sqlContext.createDataFrame(md,['cati', 'consti', 'dati', 
'dicti','khikhi', 'dummi', 'fifi'])

metadata_path = "/mypath"
metadata.write.mode('overwrite').json(metadata_path)

This gives the following Results :

{"cati":["CAT1","CAT2"]
,"consti":["CONST1","CONST2","CONST3"]
,"dati":["DAT1","DATE2","DATE3"]
,"dicti":\{"A":1,"B":2}
,"khikhi":["dum1","dumm2","dumm3"]
,"dummi":\{"fifi2":2,"fifi3":3,"fifi1":1}
,"fifi":["khikhi1","khikhi12","khikhi3","khikhi4"]}

Which is wrong

 

When I try switching the fifis dict and not putting it at the end of the dict 
metadata_dump then I get the correct results :

 {
"cati":["CAT1","CAT2"]
,"consti":["CONST1","CONST2","CONST3"]
,"dati":["DAT1","DATE2","DATE3"]
,"dicti":\{"A":1,"B":2}
,"dummi":["dum1","dumm2","dumm3"]
,"fifi":\{"fifi2":2,"fifi3":3,"fifi1":1}
,"khikhi":["khikhi1","khikhi12","khikhi3","khikhi4"]
}

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23495) Creating a json file using a dataframe Generates an issue

2018-02-23 Thread AIT OUFKIR (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

AIT OUFKIR updated SPARK-23495:
---
Summary: Creating a json file using a dataframe Generates an issue  (was: 
Creating a json file using a dataframe creates an issue)

> Creating a json file using a dataframe Generates an issue
> -
>
> Key: SPARK-23495
> URL: https://issues.apache.org/jira/browse/SPARK-23495
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.1.0
>Reporter: AIT OUFKIR
>Priority: Major
> Fix For: 2.1.0
>
>
> Issue happen when trying to create json file using a dataframe (see code 
> below)
> catis = ["CAT1","CAT2"]
> constis = ["CONST1","CONST2","CONST3"]
> datis = ["DAT1","DATE2","DATE3"]
> dictis = \{'A':1, 'B':2}
> dummis = ['dum1','dumm2','dumm3']
> fifis = \{'fifi1':1, 'fifi2':2, 'fifi3':3}
> khikhis = ['khikhi1','khikhi12','khikhi3','khikhi4']
> metadata_dump = dict(cati=catis, consti=constis, dati=datis, dicti=dictis, 
> khikhi=khikhis, dummi=dummis, fifi=fifis)
> md = sqlContext.createDataFrame([metadata_dump]).collect()
> metadata = sqlContext.createDataFrame(md,['cati', 'consti', 'dati', 
> 'dicti','khikhi', 'dummi', 'fifi'])
> metadata_path = "/mypath"
> metadata.write.mode('overwrite').json(metadata_path)
> This gives the following Results :
> {"cati":["CAT1","CAT2"]
> ,"consti":["CONST1","CONST2","CONST3"]
> ,"dati":["DAT1","DATE2","DATE3"]
> ,"dicti":\{"A":1,"B":2}
> ,"khikhi":["dum1","dumm2","dumm3"]
> ,"dummi":\{"fifi2":2,"fifi3":3,"fifi1":1}
> ,"fifi":["khikhi1","khikhi12","khikhi3","khikhi4"]}
> Which is wrong
>  
> When I try switching the fifis dict and not putting it at the end of the dict 
> metadata_dump then I get the correct results :
>  {
> "cati":["CAT1","CAT2"]
> ,"consti":["CONST1","CONST2","CONST3"]
> ,"dati":["DAT1","DATE2","DATE3"]
> ,"dicti":\{"A":1,"B":2}
> ,"dummi":["dum1","dumm2","dumm3"]
> ,"fifi":\{"fifi2":2,"fifi3":3,"fifi1":1}
> ,"khikhi":["khikhi1","khikhi12","khikhi3","khikhi4"]
> }
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23494) Expose InferSchema's functionalities to the outside

2018-02-23 Thread David Courtinot (JIRA)
David Courtinot created SPARK-23494:
---

 Summary: Expose InferSchema's functionalities to the outside
 Key: SPARK-23494
 URL: https://issues.apache.org/jira/browse/SPARK-23494
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core, SQL
Affects Versions: 2.2.1
Reporter: David Courtinot


I'm proposing that InferSchema's internals (inferring the schema of each record, merging two schemata, and canonicalizing the result) be exposed to the outside.

*Use-case*

We continuously produce large amounts of JSON data. The schema is and must be very dynamic: fields can appear and disappear from one day to the next, most fields are nullable, and some fields occur only rarely.

We then consume this data, sample it, and infer the schema using Dataset.schema(). From there, we output the data in Parquet for later querying. This approach has proved problematic:
 * rare fields can be absent from a sample, and therefore absent from the schema. This results in exceptions when trying to query those fields. We have had to implement cumbersome fixes for this, involving a manually curated set of required fields.
 * it is expensive. Going through a sample of the data to infer the schema is still a very costly operation for us. Caching the JSON RDD to disk (it doesn't fit in memory) proved to be at least as slow as traversing the sample first and then the whole data set.

*Proposition*

InferSchema is essentially a fold operator. This means a Spark accumulator can easily be built on top of it in order to calculate a schema alongside an RDD computation. In the above use-case, this has three main advantages:
 * the schema is inferred on the entire data, therefore contains all possible 
fields
 * the computational overhead is negligible since it happens at the same time 
as writing the data to an external store rather than by evaluating the RDD for 
the sole purpose of schema inference.
 * after writing the manifest to an external store, we can load the JSON data 
in a Dataset without ever paying the infer cost again (just the conversion from 
JSON to Row).

With such a feature, users can treat their JSON (or whatever else) data as structured data whenever they want to, even though the actual schema may vary every ten minutes, as long as they record the schema of each portion of data.
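As an illustration of the fold/accumulator idea, a minimal PySpark sketch. This is not Spark's internal InferSchema: the merge below is a deliberately naive name-to-type union, and the output path is hypothetical.

{code:python}
# Sketch of a schema-merging accumulator: the running "schema" is a plain
# dict of field name -> Python type name, merged as a side effect while the
# RDD is evaluated for its real purpose (writing the data out).
from pyspark import SparkContext, AccumulatorParam

class SchemaMergeParam(AccumulatorParam):
    def zero(self, value):
        return {}

    def addInPlace(self, acc, other):
        # Naive union; a real implementation would widen conflicting types.
        acc.update(other)
        return acc

sc = SparkContext.getOrCreate()
schema_acc = sc.accumulator({}, SchemaMergeParam())

records = sc.parallelize([{"a": 1, "b": "x"}, {"a": 2, "c": 3.0}])

def observe(rec):
    # Merging is idempotent, so task retries only repeat harmless unions.
    schema_acc.add({k: type(v).__name__ for k, v in rec.items()})
    return rec

records.map(observe).saveAsTextFile("/tmp/json_out")  # hypothetical output path
print(schema_acc.value)  # e.g. {'a': 'int', 'b': 'str', 'c': 'float'}
{code}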



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23475) The "stages" page doesn't show any completed stages

2018-02-23 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374360#comment-16374360
 ] 

Apache Spark commented on SPARK-23475:
--

User 'mgaido91' has created a pull request for this issue:
https://github.com/apache/spark/pull/20663

> The "stages" page doesn't show any completed stages
> ---
>
> Key: SPARK-23475
> URL: https://issues.apache.org/jira/browse/SPARK-23475
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Blocker
> Fix For: 2.3.0
>
> Attachments: Screen Shot 2018-02-21 at 12.39.39 AM.png
>
>
> Run "bin/spark-shell --conf spark.ui.retainedJobs=10 --conf 
> spark.ui.retainedStages=10", type the following codes and click the "stages" 
> page, it will not show completed stages:
> {code}
> val rdd = sc.parallelize(0 to 100, 100).repartition(10).cache()
> (1 to 20).foreach { i =>
>rdd.repartition(10).count()
> }
> {code}
> Please see the attached screenshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374358#comment-16374358
 ] 

Marco Gaido commented on SPARK-23493:
-

How could Spark know that you are not putting the partition column last? Matching on the names of the incoming dataset's columns is not an option: you can perfectly well insert data from a dataset with completely different names. Types are not an option either, since Spark applies implicit casts when it can. I think this is simply incorrect usage, which should be fixed by the user.

> insert-into depends on columns order, otherwise incorrect data inserted
> ---
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Xiaoju Wu
>Priority: Minor
>
> insert-into only works when the partitionby key columns are set at last:
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>   .write
>  .insertInto(table)
> sql("select * from " + table).show()
> ++-++
> |col2|col3|col1|
> ++-++
> |test#test|0.0|8|
> |test1|1.0|7|
> |test3|0.0|9|
> |8|null|0|
> |9|null|0|
> |7|null|1|
> ++-++
>  
> If you try inserting with sql, the issue is the same.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> sql("insert into " + table + " values(7,'test2',1.0)")
>  sql("select * from " + table).show()
> +--+---++
> |col2|col3|col1|
> +--+---++
> |test#test|0.0|8|
> |test1|1.0|7|
> |test3|0.0|9|
> |7|null|1|
> +--+---++
> No exception was thrown since I only run insertInto, not together with 
> partitionBy. The data are inserted incorrectly. The issue is related to 
> column order. If I change to partitionBy col3, which is the last column, it 
> works.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col3")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>  .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +---+--++
> |col1|col2|col3|
> +---+--++
> |8|test#test|0.0|
> |9|test3|0.0|
> |8|test#test|0.0|
> |9|test3|0.0|
> |7|test1|1.0|
> |7|test2|1.0|
> +---+--++



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374266#comment-16374266
 ] 

Xiaoju Wu commented on SPARK-23493:
---

If that's the case, it should throw an exception telling users of the API to "please set the partition column last" instead of swallowing the issue and silently inserting the data incorrectly, right?

> insert-into depends on columns order, otherwise incorrect data inserted
> ---
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Xiaoju Wu
>Priority: Minor
>
> insert-into only works when the partitionby key columns are set at last:
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>   .write
>  .insertInto(table)
> sql("select * from " + table).show()
> ++-++
> |col2|col3|col1|
> ++-++
> |test#test|0.0|8|
> |test1|1.0|7|
> |test3|0.0|9|
> |8|null|0|
> |9|null|0|
> |7|null|1|
> ++-++
>  
> If you try inserting with sql, the issue is the same.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> sql("insert into " + table + " values(7,'test2',1.0)")
>  sql("select * from " + table).show()
> +--+---++
> |col2|col3|col1|
> +--+---++
> |test#test|0.0|8|
> |test1|1.0|7|
> |test3|0.0|9|
> |7|null|1|
> +--+---++
> No exception was thrown since I only run insertInto, not together with 
> partitionBy. The data are inserted incorrectly. The issue is related to 
> column order. If I change to partitionBy col3, which is the last column, it 
> works.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col3")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>  .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +---+--++
> |col1|col2|col3|
> +---+--++
> |8|test#test|0.0|
> |9|test3|0.0|
> |8|test#test|0.0|
> |9|test3|0.0|
> |7|test1|1.0|
> |7|test2|1.0|
> +---+--++



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374258#comment-16374258
 ] 

Marco Gaido commented on SPARK-23493:
-

I don't think so. Partition columns are always at the end. If you describe a 
table, they are at the end. If you select from it, they are at the end. That's 
not confusing. Also Hive does the same.
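
The workaround that follows from this is to reorder the DataFrame before the insert so that positions line up with the table's storage layout. The report uses the Scala API; below is a PySpark sketch of the same idea, with the table and column names taken from the report.

{code:python}
# Sketch only: insertInto() matches columns by position, and a table partitioned
# by col1 stores col1 last, so put col1 last before inserting.
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
table = "default.tbl"  # stored layout after partitionBy("col1"): col2, col3, col1

data2 = [(7, "test2", 1.0), (8, "test#test", 0.0), (9, "test3", 0.0)]
df = spark.createDataFrame(data2, ["col1", "col2", "col3"])

# Reorder so the partition column comes last, matching the table layout.
df.select("col2", "col3", "col1").write.insertInto(table)
{code}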

> insert-into depends on columns order, otherwise incorrect data inserted
> ---
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Xiaoju Wu
>Priority: Minor
>
> insert-into only works when the partitionby key columns are set at last:
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>   .write
>  .insertInto(table)
> sql("select * from " + table).show()
> ++-++
> |col2|col3|col1|
> ++-++
> |test#test|0.0|8|
> |test1|1.0|7|
> |test3|0.0|9|
> |8|null|0|
> |9|null|0|
> |7|null|1|
> ++-++
>  
> If you try inserting with sql, the issue is the same.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> sql("insert into " + table + " values(7,'test2',1.0)")
>  sql("select * from " + table).show()
> +--+---++
> |col2|col3|col1|
> +--+---++
> |test#test|0.0|8|
> |test1|1.0|7|
> |test3|0.0|9|
> |7|null|1|
> +--+---++
> No exception was thrown since I only run insertInto, not together with 
> partitionBy. The data are inserted incorrectly. The issue is related to 
> column order. If I change to partitionBy col3, which is the last column, it 
> works.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col3")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>  .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +---+--++
> |col1|col2|col3|
> +---+--++
> |8|test#test|0.0|
> |9|test3|0.0|
> |8|test#test|0.0|
> |9|test3|0.0|
> |7|test1|1.0|
> |7|test2|1.0|
> +---+--++



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23350) [SS]Exception when stopping continuous processing application

2018-02-23 Thread Wang Yanlin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Yanlin updated SPARK-23350:

Attachment: TaskScheduler_stop.png

> [SS]Exception when stopping continuous processing application
> -
>
> Key: SPARK-23350
> URL: https://issues.apache.org/jira/browse/SPARK-23350
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wang Yanlin
>Priority: Major
> Attachments: TaskScheduler_stop.png
>
>
> SparkException happends when stopping continuous processing application, 
> using Ctrl-C in stand-alone mode.
> 18/02/02 16:12:57 ERROR ContinuousExecution: Query yanlin-CP-job [id = 
> 007f1f44-771a-4097-aaa3-28ae35c16dd9, runId = 
> 3e1ab7c1-4d6f-475a-9d2c-45577643b0dd] terminated with error
> org.apache.spark.SparkException: Writing job failed.
>   at 
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:105)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3$$anonfun$apply$1.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3$$anonfun$apply$1.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution.runContinuous(ContinuousExecution.scala:266)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution.runActivatedStream(ContinuousExecution.scala:90)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: org.apache.spark.SparkException: Job 0 cancelled because 
> SparkContext was shut down
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:837)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:835)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at 
> org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:835)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1831)
>   at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
>   at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1743)
>   at 
> org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1924)
>   at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
>   at org.apache.spark.SparkContext.stop(SparkContext.scala:1923)
>   at 
> org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:572)
>   at 
> org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
>   at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>   at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>   at 
> 

[jira] [Commented] (SPARK-23350) [SS]Exception when stopping continuous processing application

2018-02-23 Thread Wang Yanlin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374236#comment-16374236
 ] 

Wang Yanlin commented on SPARK-23350:
-

Added a sequence-flow diagram to explain this error: !TaskScheduler_stop.png!
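
For ad-hoc runs, one way to avoid the cancellation in the first place is to stop the continuous query before the SparkContext is torn down. A minimal PySpark sketch follows; the rate source and console sink are chosen purely for illustration, and it assumes the Ctrl-C reaches the driver as a KeyboardInterrupt.

{code:python}
# Sketch only: stop the query first, then the session, so the running continuous
# job is not left to be cancelled by the SparkContext shutdown hook (the
# "Job 0 cancelled because SparkContext was shut down" error above).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cp-shutdown-sketch").getOrCreate()

query = (spark.readStream.format("rate").load()
         .writeStream.format("console")
         .trigger(continuous="1 second")
         .start())

try:
    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()   # end the continuous job cleanly
finally:
    spark.stop()
{code}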

> [SS]Exception when stopping continuous processing application
> -
>
> Key: SPARK-23350
> URL: https://issues.apache.org/jira/browse/SPARK-23350
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wang Yanlin
>Priority: Major
> Attachments: TaskScheduler_stop.png
>
>
> SparkException happends when stopping continuous processing application, 
> using Ctrl-C in stand-alone mode.
> 18/02/02 16:12:57 ERROR ContinuousExecution: Query yanlin-CP-job [id = 
> 007f1f44-771a-4097-aaa3-28ae35c16dd9, runId = 
> 3e1ab7c1-4d6f-475a-9d2c-45577643b0dd] terminated with error
> org.apache.spark.SparkException: Writing job failed.
>   at 
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:105)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3$$anonfun$apply$1.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3$$anonfun$apply$1.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution.runContinuous(ContinuousExecution.scala:266)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution.runActivatedStream(ContinuousExecution.scala:90)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: org.apache.spark.SparkException: Job 0 cancelled because 
> SparkContext was shut down
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:837)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:835)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at 
> org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:835)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1831)
>   at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
>   at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1743)
>   at 
> org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1924)
>   at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
>   at org.apache.spark.SparkContext.stop(SparkContext.scala:1923)
>   at 
> org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:572)
>   at 
> org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
>   at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>   at 
> 

[jira] [Updated] (SPARK-23350) [SS]Exception when stopping continuous processing application

2018-02-23 Thread Wang Yanlin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Yanlin updated SPARK-23350:

Attachment: TaskScheduler_stop.png

> [SS]Exception when stopping continuous processing application
> -
>
> Key: SPARK-23350
> URL: https://issues.apache.org/jira/browse/SPARK-23350
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wang Yanlin
>Priority: Major
>
> SparkException happends when stopping continuous processing application, 
> using Ctrl-C in stand-alone mode.
> 18/02/02 16:12:57 ERROR ContinuousExecution: Query yanlin-CP-job [id = 
> 007f1f44-771a-4097-aaa3-28ae35c16dd9, runId = 
> 3e1ab7c1-4d6f-475a-9d2c-45577643b0dd] terminated with error
> org.apache.spark.SparkException: Writing job failed.
>   at 
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:105)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3$$anonfun$apply$1.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3$$anonfun$apply$1.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution.runContinuous(ContinuousExecution.scala:266)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution.runActivatedStream(ContinuousExecution.scala:90)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: org.apache.spark.SparkException: Job 0 cancelled because 
> SparkContext was shut down
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:837)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:835)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at 
> org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:835)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1831)
>   at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
>   at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1743)
>   at 
> org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1924)
>   at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
>   at org.apache.spark.SparkContext.stop(SparkContext.scala:1923)
>   at 
> org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:572)
>   at 
> org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
>   at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>   at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>   at 
> 

[jira] [Updated] (SPARK-23350) [SS]Exception when stopping continuous processing application

2018-02-23 Thread Wang Yanlin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Yanlin updated SPARK-23350:

Attachment: (was: TaskScheduler_stop.png)

> [SS]Exception when stopping continuous processing application
> -
>
> Key: SPARK-23350
> URL: https://issues.apache.org/jira/browse/SPARK-23350
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wang Yanlin
>Priority: Major
>
> SparkException happends when stopping continuous processing application, 
> using Ctrl-C in stand-alone mode.
> 18/02/02 16:12:57 ERROR ContinuousExecution: Query yanlin-CP-job [id = 
> 007f1f44-771a-4097-aaa3-28ae35c16dd9, runId = 
> 3e1ab7c1-4d6f-475a-9d2c-45577643b0dd] terminated with error
> org.apache.spark.SparkException: Writing job failed.
>   at 
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:105)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3$$anonfun$apply$1.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3$$anonfun$apply$1.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$runContinuous$3.apply(ContinuousExecution.scala:268)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution.runContinuous(ContinuousExecution.scala:266)
>   at 
> org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution.runActivatedStream(ContinuousExecution.scala:90)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: org.apache.spark.SparkException: Job 0 cancelled because 
> SparkContext was shut down
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:837)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:835)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at 
> org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:835)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1831)
>   at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
>   at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1743)
>   at 
> org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1924)
>   at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
>   at org.apache.spark.SparkContext.stop(SparkContext.scala:1923)
>   at 
> org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:572)
>   at 
> org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
>   at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>   at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>   at 
> 

[jira] [Comment Edited] (SPARK-21550) approxQuantiles throws "next on empty iterator" on empty data

2018-02-23 Thread Javier (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374218#comment-16374218
 ] 

Javier edited comment on SPARK-21550 at 2/23/18 10:59 AM:
--

I still observe this behavior in 2.2.0 when approxQuantile is applied to a single column that only contains None values:


{code:java}
...
 File "/pyspark.zip/pyspark/sql/dataframe.py", line 1402, in approxQuantile
 File "/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in _call_
 File "/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
 File "/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling 
o202.approxQuantile.
 : java.util.NoSuchElementException: next on empty iterator
 at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
 at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
 at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
 at scala.collection.IterableLike$class.head(IterableLike.scala:107)
 at 
scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:186)
 at 
scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
 at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:186)
 at scala.collection.TraversableLike$class.last(TraversableLike.scala:431)
 at 
scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$last(ArrayOps.scala:186)
 at 
scala.collection.IndexedSeqOptimized$class.last(IndexedSeqOptimized.scala:132)
 at scala.collection.mutable.ArrayOps$ofRef.last(ArrayOps.scala:186)
 at 
org.apache.spark.sql.catalyst.util.QuantileSummaries.query(QuantileSummaries.scala:207)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1$$anonfun$apply$1.apply$mcDD$sp(StatFunctions.scala:92)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1$$anonfun$apply$1.apply(StatFunctions.scala:92)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1$$anonfun$apply$1.apply(StatFunctions.scala:92)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1.apply(StatFunctions.scala:92)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1.apply(StatFunctions.scala:92)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$.multipleApproxQuantiles(StatFunctions.scala:92)
 at 
org.apache.spark.sql.DataFrameStatFunctions.approxQuantile(DataFrameStatFunctions.scala:73)
 at 
org.apache.spark.sql.DataFrameStatFunctions.approxQuantile(DataFrameStatFunctions.scala:84)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at py4j.Gateway.invoke(Gateway.java:280)
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:214)
 at java.lang.Thread.run(Thread.java:748){code}
 


was (Author: jabot):
I still observe this behavior in 2.2.0 when approxQuantile is applied to a 
single column that only contains None's:
 \{{}}
{code:java}
...
 File "/pyspark.zip/pyspark/sql/dataframe.py", line 1402, in approxQuantile
 File "/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in _call_
 File "/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
 File "/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling 
o202.approxQuantile.
 : java.util.NoSuchElementException: next 

[jira] [Comment Edited] (SPARK-21550) approxQuantiles throws "next on empty iterator" on empty data

2018-02-23 Thread Javier (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374218#comment-16374218
 ] 

Javier edited comment on SPARK-21550 at 2/23/18 10:58 AM:
--

I still observe this behavior in 2.2.0 when approxQuantile is applied to a 
single column that only contains None's:
 \{{}}
{code:java}
...
 File "/pyspark.zip/pyspark/sql/dataframe.py", line 1402, in approxQuantile
 File "/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in _call_
 File "/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
 File "/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling 
o202.approxQuantile.
 : java.util.NoSuchElementException: next on empty iterator
 at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
 at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
 at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
 at scala.collection.IterableLike$class.head(IterableLike.scala:107)
 at 
scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:186)
 at 
scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
 at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:186)
 at scala.collection.TraversableLike$class.last(TraversableLike.scala:431)
 at 
scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$last(ArrayOps.scala:186)
 at 
scala.collection.IndexedSeqOptimized$class.last(IndexedSeqOptimized.scala:132)
 at scala.collection.mutable.ArrayOps$ofRef.last(ArrayOps.scala:186)
 at 
org.apache.spark.sql.catalyst.util.QuantileSummaries.query(QuantileSummaries.scala:207)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1$$anonfun$apply$1.apply$mcDD$sp(StatFunctions.scala:92)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1$$anonfun$apply$1.apply(StatFunctions.scala:92)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1$$anonfun$apply$1.apply(StatFunctions.scala:92)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1.apply(StatFunctions.scala:92)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1.apply(StatFunctions.scala:92)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$.multipleApproxQuantiles(StatFunctions.scala:92)
 at 
org.apache.spark.sql.DataFrameStatFunctions.approxQuantile(DataFrameStatFunctions.scala:73)
 at 
org.apache.spark.sql.DataFrameStatFunctions.approxQuantile(DataFrameStatFunctions.scala:84)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at py4j.Gateway.invoke(Gateway.java:280)
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:214)
 at java.lang.Thread.run(Thread.java:748){code}
 


was (Author: jabot):
I still observe this behavior in 2.2.0 when approxQuantile is applied to a 
single column that only contains None's:
{{}}
{code:java}
...
 File 
"/mnt/yarn/usercache/hadoop/appcache/application_1519290822117_0043/container_1519290822117_0043_01_01/pyspark.zip/pyspark/sql/dataframe.py",
 line 1402, in approxQuantile
 File 
"/mnt/yarn/usercache/hadoop/appcache/application_1519290822117_0043/container_1519290822117_0043_01_01/py4j-0.10.4-src.zip/py4j/java_gateway.py",
 line 1133, in _call_
 File 

[jira] [Commented] (SPARK-21550) approxQuantiles throws "next on empty iterator" on empty data

2018-02-23 Thread Javier (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374218#comment-16374218
 ] 

Javier commented on SPARK-21550:


I still observe this behavior in 2.2.0 when approxQuantile is applied to a single column that only contains None values:
{code:java}
...
 File 
"/mnt/yarn/usercache/hadoop/appcache/application_1519290822117_0043/container_1519290822117_0043_01_01/pyspark.zip/pyspark/sql/dataframe.py",
 line 1402, in approxQuantile
 File 
"/mnt/yarn/usercache/hadoop/appcache/application_1519290822117_0043/container_1519290822117_0043_01_01/py4j-0.10.4-src.zip/py4j/java_gateway.py",
 line 1133, in _call_
 File 
"/mnt/yarn/usercache/hadoop/appcache/application_1519290822117_0043/container_1519290822117_0043_01_01/pyspark.zip/pyspark/sql/utils.py",
 line 63, in deco
 File 
"/mnt/yarn/usercache/hadoop/appcache/application_1519290822117_0043/container_1519290822117_0043_01_01/py4j-0.10.4-src.zip/py4j/protocol.py",
 line 319, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling 
o202.approxQuantile.
 : java.util.NoSuchElementException: next on empty iterator
 at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
 at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
 at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
 at scala.collection.IterableLike$class.head(IterableLike.scala:107)
 at 
scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:186)
 at 
scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
 at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:186)
 at scala.collection.TraversableLike$class.last(TraversableLike.scala:431)
 at 
scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$last(ArrayOps.scala:186)
 at 
scala.collection.IndexedSeqOptimized$class.last(IndexedSeqOptimized.scala:132)
 at scala.collection.mutable.ArrayOps$ofRef.last(ArrayOps.scala:186)
 at 
org.apache.spark.sql.catalyst.util.QuantileSummaries.query(QuantileSummaries.scala:207)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1$$anonfun$apply$1.apply$mcDD$sp(StatFunctions.scala:92)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1$$anonfun$apply$1.apply(StatFunctions.scala:92)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1$$anonfun$apply$1.apply(StatFunctions.scala:92)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1.apply(StatFunctions.scala:92)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$multipleApproxQuantiles$1.apply(StatFunctions.scala:92)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
 at 
org.apache.spark.sql.execution.stat.StatFunctions$.multipleApproxQuantiles(StatFunctions.scala:92)
 at 
org.apache.spark.sql.DataFrameStatFunctions.approxQuantile(DataFrameStatFunctions.scala:73)
 at 
org.apache.spark.sql.DataFrameStatFunctions.approxQuantile(DataFrameStatFunctions.scala:84)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at py4j.Gateway.invoke(Gateway.java:280)
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:214)
 at java.lang.Thread.run(Thread.java:748){code}
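
A defensive PySpark sketch of the obvious guard, assuming a Spark version where approxQuantile still fails on an all-null column: drop the nulls and check that anything is left before asking for quantiles.

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField("x", DoubleType())])
df = spark.createDataFrame([(None,), (None,), (None,)], schema)  # all-null column

non_null = df.na.drop(subset=["x"])
if non_null.count() > 0:
    quantiles = non_null.approxQuantile("x", [0.25, 0.5, 0.75], 0.01)
else:
    quantiles = []  # nothing to summarize, so skip the call that would throw
print(quantiles)
{code}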
 

> approxQuantiles throws "next on empty iterator" on empty data
> -
>
> Key: SPARK-21550
> URL: 

[jira] [Commented] (SPARK-23405) The task will hang up when a small table left semi join a big table

2018-02-23 Thread KaiXinXIaoLei (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374182#comment-16374182
 ] 

KaiXinXIaoLei commented on SPARK-23405:
---

[~q79969786] And if I run 'select ls.cs_order_number from ls left semi join catalog_sales cs on ls.cs_order_number = cs.cs_order_number and cs.cs_order_number is not null', the job succeeds.
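
The same observation as a runnable PySpark snippet (table and column names as in the report); the only difference from the hanging query is the explicit null filter on the join key.

{code:python}
# Sketch only: adding "is not null" on the big table's join key lets the job finish.
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
result = spark.sql("""
    select ls.cs_order_number
    from ls left semi join catalog_sales cs
      on ls.cs_order_number = cs.cs_order_number
     and cs.cs_order_number is not null
""")
result.show()
{code}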

> The task will hang up when a small table left semi join a big table
> ---
>
> Key: SPARK-23405
> URL: https://issues.apache.org/jira/browse/SPARK-23405
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: KaiXinXIaoLei
>Priority: Major
> Attachments: SQL.png, taskhang up.png
>
>
> I run a sql: `select ls.cs_order_number from ls left semi join catalog_sales 
> cs on ls.cs_order_number = cs.cs_order_number`, The `ls` table is a small 
> table ,and the number is one. The `catalog_sales` table is a big table,  and 
> the number is 10 billion. The task will be hang up:
> !taskhang up.png!
>  And the sql page is :
> !SQL.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374172#comment-16374172
 ] 

Xiaoju Wu commented on SPARK-23493:
---

[~mgaido] "Columns are matched in order while inserting" is acceptable, but if you look into my case, you will find it forces the partition key to be placed last among the columns, which seems confusing.

> insert-into depends on columns order, otherwise incorrect data inserted
> ---
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Xiaoju Wu
>Priority: Minor
>
> insert-into only works when the partitionby key columns are set at last:
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>   .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        8|null|   0|
> |        9|null|   0|
> |        7|null|   1|
> +---------+----+----+
>  
> If you try inserting with sql, the issue is the same.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> sql("insert into " + table + " values(7,'test2',1.0)")
>  sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        7|null|   1|
> +---------+----+----+
> No exception was thrown since I only run insertInto, not together with 
> partitionBy. The data are inserted incorrectly. The issue is related to 
> column order. If I change to partitionBy col3, which is the last column, it 
> works.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col3")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>  .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +----+---------+----+
> |col1|     col2|col3|
> +----+---------+----+
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   7|    test1| 1.0|
> |   7|    test2| 1.0|
> +----+---------+----+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374146#comment-16374146
 ] 

Marco Gaido commented on SPARK-23493:
-

I don't think this is an issue. I think this is the expected behavior. Columns 
are matched in order while inserting.
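
As a side note, a sketch under the assumption that the documented DataFrameWriter 
behavior applies here: insertInto resolves columns purely by position, whereas 
saveAsTable in append mode resolves them by name, so appending by name can avoid 
depending on column order. Names reuse the ticket's example, and `spark` is assumed 
to be an active SparkSession:

{code:scala}
import org.apache.spark.sql.SaveMode
import spark.implicits._

val df = Seq((7, "test2", 1.0), (8, "test#test", 0.0), (9, "test3", 0.0))
  .toDF("col1", "col2", "col3")

// Position-based: insertInto ignores the DataFrame column names, so the column
// order must already match the table layout (partition columns last).
// df.write.insertInto("default.tbl")

// Name-based alternative: columns are matched by name, so their order in the
// DataFrame does not matter.
df.write.mode(SaveMode.Append).saveAsTable("default.tbl")
{code}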

> insert-into depends on columns order, otherwise incorrect data inserted
> ---
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Xiaoju Wu
>Priority: Minor
>
> insert-into only works correctly when the partitionBy key columns are placed last:
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>   .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        8|null|   0|
> |        9|null|   0|
> |        7|null|   1|
> +---------+----+----+
>  
> If you try inserting with sql, the issue is the same.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> sql("insert into " + table + " values(7,'test2',1.0)")
>  sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        7|null|   1|
> +---------+----+----+
> No exception was thrown since I only run insertInto, not together with 
> partitionBy. The data are inserted incorrectly. The issue is related to 
> column order. If I change to partitionBy col3, which is the last column, it 
> works.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col3")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>  .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +----+---------+----+
> |col1|     col2|col3|
> +----+---------+----+
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   7|    test1| 1.0|
> |   7|    test2| 1.0|
> +----+---------+----+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23475) The "stages" page doesn't show any completed stages

2018-02-23 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374139#comment-16374139
 ] 

Apache Spark commented on SPARK-23475:
--

User 'mgaido91' has created a pull request for this issue:
https://github.com/apache/spark/pull/20662

> The "stages" page doesn't show any completed stages
> ---
>
> Key: SPARK-23475
> URL: https://issues.apache.org/jira/browse/SPARK-23475
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Blocker
> Fix For: 2.3.0
>
> Attachments: Screen Shot 2018-02-21 at 12.39.39 AM.png
>
>
> Run "bin/spark-shell --conf spark.ui.retainedJobs=10 --conf 
> spark.ui.retainedStages=10", type the following codes and click the "stages" 
> page, it will not show completed stages:
> {code}
> val rdd = sc.parallelize(0 to 100, 100).repartition(10).cache()
> (1 to 20).foreach { i =>
>rdd.repartition(10).count()
> }
> {code}
> Please see the attached screenshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374076#comment-16374076
 ] 

Xiaoju Wu commented on SPARK-23493:
---

This issue is similar to the one described in ticket SPARK-9278, but it seems 
that ticket only fixed the case where insert-into cannot be run together with 
partitionBy. If I try inserting into a table that already has a partition key, 
the data are inserted incorrectly.
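
One way to see what the comment above describes (a sketch, assuming the ticket's 
table has already been created with saveAsTable and partitionBy("col1"), and that 
`spark` is an active SparkSession): inspect the table's stored schema, which places 
the partition column at the end, and which is the order insertInto matches against:

{code:scala}
// Prints the stored column order of the partitioned table, e.g.
// "col2, col3, col1" -- with the partition column col1 moved to the end.
println(spark.table("default.tbl").columns.mkString(", "))
spark.table("default.tbl").printSchema()
{code}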

> insert-into depends on columns order, otherwise incorrect data inserted
> ---
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Xiaoju Wu
>Priority: Minor
>
> insert-into only works correctly when the partitionBy key columns are placed last:
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>   .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        8|null|   0|
> |        9|null|   0|
> |        7|null|   1|
> +---------+----+----+
>  
> If you try inserting with sql, the issue is the same.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> sql("insert into " + table + " values(7,'test2',1.0)")
>  sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        7|null|   1|
> +---------+----+----+
> No exception was thrown since I only run insertInto, not together with 
> partitionBy. The data are inserted incorrectly. The issue is related to 
> column order. If I change to partitionBy col3, which is the last column, it 
> works.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col3")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>  .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +----+---------+----+
> |col1|     col2|col3|
> +----+---------+----+
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   7|    test1| 1.0|
> |   7|    test2| 1.0|
> +----+---------+----+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data

2018-02-23 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374073#comment-16374073
 ] 

Xiaoju Wu commented on SPARK-9278:
--

Created a new ticket to trace this issue SPARK-23493

> DataFrameWriter.insertInto inserts incorrect data
> -
>
> Key: SPARK-9278
> URL: https://issues.apache.org/jira/browse/SPARK-9278
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.0
> Environment: Linux, S3, Hive Metastore
>Reporter: Steve Lindemann
>Assignee: Cheng Lian
>Priority: Critical
>
> After creating a partitioned Hive table (stored as Parquet) via the 
> DataFrameWriter.createTable command, subsequent attempts to insert additional 
> data into new partitions of this table result in inserting incorrect data 
> rows. Reordering the columns in the data to be written seems to avoid this 
> issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Xiaoju Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaoju Wu updated SPARK-23493:
--
Description: 
insert-into only works correctly when the partitionBy key columns are placed last:

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col1")
 .saveAsTable(table)

val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

 spark
 .createDataset(data2)
 .toDF("col1", "col2", "col3")
  .write
 .insertInto(table)

sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

 

If you try inserting with sql, the issue is the same.

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col1")
 .saveAsTable(table)

val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

sql("insert into " + table + " values(7,'test2',1.0)")
 sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        7|null|   1|
+---------+----+----+

No exception was thrown since I only run insertInto, not together with 
partitionBy. The data are inserted incorrectly. The issue is related to column 
order. If I change to partitionBy col3, which is the last column, it works.

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col3")
 .saveAsTable(table)

val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

spark
 .createDataset(data2)
 .toDF("col1", "col2", "col3")
 .write
 .insertInto(table)

sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+

  was:
Seems the issue still exists, here's the test:

val data = Seq(
(7, "test1", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)
import spark.implicits._

val table = "default.tbl"
spark
.createDataset(data)
.toDF("col1", "col2", "col3")
.write
.partitionBy("col1")
.saveAsTable(table)

val data2 = Seq(
(7, "test2", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)

 spark
.createDataset(data2)
.toDF("col1", "col2", "col3")
 .write
.insertInto(table)

sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

 

If you try inserting with sql, the issue is the same.

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

 val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col1")
 .saveAsTable(table)

 val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

 sql("insert into " + table + " values(7,'test2',1.0)")
 sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        7|null|   1|
+---------+----+----+

No exception was thrown since I only run insertInto, not together with 
partitionBy. The data are inserted incorrectly. The issue is related to column 
order. If I change to partitionBy col3, which is the last column, it works.

val data = Seq(
(7, "test1", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)
import spark.implicits._

val table = "default.tbl"
spark
.createDataset(data)
.toDF("col1", "col2", "col3")
.write
.partitionBy("col3")
.saveAsTable(table)

val data2 = Seq(
(7, "test2", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)

spark
.createDataset(data2)
.toDF("col1", "col2", "col3")
.write
.insertInto(table)

sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+


> insert-into depends on columns order, otherwise incorrect data inserted
> ---
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Xiaoju Wu
>Priority: Minor
>
> insert-into only 

[jira] [Created] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Xiaoju Wu (JIRA)
Xiaoju Wu created SPARK-23493:
-

 Summary: insert-into depends on columns order, otherwise incorrect 
data inserted
 Key: SPARK-23493
 URL: https://issues.apache.org/jira/browse/SPARK-23493
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.1
Reporter: Xiaoju Wu


Seems the issue still exists, here's the test:

val data = Seq(
(7, "test1", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)
import spark.implicits._

val table = "default.tbl"
spark
.createDataset(data)
.toDF("col1", "col2", "col3")
.write
.partitionBy("col1")
.saveAsTable(table)

val data2 = Seq(
(7, "test2", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)

 spark
.createDataset(data2)
.toDF("col1", "col2", "col3")
 .write
.insertInto(table)

sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

 

If you try inserting with sql, the issue is the same.

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

 val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col1")
 .saveAsTable(table)

 val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

 sql("insert into " + table + " values(7,'test2',1.0)")
 sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        7|null|   1|
+---------+----+----+

No exception was thrown since I only run insertInto, not together with 
partitionBy. The data are inserted incorrectly. The issue is related to column 
order. If I change to partitionBy col3, which is the last column, it works.

val data = Seq(
(7, "test1", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)
import spark.implicits._

val table = "default.tbl"
spark
.createDataset(data)
.toDF("col1", "col2", "col3")
.write
.partitionBy("col3")
.saveAsTable(table)

val data2 = Seq(
(7, "test2", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)

spark
.createDataset(data2)
.toDF("col1", "col2", "col3")
.write
.insertInto(table)

sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org