[jira] [Commented] (SPARK-24432) Add support for dynamic resource allocation

2020-06-01 Thread zhoukang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17120813#comment-17120813
 ] 

zhoukang commented on SPARK-24432:
--

Looking forward to this feature becoming a reality, +1.
By the way, [~dongjoon] [~liyinan926], how can we join the sig-big-data conference 
now, since the URL 
https://github.com/kubernetes/community/tree/master/sig-big-data is no longer valid?

> Add support for dynamic resource allocation
> ---
>
> Key: SPARK-24432
> URL: https://issues.apache.org/jira/browse/SPARK-24432
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Spark Core
>Affects Versions: 3.1.0
>Reporter: Yinan Li
>Priority: Major
>
> This is an umbrella ticket for work on adding support for dynamic resource 
> allocation in Kubernetes mode. This requires a Kubernetes-specific 
> external shuffle service. The feature is available in our fork at 
> github.com/apache-spark-on-k8s/spark.






[jira] [Commented] (SPARK-29564) Cluster deploy mode should support Spark Thrift server

2020-02-02 Thread zhoukang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17028327#comment-17028327
 ] 

zhoukang commented on SPARK-29564:
--

Any progress? [~cltlfcjin] We also support YARN cluster mode in our production 
environment, which uses ZooKeeper for routing.

> Cluster deploy mode should support Spark Thrift server
> --
>
> Key: SPARK-29564
> URL: https://issues.apache.org/jira/browse/SPARK-29564
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit, SQL
>Affects Versions: 2.4.4, 3.0.0
>Reporter: Lantao Jin
>Priority: Major
>
> Cluster deploy mode is not applicable to the Spark Thrift server now. This 
> restriction is too strict.
> In our production environment, we use multiple Spark Thrift servers as 
> long-running services launched in yarn-cluster mode. The life cycle of each STS 
> is managed by an upper-layer manager system, which also dispatches users' JDBC 
> connections to the appropriate STS.






[jira] [Created] (SPARK-29880) Handle submit exception when target hadoop cluster is Federation

2019-11-13 Thread zhoukang (Jira)
zhoukang created SPARK-29880:


 Summary: Handle submit exception when target hadoop cluster is 
Federation
 Key: SPARK-29880
 URL: https://issues.apache.org/jira/browse/SPARK-29880
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.4
Reporter: zhoukang


When we submit an application to a federated YARN cluster, the submission exits 
with a failure because getYarnClusterMetrics is not implemented there.
{code:java}
def submitApplication(): ApplicationId = {
  ResourceRequestHelper.validateResources(sparkConf)

  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
{code}
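
A minimal sketch of the kind of guard that could avoid the hard failure (hypothetical, 
not an actual patch), continuing the snippet above and assuming the surrounding 
Client class provides yarnClient, logInfo and logWarning:
{code:scala}
// The metrics call only feeds an informational log line, so a failure coming from
// a federated RM/router that does not implement getYarnClusterMetrics need not
// abort the whole submission.
try {
  logInfo("Requesting a new application from cluster with %d NodeManagers"
    .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
} catch {
  case e: Exception =>
    logWarning("Could not retrieve YARN cluster metrics from the target cluster " +
      "(possibly a YARN Federation router); continuing with the submission.", e)
}
{code}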







[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-09-11 Thread zhoukang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928204#comment-16928204
 ] 

zhoukang commented on SPARK-21492:
--

Any progress on this issue? [~jiangxb1987]
We have also encountered this problem.

> Memory leak in SortMergeJoin
> 
>
> Key: SPARK-21492
> URL: https://issues.apache.org/jira/browse/SPARK-21492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
>Reporter: Zhan Zhang
>Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak 
> caused by the sort. The memory is not released until the task ends, and cannot 
> be used by other operators, causing a performance drop or OOM.
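
A hypothetical repro sketch of the scenario described above (table sizes and the 
forced join strategy are illustrative assumptions): a LIMIT on top of a sort-merge 
join stops consuming the join iterator early, so the sorter's memory is held until 
the task finishes.
{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  // force a SortMergeJoin

val left = spark.range(0, 10000000L).toDF("id")
val right = spark.range(0, 10000000L).toDF("id")

// The join iterator is not exhausted, so the underlying sorters keep their
// memory reserved until the task ends.
left.join(right, "id").limit(10).collect()
{code}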






[jira] [Commented] (SPARK-27068) Support failed jobs ui and completed jobs ui use different queue

2019-06-12 Thread zhoukang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861780#comment-16861780
 ] 

zhoukang commented on SPARK-27068:
--

Sorry [~dongjoon], I am not familiar with Hive, but I think this is useful for 
some use cases. Maybe we can add a switch-off configuration for this?

> Support failed jobs ui and completed jobs ui use different queue
> 
>
> Key: SPARK-27068
> URL: https://issues.apache.org/jira/browse/SPARK-27068
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 3.0.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 屏幕快照 2019-06-06 下午1.12.04.png
>
>
> For some long-running applications, we may want to check out the cause of some 
> failed jobs.
> But most jobs have completed and the failed jobs may disappear from the UI; we 
> can use different queues for these two kinds of jobs.






[jira] [Commented] (SPARK-27827) File does not exist notice is misleading in FileScanRDD

2019-06-05 Thread zhoukang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16857297#comment-16857297
 ] 

zhoukang commented on SPARK-27827:
--

I just tested this on a 2.3 cluster, [~dongjoon].

> File does not exist notice is misleading in FileScanRDD
> ---
>
> Key: SPARK-27827
> URL: https://issues.apache.org/jira/browse/SPARK-27827
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.2
>Reporter: zhoukang
>Priority: Minor
>
> When we encounter the error below, we will try "REFRESH TABLE" and assume the 
> error will not be thrown again.
> {code:java}
> Error: java.lang.IllegalStateException: Can't overwrite cause with 
> java.io.FileNotFoundException: File does not exist: 
> /user/s_xdata/kuduhive_warehouse/info_dev/dws_quality_time_dictionary/part-3-92c84bf9-99c0-49d9-8cdf-78b1844d75c3.snappy.parquet
> It is possible the underlying files have been updated. You can explicitly 
> invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in 
> SQL or by recreating the Dataset/DataFrame involved. (state=,code=0)
> {code}
> The cause is that the 'InMemoryFileIndex' is cached in 'HiveMetaStoreCatalog', and 
> the refresh command only invalidates the table in the current session. The notice 
> is misleading when we have a long-running Thrift server.






[jira] [Updated] (SPARK-27827) File does not exist notice is misleading in FileScanRDD

2019-06-05 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-27827:
-
Affects Version/s: (was: 2.4.3)

> File does not exist notice is misleading in FileScanRDD
> ---
>
> Key: SPARK-27827
> URL: https://issues.apache.org/jira/browse/SPARK-27827
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.2
>Reporter: zhoukang
>Priority: Minor
>
> When we encounter the error below, we will try "REFRESH TABLE" and assume the 
> error will not be thrown again.
> {code:java}
> Error: java.lang.IllegalStateException: Can't overwrite cause with 
> java.io.FileNotFoundException: File does not exist: 
> /user/s_xdata/kuduhive_warehouse/info_dev/dws_quality_time_dictionary/part-3-92c84bf9-99c0-49d9-8cdf-78b1844d75c3.snappy.parquet
> It is possible the underlying files have been updated. You can explicitly 
> invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in 
> SQL or by recreating the Dataset/DataFrame involved. (state=,code=0)
> {code}
> The cause is that the 'InMemoryFileIndex' is cached in 'HiveMetaStoreCatalog', and 
> the refresh command only invalidates the table in the current session. The notice 
> is misleading when we have a long-running Thrift server.






[jira] [Commented] (SPARK-27068) Support failed jobs ui and completed jobs ui use different queue

2019-06-05 Thread zhoukang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16857291#comment-16857291
 ] 

zhoukang commented on SPARK-27068:
--

[~srowen] Here is a use case in our cluster.
We have a long-running Spark SQL Thrift server, and users use it as an ad-hoc 
query engine and also for an online BI service.
The number of failures is not too large, but the total number of queries 
increases quickly, as shown in the image below.
When we want to find the root cause of a failed query, it is currently not very 
convenient.
 !屏幕快照 2019-06-06 下午1.12.04.png! 

> Support failed jobs ui and completed jobs ui use different queue
> 
>
> Key: SPARK-27068
> URL: https://issues.apache.org/jira/browse/SPARK-27068
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 屏幕快照 2019-06-06 下午1.12.04.png
>
>
> For some long-running applications, we may want to check out the cause of some 
> failed jobs.
> But most jobs have completed and the failed jobs may disappear from the UI; we 
> can use different queues for these two kinds of jobs.






[jira] [Updated] (SPARK-27068) Support failed jobs ui and completed jobs ui use different queue

2019-06-05 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-27068:
-
Attachment: 屏幕快照 2019-06-06 下午1.12.04.png

> Support failed jobs ui and completed jobs ui use different queue
> 
>
> Key: SPARK-27068
> URL: https://issues.apache.org/jira/browse/SPARK-27068
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 屏幕快照 2019-06-06 下午1.12.04.png
>
>
> For some long-running applications, we may want to check out the cause of some 
> failed jobs.
> But most jobs have completed and the failed jobs may disappear from the UI; we 
> can use different queues for these two kinds of jobs.






[jira] [Created] (SPARK-27827) File does not exist notice is misleading in FileScanRDD

2019-05-23 Thread zhoukang (JIRA)
zhoukang created SPARK-27827:


 Summary: File does not exist notice is misleading in FileScanRDD
 Key: SPARK-27827
 URL: https://issues.apache.org/jira/browse/SPARK-27827
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.3, 2.3.2
Reporter: zhoukang


When we encounter the error below, we will try "REFRESH TABLE" and assume the 
error will not be thrown again.
{code:java}
Error: java.lang.IllegalStateException: Can't overwrite cause with 
java.io.FileNotFoundException: File does not exist: 
/user/s_xdata/kuduhive_warehouse/info_dev/dws_quality_time_dictionary/part-3-92c84bf9-99c0-49d9-8cdf-78b1844d75c3.snappy.parquet
It is possible the underlying files have been updated. You can explicitly 
invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in 
SQL or by recreating the Dataset/DataFrame involved. (state=,code=0)
{code}
The cause is that the 'InMemoryFileIndex' is cached in 'HiveMetaStoreCatalog', and 
the refresh command only invalidates the table in the current session. The notice 
is misleading when we have a long-running Thrift server.
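
A hypothetical illustration of the reported behavior (the table name is a 
placeholder, and the second session stands in for another long-lived JDBC 
connection on the same Thrift server):
{code:scala}
import org.apache.spark.sql.SparkSession

// Two sessions sharing one SparkContext, standing in for two long-lived JDBC
// connections on the same Thrift server.
val session1 = SparkSession.builder().enableHiveSupport().getOrCreate()
val session2 = session1.newSession()

session1.sql("SELECT COUNT(*) FROM info_dev.some_table").show()  // caches the file listing
session2.sql("SELECT COUNT(*) FROM info_dev.some_table").show()  // caches its own listing

// ... the files under the table location are rewritten externally ...

session1.sql("REFRESH TABLE info_dev.some_table")   // invalidates only session1's cache
session2.sql("SELECT COUNT(*) FROM info_dev.some_table").show()  // may still fail with the
                                                                 // misleading notice above
{code}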







[jira] [Commented] (SPARK-25299) Use remote storage for persisting shuffle data

2019-04-22 Thread zhoukang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16823613#comment-16823613
 ] 

zhoukang commented on SPARK-25299:
--

Nice work!
Really looking forward to it, thanks [~yifeih].

> Use remote storage for persisting shuffle data
> --
>
> Key: SPARK-25299
> URL: https://issues.apache.org/jira/browse/SPARK-25299
> Project: Spark
>  Issue Type: New Feature
>  Components: Shuffle
>Affects Versions: 2.4.0
>Reporter: Matt Cheah
>Priority: Major
>
> In Spark, the shuffle primitive requires Spark executors to persist data to 
> the local disk of the worker nodes. If executors crash, the external shuffle 
> service can continue to serve the shuffle data that was written beyond the 
> lifetime of the executor itself. In YARN, Mesos, and Standalone mode, the 
> external shuffle service is deployed on every worker node. The shuffle 
> service shares local disk with the executors that run on its node.
> There are some shortcomings with the way shuffle is fundamentally implemented 
> right now. Particularly:
>  * If any external shuffle service process or node becomes unavailable, all 
> applications that had an executor that ran on that node must recompute the 
> shuffle blocks that were lost.
>  * Similarly to the above, the external shuffle service must be kept running 
> at all times, which may waste resources when no applications are using that 
> shuffle service node.
>  * Mounting local storage can prevent users from taking advantage of 
> desirable isolation benefits from using containerized environments, like 
> Kubernetes. We had an external shuffle service implementation in an early 
> prototype of the Kubernetes backend, but it was rejected due to its strict 
> requirement to be able to mount hostPath volumes or other persistent volume 
> setups.
> In the following [architecture discussion 
> document|https://docs.google.com/document/d/1uCkzGGVG17oGC6BJ75TpzLAZNorvrAU3FRd2X-rVHSM/edit#heading=h.btqugnmt2h40]
>  (note: _not_ an SPIP), we brainstorm various high level architectures for 
> improving the external shuffle service in a way that addresses the above 
> problems. The purpose of this umbrella JIRA is to promote additional 
> discussion on how we can approach these problems, both at the architecture 
> level and the implementation level. We anticipate filing sub-issues that 
> break down the tasks that must be completed to achieve this goal.






[jira] [Commented] (SPARK-25299) Use remote storage for persisting shuffle data

2019-04-22 Thread zhoukang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822997#comment-16822997
 ] 

zhoukang commented on SPARK-25299:
--

Is there any progress on this task? [~yifeih] [~mcheah]

> Use remote storage for persisting shuffle data
> --
>
> Key: SPARK-25299
> URL: https://issues.apache.org/jira/browse/SPARK-25299
> Project: Spark
>  Issue Type: New Feature
>  Components: Shuffle
>Affects Versions: 2.4.0
>Reporter: Matt Cheah
>Priority: Major
>
> In Spark, the shuffle primitive requires Spark executors to persist data to 
> the local disk of the worker nodes. If executors crash, the external shuffle 
> service can continue to serve the shuffle data that was written beyond the 
> lifetime of the executor itself. In YARN, Mesos, and Standalone mode, the 
> external shuffle service is deployed on every worker node. The shuffle 
> service shares local disk with the executors that run on its node.
> There are some shortcomings with the way shuffle is fundamentally implemented 
> right now. Particularly:
>  * If any external shuffle service process or node becomes unavailable, all 
> applications that had an executor that ran on that node must recompute the 
> shuffle blocks that were lost.
>  * Similarly to the above, the external shuffle service must be kept running 
> at all times, which may waste resources when no applications are using that 
> shuffle service node.
>  * Mounting local storage can prevent users from taking advantage of 
> desirable isolation benefits from using containerized environments, like 
> Kubernetes. We had an external shuffle service implementation in an early 
> prototype of the Kubernetes backend, but it was rejected due to its strict 
> requirement to be able to mount hostPath volumes or other persistent volume 
> setups.
> In the following [architecture discussion 
> document|https://docs.google.com/document/d/1uCkzGGVG17oGC6BJ75TpzLAZNorvrAU3FRd2X-rVHSM/edit#heading=h.btqugnmt2h40]
>  (note: _not_ an SPIP), we brainstorm various high level architectures for 
> improving the external shuffle service in a way that addresses the above 
> problems. The purpose of this umbrella JIRA is to promote additional 
> discussion on how we can approach these problems, both at the architecture 
> level and the implementation level. We anticipate filing sub-issues that 
> break down the tasks that must be completed to achieve this goal.






[jira] [Commented] (SPARK-26568) Too many partitions may cause thriftServer frequently Full GC

2019-04-10 Thread zhoukang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815016#comment-16815016
 ] 

zhoukang commented on SPARK-26568:
--

It is the Hive client used inside Spark that causes this problem, [~srowen].

> Too many partitions may cause thriftServer frequently Full GC
> -
>
> Key: SPARK-26568
> URL: https://issues.apache.org/jira/browse/SPARK-26568
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
>
> The reason is that:
> first, we have a table with many partitions (possibly several hundred); second, 
> we have some concurrent queries. Then the long-running Thrift server may 
> encounter an OOM issue.
> Here is a case; the call stack of the OOM thread:
> {code:java}
> pool-34-thread-10
>   at 
> org.apache.hadoop.hive.metastore.api.StorageDescriptor.(Lorg/apache/hadoop/hive/metastore/api/StorageDescriptor;)V
>  (StorageDescriptor.java:240)
>   at 
> org.apache.hadoop.hive.metastore.api.Partition.(Lorg/apache/hadoop/hive/metastore/api/Partition;)V
>  (Partition.java:216)
>   at 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopy(Lorg/apache/hadoop/hive/metastore/api/Partition;)Lorg/apache/hadoop/hive/metastore/api/Partition;
>  (HiveMetaStoreClient.java:1343)
>   at 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopyPartitions(Ljava/util/Collection;Ljava/util/List;)Ljava/util/List;
>  (HiveMetaStoreClient.java:1409)
>   at 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopyPartitions(Ljava/util/List;)Ljava/util/List;
>  (HiveMetaStoreClient.java:1397)
>   at 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;S)Ljava/util/List;
>  (HiveMetaStoreClient.java:914)
>   at 
> sun.reflect.GeneratedMethodAccessor98.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
>  (Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
>  (DelegatingMethodAccessorImpl.java:43)
>   at 
> java.lang.reflect.Method.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
>  (Method.java:606)
>   at 
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(Ljava/lang/Object;Ljava/lang/reflect/Method;[Ljava/lang/Object;)Ljava/lang/Object;
>  (RetryingMetaStoreClient.java:90)
>   at 
> com.sun.proxy.$Proxy30.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;S)Ljava/util/List;
>  (Unknown Source)
>   at 
> org.apache.hadoop.hive.ql.metadata.Hive.getPartitionsByFilter(Lorg/apache/hadoop/hive/ql/metadata/Table;Ljava/lang/String;)Ljava/util/List;
>  (Hive.java:1967)
>   at 
> sun.reflect.GeneratedMethodAccessor97.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
>  (Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
>  (DelegatingMethodAccessorImpl.java:43)
>   at 
> java.lang.reflect.Method.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
>  (Method.java:606)
>   at 
> org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(Lorg/apache/hadoop/hive/ql/metadata/Hive;Lorg/apache/hadoop/hive/ql/metadata/Table;Lscala/collection/Seq;)Lscala/collection/Seq;
>  (HiveShim.scala:602)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply()Lscala/collection/Seq;
>  (HiveClientImpl.scala:608)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply()Ljava/lang/Object;
>  (HiveClientImpl.scala:606)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply()Ljava/lang/Object;
>  (HiveClientImpl.scala:321)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(Lscala/Function0;Lscala/runtime/IntRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)V
>  (HiveClientImpl.scala:264)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(Lscala/Function0;)Ljava/lang/Object;
>  (HiveClientImpl.scala:263)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(Lscala/Function0;)Ljava/lang/Object;
>  (HiveClientImpl.scala:307)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(Lorg/apache/spark/sql/catalyst/catalog/CatalogTable;Lscala/collection/Seq;)Lscala/collection/Seq;
>  (HiveClientImpl.scala:606)
>   at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply()Lscala/collection/Seq;
>  (HiveExternalCatalog.scala:1017)
>   at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply()Ljava/lang/Object;
>  (HiveExternalCatalog.scala:1000)
>   at 
> 

[jira] [Commented] (SPARK-26703) Hive record writer will always depends on parquet-1.6 writer should fix it

2019-04-10 Thread zhoukang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815017#comment-16815017
 ] 

zhoukang commented on SPARK-26703:
--

I can make a PR for this, [~hyukjin.kwon].

> Hive record writer will always depends on parquet-1.6 writer should fix it 
> ---
>
> Key: SPARK-26703
> URL: https://issues.apache.org/jira/browse/SPARK-26703
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.2, 2.4.0
>Reporter: zhoukang
>Priority: Major
>
> Currently, when we use an "insert into Hive table" related command, the Parquet 
> file generated will always be version 1.6. The reason is below:
> 1. We rely on hive-exec's HiveFileFormatUtils to get the record writer
> {code:java}
> private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
> jobConf,
> tableDesc,
> serializer.getSerializedClass,
> fileSinkConf,
> new Path(path),
> Reporter.NULL)
> {code}
> 2. we will call 
> {code:java}
> public static RecordWriter getHiveRecordWriter(JobConf jc,
>   TableDesc tableInfo, Class outputClass,
>   FileSinkDesc conf, Path outPath, Reporter reporter) throws 
> HiveException {
> HiveOutputFormat hiveOutputFormat = getHiveOutputFormat(jc, 
> tableInfo);
> try {
>   boolean isCompressed = conf.getCompressed();
>   JobConf jc_output = jc;
>   if (isCompressed) {
> jc_output = new JobConf(jc);
> String codecStr = conf.getCompressCodec();
> if (codecStr != null && !codecStr.trim().equals("")) {
>   Class codec = 
>   (Class) 
> JavaUtils.loadClass(codecStr);
>   FileOutputFormat.setOutputCompressorClass(jc_output, codec);
> }
> String type = conf.getCompressType();
> if (type != null && !type.trim().equals("")) {
>   CompressionType style = CompressionType.valueOf(type);
>   SequenceFileOutputFormat.setOutputCompressionType(jc, style);
> }
>   }
>   return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
>   isCompressed, tableInfo.getProperties(), outPath, reporter);
> } catch (Exception e) {
>   throw new HiveException(e);
> }
>   }
>   public static RecordWriter getRecordWriter(JobConf jc,
>   OutputFormat outputFormat,
>   Class valueClass, boolean isCompressed,
>   Properties tableProp, Path outPath, Reporter reporter
>   ) throws IOException, HiveException {
> if (!(outputFormat instanceof HiveOutputFormat)) {
>   outputFormat = new HivePassThroughOutputFormat(outputFormat);
> }
> return ((HiveOutputFormat)outputFormat).getHiveRecordWriter(
> jc, outPath, valueClass, isCompressed, tableProp, reporter);
>   }
> {code}
> 3. then in MapredParquetOutPutFormat
> {code:java}
> public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter 
> getHiveRecordWriter(
>   final JobConf jobConf,
>   final Path finalOutPath,
>   final Class valueClass,
>   final boolean isCompressed,
>   final Properties tableProperties,
>   final Progressable progress) throws IOException {
> LOG.info("creating new record writer..." + this);
> final String columnNameProperty = 
> tableProperties.getProperty(IOConstants.COLUMNS);
> final String columnTypeProperty = 
> tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
> List columnNames;
> List columnTypes;
> if (columnNameProperty.length() == 0) {
>   columnNames = new ArrayList();
> } else {
>   columnNames = Arrays.asList(columnNameProperty.split(","));
> }
> if (columnTypeProperty.length() == 0) {
>   columnTypes = new ArrayList();
> } else {
>   columnTypes = 
> TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
> }
> 
> DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, 
> columnTypes), jobConf);
> return getParquerRecordWriterWrapper(realOutputFormat, jobConf, 
> finalOutPath.toString(),
> progress,tableProperties);
>   }
> {code}
> 4. then call 
> {code:java}
> public ParquetRecordWriterWrapper(
>   final OutputFormat realOutputFormat,
>   final JobConf jobConf,
>   final String name,
>   final Progressable progress, Properties tableProperties) throws
>   IOException {
> try {
>   // create a TaskInputOutputContext
>   TaskAttemptID taskAttemptID = 
> TaskAttemptID.forName(jobConf.get("mapred.task.id"));
>   if (taskAttemptID == null) {
> taskAttemptID = new TaskAttemptID();
>   }
>   taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
>   LOG.info("initialize serde with table properties.");
>   initializeSerProperties(taskContext, tableProperties);
>   

[jira] [Commented] (SPARK-26533) Support query auto cancel on thriftserver

2019-04-10 Thread zhoukang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815015#comment-16815015
 ] 

zhoukang commented on SPARK-26533:
--

I will work on this

> Support query auto cancel on thriftserver
> -
>
> Key: SPARK-26533
> URL: https://issues.apache.org/jira/browse/SPARK-26533
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
>
> Support automatically cancelling queries that run too long on the Thrift server.
> In some cases, we use the Thrift server as a long-running application.
> Sometimes we want no query to run longer than a given time.
> In these cases, we can enable auto-cancel for time-consuming queries, which lets 
> us release resources for other queries to run.
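
A hypothetical sketch of the idea, not an actual Spark patch: run each statement 
under a job group and schedule a cancellation once a configured timeout elapses 
(the helper name and the timeout handling are illustrative assumptions).
{code:scala}
import java.util.concurrent.{Executors, TimeUnit}
import org.apache.spark.sql.SparkSession

object QueryTimeout {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def runWithTimeout(spark: SparkSession, groupId: String, sql: String, timeoutSec: Long): Unit = {
    // Tag all jobs of this statement so they can be cancelled as a group.
    spark.sparkContext.setJobGroup(groupId, sql, interruptOnCancel = true)
    val canceller = scheduler.schedule(new Runnable {
      override def run(): Unit = spark.sparkContext.cancelJobGroup(groupId)
    }, timeoutSec, TimeUnit.SECONDS)
    try {
      spark.sql(sql).collect()     // the potentially long-running statement
    } finally {
      canceller.cancel(false)      // nothing to cancel if the query finished in time
      spark.sparkContext.clearJobGroup()
    }
  }
}
{code}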






[jira] [Commented] (SPARK-27068) Support failed jobs ui and completed jobs ui use different queue

2019-04-10 Thread zhoukang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815005#comment-16815005
 ] 

zhoukang commented on SPARK-27068:
--

For a long-running application like Spark SQL, showing failed jobs in a single 
table will keep more clues for later debugging, [~shahid].

> Support failed jobs ui and completed jobs ui use different queue
> 
>
> Key: SPARK-27068
> URL: https://issues.apache.org/jira/browse/SPARK-27068
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
>
> For some long-running applications, we may want to check out the cause of some 
> failed jobs.
> But most jobs have completed and the failed jobs may disappear from the UI; we 
> can use different queues for these two kinds of jobs.






[jira] [Created] (SPARK-27068) Support failed jobs ui and completed jobs ui use different queue

2019-03-05 Thread zhoukang (JIRA)
zhoukang created SPARK-27068:


 Summary: Support failed jobs ui and completed jobs ui use 
different queue
 Key: SPARK-27068
 URL: https://issues.apache.org/jira/browse/SPARK-27068
 Project: Spark
  Issue Type: Improvement
  Components: Web UI
Affects Versions: 2.4.0
Reporter: zhoukang


For some long-running applications, we may want to check out the cause of some 
failed jobs.
But most jobs have completed and the failed jobs may disappear from the UI; we 
can use different queues for these two kinds of jobs.
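
A hypothetical sketch of the idea, simplified and not the actual Spark UI store: 
keep failed jobs in their own bounded buffer so that retention of completed jobs 
cannot evict them (class and field names are made up for illustration).
{code:scala}
import scala.collection.mutable

case class JobSummary(jobId: Int, failed: Boolean, reason: Option[String])

class JobUIStore(retainedCompleted: Int, retainedFailed: Int) {
  private val completed = mutable.Queue.empty[JobSummary]
  private val failed = mutable.Queue.empty[JobSummary]

  def add(job: JobSummary): Unit = {
    val (queue, limit) =
      if (job.failed) (failed, retainedFailed) else (completed, retainedCompleted)
    queue.enqueue(job)
    // Evict the oldest entry within its own queue, so a flood of completed jobs
    // can never push out the failed ones we want to inspect later.
    if (queue.size > limit) queue.dequeue()
  }

  def failedJobs: Seq[JobSummary] = failed.toSeq
}
{code}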






[jira] [Created] (SPARK-26951) Should not throw KryoException when root cause is IOexception

2019-02-20 Thread zhoukang (JIRA)
zhoukang created SPARK-26951:


 Summary: Should not throw KryoException when root cause is 
IOexception
 Key: SPARK-26951
 URL: https://issues.apache.org/jira/browse/SPARK-26951
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: zhoukang


The job will fail with the exception below:


{code:java}
Job aborted due to stage failure: Task 1576 in stage 97.0 failed 4 times, most 
recent failure: Lost task 1576.3 in stage 97.0 (TID 121949, xxx, executor 14): 
com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is 
corrupted. The lz4's magic number should be LZ4Block(4c5a34426c6f636b), but 
received buffer's head bytes is ().
at com.esotericsoftware.kryo.io.Input.fill(Input.java:166)
at com.esotericsoftware.kryo.io.Input.require(Input.java:196)
at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:373)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:127)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:804)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:244)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:180)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:324)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted. The lz4's magic number 
should be LZ4Block(4c5a34426c6f636b), but received buffer's head bytes is 
().
at 
org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:169)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:127)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:164)
... 19 more

Driver stacktrace:
{code}
For an IOException, the task should be retried instead of failing the job.
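
A hypothetical sketch of the proposed handling, not an actual patch (the helper 
name is made up): unwrap the KryoException and rethrow the underlying IOException 
so the failure is treated as a retriable I/O error rather than an application error.
{code:scala}
import java.io.IOException
import com.esotericsoftware.kryo.KryoException

def rethrowIOCause[T](body: => T): T = {
  try {
    body
  } catch {
    // Kryo wraps stream corruption in a KryoException; surface the IOException
    // cause so the existing retry path for I/O failures can kick in.
    case e: KryoException if e.getCause.isInstanceOf[IOException] =>
      throw e.getCause
  }
}
{code}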







[jira] [Updated] (SPARK-26914) ThriftServer scheduler pool may be unpredictably when using fair schedule mode

2019-02-18 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26914:
-
Attachment: 26914-03.png
26914-02.png
26914-01.png

> ThriftServer scheduler pool may be unpredictably when using fair schedule mode
> --
>
> Key: SPARK-26914
> URL: https://issues.apache.org/jira/browse/SPARK-26914
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 26914-01.png, 26914-02.png, 26914-03.png
>
>
> When using fair scheduler mode for the Thrift server, we may get unpredictable 
> results.
> {code:java}
> val pool = sessionToActivePool.get(parentSession.getSessionHandle)
> if (pool != null) {
>   
> sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, 
> pool)
> }
> {code}
> Here is an example:
> We have some queries that should use the default pool; however, they are 
> submitted to the 'normal' pool.
> I changed the code and added some logging, and got some strange results.
> Then I found out that the localProperties of SparkContext may give 
> unpredictable results when setLocalProperty is called. And since the Thrift 
> server uses a thread pool to execute queries, it will trigger this bug sometimes.
> {code:java}
> /**
>* Set a local property that affects jobs submitted from this thread, such 
> as the Spark fair
>* scheduler pool. User-defined properties may also be set here. These 
> properties are propagated
>* through to worker tasks and can be accessed there via
>* [[org.apache.spark.TaskContext#getLocalProperty]].
>*
>* These properties are inherited by child threads spawned from this 
> thread. This
>* may have unexpected consequences when working with thread pools. The 
> standard java
>* implementation of thread pools have worker threads spawn other worker 
> threads.
>* As a result, local properties may propagate unpredictably.
>*/
>   def setLocalProperty(key: String, value: String) {
> if (value == null) {
>   localProperties.get.remove(key)
> } else {
>   localProperties.get.setProperty(key, value)
> }
>   }
> {code}
>






[jira] [Updated] (SPARK-26914) ThriftServer scheduler pool may be unpredictably when using fair schedule mode

2019-02-18 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26914:
-
Description: 
When using fair scheduler mode for the Thrift server, we may get unpredictable 
results.

{code:java}
val pool = sessionToActivePool.get(parentSession.getSessionHandle)
if (pool != null) {
  
sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, 
pool)
}
{code}

Here is an example:
We have some queries that should use the default pool; however, they are submitted 
to the 'normal' pool.
 !26914-02.png! 

I changed the code and added some logging, and got some strange results.
 !26914-01.png! 
 !26914-03.png! 

Then I found out that the localProperties of SparkContext may give unpredictable 
results when setLocalProperty is called. And since the Thrift server uses a thread 
pool to execute queries, it will trigger this bug sometimes.

{code:java}
/**
   * Set a local property that affects jobs submitted from this thread, such as 
the Spark fair
   * scheduler pool. User-defined properties may also be set here. These 
properties are propagated
   * through to worker tasks and can be accessed there via
   * [[org.apache.spark.TaskContext#getLocalProperty]].
   *
   * These properties are inherited by child threads spawned from this thread. 
This
   * may have unexpected consequences when working with thread pools. The 
standard java
   * implementation of thread pools have worker threads spawn other worker 
threads.
   * As a result, local properties may propagate unpredictably.
   */
  def setLocalProperty(key: String, value: String) {
if (value == null) {
  localProperties.get.remove(key)
} else {
  localProperties.get.setProperty(key, value)
}
  }
{code}
   


  was:
When using fair scheduler mode for the Thrift server, we may get unpredictable 
results.

{code:java}
val pool = sessionToActivePool.get(parentSession.getSessionHandle)
if (pool != null) {
  
sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, 
pool)
}
{code}

Here is an example:
We have some queries that should use the default pool; however, they are submitted 
to the 'normal' pool.

I changed the code and added some logging, and got some strange results.

Then I found out that the localProperties of SparkContext may give unpredictable 
results when setLocalProperty is called. And since the Thrift server uses a thread 
pool to execute queries, it will trigger this bug sometimes.

{code:java}
/**
   * Set a local property that affects jobs submitted from this thread, such as 
the Spark fair
   * scheduler pool. User-defined properties may also be set here. These 
properties are propagated
   * through to worker tasks and can be accessed there via
   * [[org.apache.spark.TaskContext#getLocalProperty]].
   *
   * These properties are inherited by child threads spawned from this thread. 
This
   * may have unexpected consequences when working with thread pools. The 
standard java
   * implementation of thread pools have worker threads spawn other worker 
threads.
   * As a result, local properties may propagate unpredictably.
   */
  def setLocalProperty(key: String, value: String) {
if (value == null) {
  localProperties.get.remove(key)
} else {
  localProperties.get.setProperty(key, value)
}
  }
{code}
   



> ThriftServer scheduler pool may be unpredictably when using fair schedule mode
> --
>
> Key: SPARK-26914
> URL: https://issues.apache.org/jira/browse/SPARK-26914
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 26914-01.png, 26914-02.png, 26914-03.png
>
>
> When using fair scheduler mode for the Thrift server, we may get unpredictable 
> results.
> {code:java}
> val pool = sessionToActivePool.get(parentSession.getSessionHandle)
> if (pool != null) {
>   
> sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, 
> pool)
> }
> {code}
> Here is an example:
> We have some queries that should use the default pool; however, they are 
> submitted to the 'normal' pool.
>  !26914-02.png! 
> I changed the code and added some logging, and got some strange results.
>  !26914-01.png! 
>  !26914-03.png! 
> Then I found out that the localProperties of SparkContext may give 
> unpredictable results when setLocalProperty is called. And since the Thrift 
> server uses a thread pool to execute queries, it will trigger this bug sometimes.
> {code:java}
> /**
>* Set a local property that affects jobs submitted from this thread, such 
> as the Spark fair
>* scheduler pool. User-defined properties may also be set here. These 
> properties are propagated
>* through to worker tasks and can be accessed there via
>* [[org.apache.spark.TaskContext#getLocalProperty]].
>*
>* These properties are inherited by child threads spawned from 

[jira] [Created] (SPARK-26914) ThriftServer scheduler pool may be unpredictably when using fair schedule mode

2019-02-18 Thread zhoukang (JIRA)
zhoukang created SPARK-26914:


 Summary: ThriftServer scheduler pool may be unpredictably when 
using fair schedule mode
 Key: SPARK-26914
 URL: https://issues.apache.org/jira/browse/SPARK-26914
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: zhoukang


When using fair scheduler mode for the Thrift server, we may get unpredictable 
results.

{code:java}
val pool = sessionToActivePool.get(parentSession.getSessionHandle)
if (pool != null) {
  
sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, 
pool)
}
{code}

Here is an example:
We have some queries that should use the default pool; however, they are submitted 
to the 'normal' pool.

I changed the code and added some logging, and got some strange results.

Then I found out that the localProperties of SparkContext may give unpredictable 
results when setLocalProperty is called. And since the Thrift server uses a thread 
pool to execute queries, it will trigger this bug sometimes.

{code:java}
/**
   * Set a local property that affects jobs submitted from this thread, such as 
the Spark fair
   * scheduler pool. User-defined properties may also be set here. These 
properties are propagated
   * through to worker tasks and can be accessed there via
   * [[org.apache.spark.TaskContext#getLocalProperty]].
   *
   * These properties are inherited by child threads spawned from this thread. 
This
   * may have unexpected consequences when working with thread pools. The 
standard java
   * implementation of thread pools have worker threads spawn other worker 
threads.
   * As a result, local properties may propagate unpredictably.
   */
  def setLocalProperty(key: String, value: String) {
if (value == null) {
  localProperties.get.remove(key)
} else {
  localProperties.get.setProperty(key, value)
}
  }
{code}
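
A hypothetical illustration of why thread pools make these inherited local 
properties unpredictable (a standalone sketch, not the Thrift server code itself): 
a pooled worker thread keeps whatever properties it inherited from the thread that 
happened to create it.
{code:scala}
import java.util.concurrent.Executors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("pool-demo").getOrCreate()
val sc = spark.sparkContext
val pool = Executors.newFixedThreadPool(1)

sc.setLocalProperty("spark.scheduler.pool", "normal")
// The single worker thread is created by this first submit and inherits "normal".
pool.submit(new Runnable { override def run(): Unit = () }).get()

sc.setLocalProperty("spark.scheduler.pool", "default")
pool.submit(new Runnable {
  override def run(): Unit =
    // Still prints "normal": the pooled thread was spawned before the change,
    // so jobs submitted from it go to an unexpected scheduler pool.
    println(sc.getLocalProperty("spark.scheduler.pool"))
}).get()

pool.shutdown()
{code}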
   







[jira] [Commented] (SPARK-26703) Hive record writer will always depends on parquet-1.6 writer should fix it

2019-02-12 Thread zhoukang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16766682#comment-16766682
 ] 

zhoukang commented on SPARK-26703:
--

As far as I know, [~hyukjin.kwon], Hive still depends on the Twitter version of 
Parquet for HiveFileFormatUtils.

> Hive record writer will always depends on parquet-1.6 writer should fix it 
> ---
>
> Key: SPARK-26703
> URL: https://issues.apache.org/jira/browse/SPARK-26703
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.2, 2.4.0
>Reporter: zhoukang
>Priority: Major
>
> Currently, when we use an "insert into Hive table" related command, the Parquet 
> file generated will always be version 1.6. The reason is below:
> 1. We rely on hive-exec's HiveFileFormatUtils to get the record writer
> {code:java}
> private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
> jobConf,
> tableDesc,
> serializer.getSerializedClass,
> fileSinkConf,
> new Path(path),
> Reporter.NULL)
> {code}
> 2. we will call 
> {code:java}
> public static RecordWriter getHiveRecordWriter(JobConf jc,
>   TableDesc tableInfo, Class outputClass,
>   FileSinkDesc conf, Path outPath, Reporter reporter) throws 
> HiveException {
> HiveOutputFormat hiveOutputFormat = getHiveOutputFormat(jc, 
> tableInfo);
> try {
>   boolean isCompressed = conf.getCompressed();
>   JobConf jc_output = jc;
>   if (isCompressed) {
> jc_output = new JobConf(jc);
> String codecStr = conf.getCompressCodec();
> if (codecStr != null && !codecStr.trim().equals("")) {
>   Class codec = 
>   (Class) 
> JavaUtils.loadClass(codecStr);
>   FileOutputFormat.setOutputCompressorClass(jc_output, codec);
> }
> String type = conf.getCompressType();
> if (type != null && !type.trim().equals("")) {
>   CompressionType style = CompressionType.valueOf(type);
>   SequenceFileOutputFormat.setOutputCompressionType(jc, style);
> }
>   }
>   return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
>   isCompressed, tableInfo.getProperties(), outPath, reporter);
> } catch (Exception e) {
>   throw new HiveException(e);
> }
>   }
>   public static RecordWriter getRecordWriter(JobConf jc,
>   OutputFormat outputFormat,
>   Class valueClass, boolean isCompressed,
>   Properties tableProp, Path outPath, Reporter reporter
>   ) throws IOException, HiveException {
> if (!(outputFormat instanceof HiveOutputFormat)) {
>   outputFormat = new HivePassThroughOutputFormat(outputFormat);
> }
> return ((HiveOutputFormat)outputFormat).getHiveRecordWriter(
> jc, outPath, valueClass, isCompressed, tableProp, reporter);
>   }
> {code}
> 3. then in MapredParquetOutPutFormat
> {code:java}
> public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter 
> getHiveRecordWriter(
>   final JobConf jobConf,
>   final Path finalOutPath,
>   final Class valueClass,
>   final boolean isCompressed,
>   final Properties tableProperties,
>   final Progressable progress) throws IOException {
> LOG.info("creating new record writer..." + this);
> final String columnNameProperty = 
> tableProperties.getProperty(IOConstants.COLUMNS);
> final String columnTypeProperty = 
> tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
> List columnNames;
> List columnTypes;
> if (columnNameProperty.length() == 0) {
>   columnNames = new ArrayList();
> } else {
>   columnNames = Arrays.asList(columnNameProperty.split(","));
> }
> if (columnTypeProperty.length() == 0) {
>   columnTypes = new ArrayList();
> } else {
>   columnTypes = 
> TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
> }
> 
> DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, 
> columnTypes), jobConf);
> return getParquerRecordWriterWrapper(realOutputFormat, jobConf, 
> finalOutPath.toString(),
> progress,tableProperties);
>   }
> {code}
> 4. then call 
> {code:java}
> public ParquetRecordWriterWrapper(
>   final OutputFormat realOutputFormat,
>   final JobConf jobConf,
>   final String name,
>   final Progressable progress, Properties tableProperties) throws
>   IOException {
> try {
>   // create a TaskInputOutputContext
>   TaskAttemptID taskAttemptID = 
> TaskAttemptID.forName(jobConf.get("mapred.task.id"));
>   if (taskAttemptID == null) {
> taskAttemptID = new TaskAttemptID();
>   }
>   taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
>   LOG.info("initialize serde with table properties.");
>   

[jira] [Updated] (SPARK-26756) Support session conf for thriftserver

2019-01-28 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26756:
-
Description: 
We can add support for session-level conf, like:

{code:java}
set spark.sql.xxx.xxx=xxx
{code}
which can be useful.

  was:
We can add support for session-level conf, like:

{code:java}
set spark.sql.xxx.xxx=xxx
{code}
which I think can be useful.


> Support session conf for thriftserver
> -
>
> Key: SPARK-26756
> URL: https://issues.apache.org/jira/browse/SPARK-26756
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
>
> We can add support for session-level conf, like:
> {code:java}
> set spark.sql.xxx.xxx=xxx
> {code}
> which can be useful.






[jira] [Created] (SPARK-26756) Support session conf for thriftserver

2019-01-28 Thread zhoukang (JIRA)
zhoukang created SPARK-26756:


 Summary: Support session conf for thriftserver
 Key: SPARK-26756
 URL: https://issues.apache.org/jira/browse/SPARK-26756
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: zhoukang


We can add support for session-level conf, like:

{code:java}
set spark.sql.xxx.xxx=xxx
{code}
which I think can be useful.
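
A hypothetical usage sketch of what a session-level override could look like from a 
JDBC client (the host, port, user, and the chosen conf key are example values, and 
the per-session behavior is exactly the feature being proposed here, not something 
asserted to already work):
{code:scala}
import java.sql.DriverManager

// Connect to the Thrift server through the Hive JDBC driver.
val conn = DriverManager.getConnection("jdbc:hive2://sts-host:10000/default", "user", "")
val stmt = conn.createStatement()

// Intended effect: override a SQL conf for this session only.
stmt.execute("SET spark.sql.shuffle.partitions=400")
val rs = stmt.executeQuery("SELECT 1")
{code}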






[jira] [Updated] (SPARK-26751) HiveSessionImpl might have memory leak since Operation do not close properly

2019-01-28 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26751:
-
Description: 
When we run in the background and get an exception that is not a HiveSQLException,
we may encounter a memory leak, since the entry in handleToOperation will not be 
removed correctly.
The reason is below:
1. When calling operation.run, we throw an exception that is not a 
HiveSQLException.
2. Then opHandleSet will not add the opHandle, and 
operationManager.closeOperation(opHandle) will not be called.
{code:java}
 private OperationHandle executeStatementInternal(String statement, Map confOverlay, boolean runAsync) throws HiveSQLException {
this.acquire(true);
OperationManager operationManager = this.getOperationManager();
ExecuteStatementOperation operation = 
operationManager.newExecuteStatementOperation(this.getSession(), statement, 
confOverlay, runAsync);
OperationHandle opHandle = operation.getHandle();

OperationHandle e;
try {
operation.run();
this.opHandleSet.add(opHandle);
e = opHandle;
} catch (HiveSQLException var11) {
operationManager.closeOperation(opHandle);
throw var11;
} finally {
this.release(true);
}

return e;
}


  try {
// This submit blocks if no background threads are available to run 
this operation
val backgroundHandle =
  
parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
  } catch {
case rejected: RejectedExecutionException =>
  setState(OperationState.ERROR)
  throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
case NonFatal(e) =>
  logError(s"Error executing query in background", e)
  setState(OperationState.ERROR)
  throw e
  }
}
{code}
3. When we close the session, operationManager.closeOperation(opHandle) is only 
called for handles in opHandleSet; since we did not add this opHandle to 
opHandleSet, it will never be closed there.
{code}
public void close() throws HiveSQLException {
try {
this.acquire(true);
Iterator ioe = this.opHandleSet.iterator();

while(ioe.hasNext()) {
OperationHandle opHandle = (OperationHandle)ioe.next();
this.operationManager.closeOperation(opHandle);
}

this.opHandleSet.clear();
this.cleanupSessionLogDir();
this.cleanupPipeoutFile();
HiveHistory ioe1 = this.sessionState.getHiveHistory();
if(null != ioe1) {
ioe1.closeStream();
}

try {
this.sessionState.close();
} finally {
this.sessionState = null;
}
} catch (IOException var17) {
throw new HiveSQLException("Failure to close", var17);
} finally {
if(this.sessionState != null) {
try {
this.sessionState.close();
} catch (Throwable var15) {
LOG.warn("Error closing session", var15);
}

this.sessionState = null;
}

this.release(true);
}

}
{code}
4. However, the opHandle is still added to handleToOperation for each statement, so 
the entry is never removed.
{code}
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
  val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()

  override def newExecuteStatementOperation(
  parentSession: HiveSession,
  statement: String,
  confOverlay: JMap[String, String],
  async: Boolean): ExecuteStatementOperation = synchronized {
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
require(sqlContext != null, s"Session handle: 
${parentSession.getSessionHandle} has not been" +
  s" initialized or had already closed.")
val conf = sqlContext.sessionState.conf
val hiveSessionState = parentSession.getSessionState
setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
setConfMap(conf, hiveSessionState.getHiveVariables)
val runInBackground = async && 
conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
val operation = new SparkExecuteStatementOperation(parentSession, 
statement, confOverlay,
  runInBackground)(sqlContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation)
logDebug(s"Created Operation for $statement with session=$parentSession, " +
  s"runInBackground=$runInBackground")
operation
  }
{code}

Below is an example that shows the memory leak:


  was:
When we run in 

[jira] [Updated] (SPARK-26751) HiveSessionImpl might have memory leak since Operation do not close properly

2019-01-28 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26751:
-
Description: 
When we run in the background and get an exception that is not a HiveSQLException,
we may encounter a memory leak, since the entry in handleToOperation will not be 
removed correctly.
The reason is below:
1. When calling operation.run, we throw an exception that is not a 
HiveSQLException.
2. Then opHandleSet will not add the opHandle, and 
operationManager.closeOperation(opHandle) will not be called.
{code:java}
 private OperationHandle executeStatementInternal(String statement, Map confOverlay, boolean runAsync) throws HiveSQLException {
this.acquire(true);
OperationManager operationManager = this.getOperationManager();
ExecuteStatementOperation operation = 
operationManager.newExecuteStatementOperation(this.getSession(), statement, 
confOverlay, runAsync);
OperationHandle opHandle = operation.getHandle();

OperationHandle e;
try {
operation.run();
this.opHandleSet.add(opHandle);
e = opHandle;
} catch (HiveSQLException var11) {
operationManager.closeOperation(opHandle);
throw var11;
} finally {
this.release(true);
}

return e;
}


  try {
// This submit blocks if no background threads are available to run 
this operation
val backgroundHandle =
  
parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
  } catch {
case rejected: RejectedExecutionException =>
  setState(OperationState.ERROR)
  throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
case NonFatal(e) =>
  logError(s"Error executing query in background", e)
  setState(OperationState.ERROR)
  throw e
  }
}
{code}
3. When we close the session, operationManager.closeOperation(opHandle) is only 
called for handles in opHandleSet; since we did not add this opHandle to 
opHandleSet, it will never be closed there.
{code}
public void close() throws HiveSQLException {
try {
this.acquire(true);
Iterator ioe = this.opHandleSet.iterator();

while(ioe.hasNext()) {
OperationHandle opHandle = (OperationHandle)ioe.next();
this.operationManager.closeOperation(opHandle);
}

this.opHandleSet.clear();
this.cleanupSessionLogDir();
this.cleanupPipeoutFile();
HiveHistory ioe1 = this.sessionState.getHiveHistory();
if(null != ioe1) {
ioe1.closeStream();
}

try {
this.sessionState.close();
} finally {
this.sessionState = null;
}
} catch (IOException var17) {
throw new HiveSQLException("Failure to close", var17);
} finally {
if(this.sessionState != null) {
try {
this.sessionState.close();
} catch (Throwable var15) {
LOG.warn("Error closing session", var15);
}

this.sessionState = null;
}

this.release(true);
}

}
{code}
4. however, the opHandle will be added into handleToOperation for each statement
{code}
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
  val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()

  override def newExecuteStatementOperation(
  parentSession: HiveSession,
  statement: String,
  confOverlay: JMap[String, String],
  async: Boolean): ExecuteStatementOperation = synchronized {
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
  s" initialized or had already closed.")
val conf = sqlContext.sessionState.conf
val hiveSessionState = parentSession.getSessionState
setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
setConfMap(conf, hiveSessionState.getHiveVariables)
val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
  runInBackground)(sqlContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation)
logDebug(s"Created Operation for $statement with session=$parentSession, " +
  s"runInBackground=$runInBackground")
operation
  }
{code}

Below is an example which has a memory leak:
 !26751.png! 
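
One possible mitigation, sketched below only as an illustration (not the actual patch): rethrow any unexpected failure from the operation as a HiveSQLException, so that the existing catch block in executeStatementInternal calls operationManager.closeOperation(opHandle) and the handleToOperation entry gets removed.
{code:java}
// Sketch only: wrap the statement execution so non-HiveSQLException failures
// are surfaced as HiveSQLException and the caller cleans up the handle.
import org.apache.hive.service.cli.HiveSQLException
import scala.util.control.NonFatal

def runWrapped(runStatement: () => Unit): Unit = {
  try {
    runStatement() // the original operation.run() body
  } catch {
    case e: HiveSQLException =>
      throw e // already triggers closeOperation(opHandle) in executeStatementInternal
    case NonFatal(e) =>
      // Converting the failure lets executeStatementInternal close the
      // operation, so handleToOperation does not keep the leaked entry.
      throw new HiveSQLException("Error running query: " + e.toString, e)
  }
}
{code}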

  was:

[jira] [Updated] (SPARK-26751) HiveSessionImpl might have memory leak since Operation do not close properly

2019-01-28 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26751:
-
Attachment: 26751.png

> HiveSessionImpl might have memory leak since Operation do not close properly
> 
>
> Key: SPARK-26751
> URL: https://issues.apache.org/jira/browse/SPARK-26751
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 26751.png
>
>
> When we run in background and we get exception which is not HiveSQLException,
> we may encounter memory leak since handleToOperation will not removed 
> correctly.
> The reason is below:
> 1. when calling operation.run we throw an exception which is not 
> HiveSQLException
> 2. then opHandleSet will not add the opHandle, and 
> operationManager.closeOperation(opHandle); will not be called
> {code:java}
>  private OperationHandle executeStatementInternal(String statement, 
> Map confOverlay, boolean runAsync) throws HiveSQLException {
> this.acquire(true);
> OperationManager operationManager = this.getOperationManager();
> ExecuteStatementOperation operation = 
> operationManager.newExecuteStatementOperation(this.getSession(), statement, 
> confOverlay, runAsync);
> OperationHandle opHandle = operation.getHandle();
> OperationHandle e;
> try {
> operation.run();
> this.opHandleSet.add(opHandle);
> e = opHandle;
> } catch (HiveSQLException var11) {
> operationManager.closeOperation(opHandle);
> throw var11;
> } finally {
> this.release(true);
> }
> return e;
> }
>   try {
> // This submit blocks if no background threads are available to run 
> this operation
> val backgroundHandle =
>   
> parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
> setBackgroundHandle(backgroundHandle)
>   } catch {
> case rejected: RejectedExecutionException =>
>   setState(OperationState.ERROR)
>   throw new HiveSQLException("The background threadpool cannot 
> accept" +
> " new task for execution, please retry the operation", rejected)
> case NonFatal(e) =>
>   logError(s"Error executing query in background", e)
>   setState(OperationState.ERROR)
>   throw e
>   }
> }
> {code}
> 3. when we close the session we will also call 
> operationManager.closeOperation(opHandle),since we did not add this opHandle 
> into the opHandleSet.
> {code}
> public void close() throws HiveSQLException {
> try {
> this.acquire(true);
> Iterator ioe = this.opHandleSet.iterator();
> while(ioe.hasNext()) {
> OperationHandle opHandle = (OperationHandle)ioe.next();
> this.operationManager.closeOperation(opHandle);
> }
> this.opHandleSet.clear();
> this.cleanupSessionLogDir();
> this.cleanupPipeoutFile();
> HiveHistory ioe1 = this.sessionState.getHiveHistory();
> if(null != ioe1) {
> ioe1.closeStream();
> }
> try {
> this.sessionState.close();
> } finally {
> this.sessionState = null;
> }
> } catch (IOException var17) {
> throw new HiveSQLException("Failure to close", var17);
> } finally {
> if(this.sessionState != null) {
> try {
> this.sessionState.close();
> } catch (Throwable var15) {
> LOG.warn("Error closing session", var15);
> }
> this.sessionState = null;
> }
> this.release(true);
> }
> }
> {code}
> 4. however, the opHandle will added into handleToOperation for each statement
> {code}
> val handleToOperation = ReflectionUtils
> .getSuperField[JMap[OperationHandle, Operation]](this, 
> "handleToOperation")
>   val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
>   val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
>   override def newExecuteStatementOperation(
>   parentSession: HiveSession,
>   statement: String,
>   confOverlay: JMap[String, String],
>   async: Boolean): ExecuteStatementOperation = synchronized {
> val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
> require(sqlContext != null, s"Session handle: 
> ${parentSession.getSessionHandle} has not been" +
>   s" initialized or had already closed.")
> val conf = sqlContext.sessionState.conf
> 

[jira] [Created] (SPARK-26751) HiveSessionImpl might have memory leak since Operation do not close properly

2019-01-28 Thread zhoukang (JIRA)
zhoukang created SPARK-26751:


 Summary: HiveSessionImpl might have memory leak since Operation do 
not close properly
 Key: SPARK-26751
 URL: https://issues.apache.org/jira/browse/SPARK-26751
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: zhoukang


When we run in background and we get exception which is not HiveSQLException,
we may encounter memory leak since handleToOperation will not removed correctly.
The reason is below:
1. when calling operation.run we throw an exception which is not 
HiveSQLException
2. then opHandleSet will not add the opHandle, and 
operationManager.closeOperation(opHandle); will not be called
{code:java}
 private OperationHandle executeStatementInternal(String statement, Map confOverlay, boolean runAsync) throws HiveSQLException {
this.acquire(true);
OperationManager operationManager = this.getOperationManager();
ExecuteStatementOperation operation = 
operationManager.newExecuteStatementOperation(this.getSession(), statement, 
confOverlay, runAsync);
OperationHandle opHandle = operation.getHandle();

OperationHandle e;
try {
operation.run();
this.opHandleSet.add(opHandle);
e = opHandle;
} catch (HiveSQLException var11) {
operationManager.closeOperation(opHandle);
throw var11;
} finally {
this.release(true);
}

return e;
}


  try {
// This submit blocks if no background threads are available to run 
this operation
val backgroundHandle =
  
parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
  } catch {
case rejected: RejectedExecutionException =>
  setState(OperationState.ERROR)
  throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
case NonFatal(e) =>
  logError(s"Error executing query in background", e)
  setState(OperationState.ERROR)
  throw e
  }
}
{code}
3. when we close the session we will also call 
operationManager.closeOperation(opHandle),since we did not add this opHandle 
into the opHandleSet.
{code}
public void close() throws HiveSQLException {
try {
this.acquire(true);
Iterator ioe = this.opHandleSet.iterator();

while(ioe.hasNext()) {
OperationHandle opHandle = (OperationHandle)ioe.next();
this.operationManager.closeOperation(opHandle);
}

this.opHandleSet.clear();
this.cleanupSessionLogDir();
this.cleanupPipeoutFile();
HiveHistory ioe1 = this.sessionState.getHiveHistory();
if(null != ioe1) {
ioe1.closeStream();
}

try {
this.sessionState.close();
} finally {
this.sessionState = null;
}
} catch (IOException var17) {
throw new HiveSQLException("Failure to close", var17);
} finally {
if(this.sessionState != null) {
try {
this.sessionState.close();
} catch (Throwable var15) {
LOG.warn("Error closing session", var15);
}

this.sessionState = null;
}

this.release(true);
}

}
{code}
4. however, the opHandle will added into handleToOperation for each statement
{code}
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
  val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()

  override def newExecuteStatementOperation(
  parentSession: HiveSession,
  statement: String,
  confOverlay: JMap[String, String],
  async: Boolean): ExecuteStatementOperation = synchronized {
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
require(sqlContext != null, s"Session handle: 
${parentSession.getSessionHandle} has not been" +
  s" initialized or had already closed.")
val conf = sqlContext.sessionState.conf
val hiveSessionState = parentSession.getSessionState
setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
setConfMap(conf, hiveSessionState.getHiveVariables)
val runInBackground = async && 
conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
val operation = new SparkExecuteStatementOperation(parentSession, 
statement, confOverlay,
  runInBackground)(sqlContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation)

[jira] [Created] (SPARK-26703) Hive record writer will always depends on parquet-1.6 writer should fix it

2019-01-23 Thread zhoukang (JIRA)
zhoukang created SPARK-26703:


 Summary: Hive record writer will always depends on parquet-1.6 
writer should fix it 
 Key: SPARK-26703
 URL: https://issues.apache.org/jira/browse/SPARK-26703
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0, 2.3.2
Reporter: zhoukang


Currently, when we use an insert-into-Hive-table related command,
the parquet file generated will always be version 1.6. The reason is below:
1. we rely on hive-exec HiveFileFormatUtils to get recordWriter
{code:java}
private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
jobConf,
tableDesc,
serializer.getSerializedClass,
fileSinkConf,
new Path(path),
Reporter.NULL)
{code}
2. we will call 
{code:java}
public static RecordWriter getHiveRecordWriter(JobConf jc,
  TableDesc tableInfo, Class outputClass,
  FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
HiveOutputFormat hiveOutputFormat = getHiveOutputFormat(jc, 
tableInfo);
try {
  boolean isCompressed = conf.getCompressed();
  JobConf jc_output = jc;
  if (isCompressed) {
jc_output = new JobConf(jc);
String codecStr = conf.getCompressCodec();
if (codecStr != null && !codecStr.trim().equals("")) {
  Class codec = 
  (Class) JavaUtils.loadClass(codecStr);
  FileOutputFormat.setOutputCompressorClass(jc_output, codec);
}
String type = conf.getCompressType();
if (type != null && !type.trim().equals("")) {
  CompressionType style = CompressionType.valueOf(type);
  SequenceFileOutputFormat.setOutputCompressionType(jc, style);
}
  }
  return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
  isCompressed, tableInfo.getProperties(), outPath, reporter);
} catch (Exception e) {
  throw new HiveException(e);
}
  }

  public static RecordWriter getRecordWriter(JobConf jc,
  OutputFormat outputFormat,
  Class valueClass, boolean isCompressed,
  Properties tableProp, Path outPath, Reporter reporter
  ) throws IOException, HiveException {
if (!(outputFormat instanceof HiveOutputFormat)) {
  outputFormat = new HivePassThroughOutputFormat(outputFormat);
}
return ((HiveOutputFormat)outputFormat).getHiveRecordWriter(
jc, outPath, valueClass, isCompressed, tableProp, reporter);
  }
{code}

3. then in MapredParquetOutPutFormat

{code:java}
public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter 
getHiveRecordWriter(
  final JobConf jobConf,
  final Path finalOutPath,
  final Class valueClass,
  final boolean isCompressed,
  final Properties tableProperties,
  final Progressable progress) throws IOException {

LOG.info("creating new record writer..." + this);

final String columnNameProperty = 
tableProperties.getProperty(IOConstants.COLUMNS);
final String columnTypeProperty = 
tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
List columnNames;
List columnTypes;

if (columnNameProperty.length() == 0) {
  columnNames = new ArrayList();
} else {
  columnNames = Arrays.asList(columnNameProperty.split(","));
}

if (columnTypeProperty.length() == 0) {
  columnTypes = new ArrayList();
} else {
  columnTypes = 
TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
}

DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, 
columnTypes), jobConf);

return getParquerRecordWriterWrapper(realOutputFormat, jobConf, 
finalOutPath.toString(),
progress,tableProperties);
  }
{code}
4. then call 

{code:java}
public ParquetRecordWriterWrapper(
  final OutputFormat realOutputFormat,
  final JobConf jobConf,
  final String name,
  final Progressable progress, Properties tableProperties) throws
  IOException {
try {
  // create a TaskInputOutputContext
  TaskAttemptID taskAttemptID = 
TaskAttemptID.forName(jobConf.get("mapred.task.id"));
  if (taskAttemptID == null) {
taskAttemptID = new TaskAttemptID();
  }
  taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);

  LOG.info("initialize serde with table properties.");
  initializeSerProperties(taskContext, tableProperties);

  LOG.info("creating real writer to write at " + name);

  realWriter =
  ((ParquetOutputFormat) 
realOutputFormat).getRecordWriter(taskContext, new Path(name));

  LOG.info("real writer: " + realWriter);
} catch (final InterruptedException e) {
  throw new IOException(e);
}
  }
{code}

And the ParquetOutputFormat here is version 1.6.
All files generated will miss some useful statistics, like the min/max of string columns.
We should fix this issue so that we can use the new features of parquet.
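
Until that is fixed, one possible workaround (only a sketch, with hypothetical table names) is to write through Spark's built-in parquet datasource instead of the Hive record writer; whether a given insert really bypasses the Hive writer path depends on the Spark version and on conversion settings such as spark.sql.hive.convertMetastoreParquet.
{code:java}
// Sketch only: use the native parquet datasource writer, which is based on a
// newer parquet-mr than the parquet-1.6 writer bundled with hive-exec.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("native-parquet-write-sketch")
  .config("spark.sql.hive.convertMetastoreParquet", "true") // default is already true
  .enableHiveSupport()
  .getOrCreate()

spark.table("src_db.events_staging") // hypothetical source table
  .write
  .mode("append")
  .format("parquet")
  .saveAsTable("dst_db.events") // hypothetical target, written by the built-in writer
{code}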




--
This message 

[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable

2019-01-11 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26601:
-
Summary: Make broadcast-exchange thread pool keepalivetime and 
maxThreadNumber configurable  (was: Make broadcast-exchange thread pool 
keepalivetime configurable)

> Make broadcast-exchange thread pool keepalivetime and maxThreadNumber 
> configurable
> --
>
> Key: SPARK-26601
> URL: https://issues.apache.org/jira/browse/SPARK-26601
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 选区_001.png, 选区_002 (1).png, 选区_002.png
>
>
> Currently,thread number of broadcast-exchange thread pool is fixed and 
> keepAliveSeconds is also fixed as 60s.
> {code:java}
> object BroadcastExchangeExec {
>   private[execution] val executionContext = 
> ExecutionContext.fromExecutorService(
> ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
> }
>  /**
>* Create a cached thread pool whose max number of threads is 
> `maxThreadNumber`. Thread names
>* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
> integer.
>*/
>   def newDaemonCachedThreadPool(
>   prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
> ThreadPoolExecutor = {
> val threadFactory = namedThreadFactory(prefix)
> val threadPool = new ThreadPoolExecutor(
>   maxThreadNumber, // corePoolSize: the max number of threads to create 
> before queuing the tasks
>   maxThreadNumber, // maximumPoolSize: because we use 
> LinkedBlockingDeque, this one is not used
>   keepAliveSeconds,
>   TimeUnit.SECONDS,
>   new LinkedBlockingQueue[Runnable],
>   threadFactory)
> threadPool.allowCoreThreadTimeOut(true)
> threadPool
>   }
> {code}
> But some times, if the Thead object do not GC quickly it may caused 
> server(driver) OOM.
> Below is an example:



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable

2019-01-11 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26601:
-
Attachment: 选区_002 (1).png
选区_002.png
选区_001.png

> Make broadcast-exchange thread pool keepalivetime and maxThreadNumber 
> configurable
> --
>
> Key: SPARK-26601
> URL: https://issues.apache.org/jira/browse/SPARK-26601
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 选区_001.png, 选区_002 (1).png, 选区_002.png
>
>
> Currently,thread number of broadcast-exchange thread pool is fixed and 
> keepAliveSeconds is also fixed as 60s.
> {code:java}
> object BroadcastExchangeExec {
>   private[execution] val executionContext = 
> ExecutionContext.fromExecutorService(
> ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
> }
>  /**
>* Create a cached thread pool whose max number of threads is 
> `maxThreadNumber`. Thread names
>* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
> integer.
>*/
>   def newDaemonCachedThreadPool(
>   prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
> ThreadPoolExecutor = {
> val threadFactory = namedThreadFactory(prefix)
> val threadPool = new ThreadPoolExecutor(
>   maxThreadNumber, // corePoolSize: the max number of threads to create 
> before queuing the tasks
>   maxThreadNumber, // maximumPoolSize: because we use 
> LinkedBlockingDeque, this one is not used
>   keepAliveSeconds,
>   TimeUnit.SECONDS,
>   new LinkedBlockingQueue[Runnable],
>   threadFactory)
> threadPool.allowCoreThreadTimeOut(true)
> threadPool
>   }
> {code}
> But some times, if the Thead object do not GC quickly it may caused 
> server(driver) OOM.
> Below is an example:



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable

2019-01-11 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26601:
-
Description: 
Currently, the thread number of the broadcast-exchange thread pool is fixed and
keepAliveSeconds is also fixed at 60s.

{code:java}
object BroadcastExchangeExec {
  private[execution] val executionContext = 
ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
}

 /**
   * Create a cached thread pool whose max number of threads is 
`maxThreadNumber`. Thread names
   * are formatted as prefix-ID, where ID is a unique, sequentially assigned 
integer.
   */
  def newDaemonCachedThreadPool(
  prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
val threadPool = new ThreadPoolExecutor(
  maxThreadNumber, // corePoolSize: the max number of threads to create 
before queuing the tasks
  maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, 
this one is not used
  keepAliveSeconds,
  TimeUnit.SECONDS,
  new LinkedBlockingQueue[Runnable],
  threadFactory)
threadPool.allowCoreThreadTimeOut(true)
threadPool
  }
{code}
But sometimes, if the Thread objects are not GCed quickly, this may cause
a server (driver) OOM. In such cases, we need to make this thread pool configurable.
Below is an example:
 !26601-occupy.png! 
 !26601-largeobject.png! 
 !26601-path2gcroot.png! 
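
A minimal sketch of what the proposal could look like, reusing the newDaemonCachedThreadPool helper shown above; the property names below are hypothetical, not existing Spark configs.
{code:java}
// Sketch only: read the limits from (hypothetical) properties instead of
// hard-coding 128 threads and a 60s keep-alive.
import scala.concurrent.ExecutionContext
import org.apache.spark.util.ThreadUtils

object BroadcastExchangeExec {
  private val maxThreadNumber =
    sys.props.getOrElse("spark.sql.broadcastExchange.maxThreadNumber", "128").toInt
  private val keepAliveSeconds =
    sys.props.getOrElse("spark.sql.broadcastExchange.keepAliveSeconds", "60").toInt

  // Same cached daemon pool as before, but both limits are now tunable.
  private[execution] val executionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool(
      "broadcast-exchange", maxThreadNumber, keepAliveSeconds))
}
{code}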

  was:
Currently,thread number of broadcast-exchange thread pool is fixed and 
keepAliveSeconds is also fixed as 60s.

{code:java}
object BroadcastExchangeExec {
  private[execution] val executionContext = 
ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
}

 /**
   * Create a cached thread pool whose max number of threads is 
`maxThreadNumber`. Thread names
   * are formatted as prefix-ID, where ID is a unique, sequentially assigned 
integer.
   */
  def newDaemonCachedThreadPool(
  prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
val threadPool = new ThreadPoolExecutor(
  maxThreadNumber, // corePoolSize: the max number of threads to create 
before queuing the tasks
  maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, 
this one is not used
  keepAliveSeconds,
  TimeUnit.SECONDS,
  new LinkedBlockingQueue[Runnable],
  threadFactory)
threadPool.allowCoreThreadTimeOut(true)
threadPool
  }
{code}
But some times, if the Thead object do not GC quickly it may caused 
server(driver) OOM.
Below is an example:
 !26601-occupy.png! 
 !26601-largeobject.png! 
 !26601-path2gcroot.png! 


> Make broadcast-exchange thread pool keepalivetime and maxThreadNumber 
> configurable
> --
>
> Key: SPARK-26601
> URL: https://issues.apache.org/jira/browse/SPARK-26601
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 26601-largeobject.png, 26601-occupy.png, 
> 26601-path2gcroot.png
>
>
> Currently,thread number of broadcast-exchange thread pool is fixed and 
> keepAliveSeconds is also fixed as 60s.
> {code:java}
> object BroadcastExchangeExec {
>   private[execution] val executionContext = 
> ExecutionContext.fromExecutorService(
> ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
> }
>  /**
>* Create a cached thread pool whose max number of threads is 
> `maxThreadNumber`. Thread names
>* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
> integer.
>*/
>   def newDaemonCachedThreadPool(
>   prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
> ThreadPoolExecutor = {
> val threadFactory = namedThreadFactory(prefix)
> val threadPool = new ThreadPoolExecutor(
>   maxThreadNumber, // corePoolSize: the max number of threads to create 
> before queuing the tasks
>   maxThreadNumber, // maximumPoolSize: because we use 
> LinkedBlockingDeque, this one is not used
>   keepAliveSeconds,
>   TimeUnit.SECONDS,
>   new LinkedBlockingQueue[Runnable],
>   threadFactory)
> threadPool.allowCoreThreadTimeOut(true)
> threadPool
>   }
> {code}
> But some times, if the Thead object do not GC quickly it may caused 
> server(driver) OOM. In such case,we need to make this thread pool 
> configurable.
> Below is an example:
>  !26601-occupy.png! 
>  !26601-largeobject.png! 
>  !26601-path2gcroot.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To 

[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable

2019-01-11 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26601:
-
Description: 
Currently,thread number of broadcast-exchange thread pool is fixed and 
keepAliveSeconds is also fixed as 60s.

{code:java}
object BroadcastExchangeExec {
  private[execution] val executionContext = 
ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
}

 /**
   * Create a cached thread pool whose max number of threads is 
`maxThreadNumber`. Thread names
   * are formatted as prefix-ID, where ID is a unique, sequentially assigned 
integer.
   */
  def newDaemonCachedThreadPool(
  prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
val threadPool = new ThreadPoolExecutor(
  maxThreadNumber, // corePoolSize: the max number of threads to create 
before queuing the tasks
  maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, 
this one is not used
  keepAliveSeconds,
  TimeUnit.SECONDS,
  new LinkedBlockingQueue[Runnable],
  threadFactory)
threadPool.allowCoreThreadTimeOut(true)
threadPool
  }
{code}
But some times, if the Thead object do not GC quickly it may caused 
server(driver) OOM.
Below is an example:
 !26601-occupy.png! 
 !26601-largeobject.png! 
 !26601-path2gcroot.png! 

  was:
Currently,thread number of broadcast-exchange thread pool is fixed and 
keepAliveSeconds is also fixed as 60s.

{code:java}
object BroadcastExchangeExec {
  private[execution] val executionContext = 
ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
}

 /**
   * Create a cached thread pool whose max number of threads is 
`maxThreadNumber`. Thread names
   * are formatted as prefix-ID, where ID is a unique, sequentially assigned 
integer.
   */
  def newDaemonCachedThreadPool(
  prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
val threadPool = new ThreadPoolExecutor(
  maxThreadNumber, // corePoolSize: the max number of threads to create 
before queuing the tasks
  maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, 
this one is not used
  keepAliveSeconds,
  TimeUnit.SECONDS,
  new LinkedBlockingQueue[Runnable],
  threadFactory)
threadPool.allowCoreThreadTimeOut(true)
threadPool
  }
{code}
But some times, if the Thead object do not GC quickly it may caused 
server(driver) OOM.
Below is an example:



> Make broadcast-exchange thread pool keepalivetime and maxThreadNumber 
> configurable
> --
>
> Key: SPARK-26601
> URL: https://issues.apache.org/jira/browse/SPARK-26601
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 26601-largeobject.png, 26601-occupy.png, 
> 26601-path2gcroot.png
>
>
> Currently,thread number of broadcast-exchange thread pool is fixed and 
> keepAliveSeconds is also fixed as 60s.
> {code:java}
> object BroadcastExchangeExec {
>   private[execution] val executionContext = 
> ExecutionContext.fromExecutorService(
> ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
> }
>  /**
>* Create a cached thread pool whose max number of threads is 
> `maxThreadNumber`. Thread names
>* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
> integer.
>*/
>   def newDaemonCachedThreadPool(
>   prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
> ThreadPoolExecutor = {
> val threadFactory = namedThreadFactory(prefix)
> val threadPool = new ThreadPoolExecutor(
>   maxThreadNumber, // corePoolSize: the max number of threads to create 
> before queuing the tasks
>   maxThreadNumber, // maximumPoolSize: because we use 
> LinkedBlockingDeque, this one is not used
>   keepAliveSeconds,
>   TimeUnit.SECONDS,
>   new LinkedBlockingQueue[Runnable],
>   threadFactory)
> threadPool.allowCoreThreadTimeOut(true)
> threadPool
>   }
> {code}
> But some times, if the Thead object do not GC quickly it may caused 
> server(driver) OOM.
> Below is an example:
>  !26601-occupy.png! 
>  !26601-largeobject.png! 
>  !26601-path2gcroot.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable

2019-01-11 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26601:
-
Attachment: (was: 选区_002.png)

> Make broadcast-exchange thread pool keepalivetime and maxThreadNumber 
> configurable
> --
>
> Key: SPARK-26601
> URL: https://issues.apache.org/jira/browse/SPARK-26601
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 26601-largeobject.png, 26601-occupy.png, 
> 26601-path2gcroot.png
>
>
> Currently,thread number of broadcast-exchange thread pool is fixed and 
> keepAliveSeconds is also fixed as 60s.
> {code:java}
> object BroadcastExchangeExec {
>   private[execution] val executionContext = 
> ExecutionContext.fromExecutorService(
> ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
> }
>  /**
>* Create a cached thread pool whose max number of threads is 
> `maxThreadNumber`. Thread names
>* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
> integer.
>*/
>   def newDaemonCachedThreadPool(
>   prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
> ThreadPoolExecutor = {
> val threadFactory = namedThreadFactory(prefix)
> val threadPool = new ThreadPoolExecutor(
>   maxThreadNumber, // corePoolSize: the max number of threads to create 
> before queuing the tasks
>   maxThreadNumber, // maximumPoolSize: because we use 
> LinkedBlockingDeque, this one is not used
>   keepAliveSeconds,
>   TimeUnit.SECONDS,
>   new LinkedBlockingQueue[Runnable],
>   threadFactory)
> threadPool.allowCoreThreadTimeOut(true)
> threadPool
>   }
> {code}
> But some times, if the Thead object do not GC quickly it may caused 
> server(driver) OOM.
> Below is an example:



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable

2019-01-11 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26601:
-
Attachment: 26601-path2gcroot.png
26601-occupy.png
26601-largeobject.png

> Make broadcast-exchange thread pool keepalivetime and maxThreadNumber 
> configurable
> --
>
> Key: SPARK-26601
> URL: https://issues.apache.org/jira/browse/SPARK-26601
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 26601-largeobject.png, 26601-occupy.png, 
> 26601-path2gcroot.png
>
>
> Currently,thread number of broadcast-exchange thread pool is fixed and 
> keepAliveSeconds is also fixed as 60s.
> {code:java}
> object BroadcastExchangeExec {
>   private[execution] val executionContext = 
> ExecutionContext.fromExecutorService(
> ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
> }
>  /**
>* Create a cached thread pool whose max number of threads is 
> `maxThreadNumber`. Thread names
>* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
> integer.
>*/
>   def newDaemonCachedThreadPool(
>   prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
> ThreadPoolExecutor = {
> val threadFactory = namedThreadFactory(prefix)
> val threadPool = new ThreadPoolExecutor(
>   maxThreadNumber, // corePoolSize: the max number of threads to create 
> before queuing the tasks
>   maxThreadNumber, // maximumPoolSize: because we use 
> LinkedBlockingDeque, this one is not used
>   keepAliveSeconds,
>   TimeUnit.SECONDS,
>   new LinkedBlockingQueue[Runnable],
>   threadFactory)
> threadPool.allowCoreThreadTimeOut(true)
> threadPool
>   }
> {code}
> But some times, if the Thead object do not GC quickly it may caused 
> server(driver) OOM.
> Below is an example:



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable

2019-01-11 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26601:
-
Attachment: (was: 选区_001.png)

> Make broadcast-exchange thread pool keepalivetime and maxThreadNumber 
> configurable
> --
>
> Key: SPARK-26601
> URL: https://issues.apache.org/jira/browse/SPARK-26601
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 26601-largeobject.png, 26601-occupy.png, 
> 26601-path2gcroot.png
>
>
> Currently,thread number of broadcast-exchange thread pool is fixed and 
> keepAliveSeconds is also fixed as 60s.
> {code:java}
> object BroadcastExchangeExec {
>   private[execution] val executionContext = 
> ExecutionContext.fromExecutorService(
> ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
> }
>  /**
>* Create a cached thread pool whose max number of threads is 
> `maxThreadNumber`. Thread names
>* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
> integer.
>*/
>   def newDaemonCachedThreadPool(
>   prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
> ThreadPoolExecutor = {
> val threadFactory = namedThreadFactory(prefix)
> val threadPool = new ThreadPoolExecutor(
>   maxThreadNumber, // corePoolSize: the max number of threads to create 
> before queuing the tasks
>   maxThreadNumber, // maximumPoolSize: because we use 
> LinkedBlockingDeque, this one is not used
>   keepAliveSeconds,
>   TimeUnit.SECONDS,
>   new LinkedBlockingQueue[Runnable],
>   threadFactory)
> threadPool.allowCoreThreadTimeOut(true)
> threadPool
>   }
> {code}
> But some times, if the Thead object do not GC quickly it may caused 
> server(driver) OOM.
> Below is an example:



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable

2019-01-11 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26601:
-
Attachment: (was: 选区_002 (1).png)

> Make broadcast-exchange thread pool keepalivetime and maxThreadNumber 
> configurable
> --
>
> Key: SPARK-26601
> URL: https://issues.apache.org/jira/browse/SPARK-26601
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Priority: Major
> Attachments: 26601-largeobject.png, 26601-occupy.png, 
> 26601-path2gcroot.png
>
>
> Currently,thread number of broadcast-exchange thread pool is fixed and 
> keepAliveSeconds is also fixed as 60s.
> {code:java}
> object BroadcastExchangeExec {
>   private[execution] val executionContext = 
> ExecutionContext.fromExecutorService(
> ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
> }
>  /**
>* Create a cached thread pool whose max number of threads is 
> `maxThreadNumber`. Thread names
>* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
> integer.
>*/
>   def newDaemonCachedThreadPool(
>   prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
> ThreadPoolExecutor = {
> val threadFactory = namedThreadFactory(prefix)
> val threadPool = new ThreadPoolExecutor(
>   maxThreadNumber, // corePoolSize: the max number of threads to create 
> before queuing the tasks
>   maxThreadNumber, // maximumPoolSize: because we use 
> LinkedBlockingDeque, this one is not used
>   keepAliveSeconds,
>   TimeUnit.SECONDS,
>   new LinkedBlockingQueue[Runnable],
>   threadFactory)
> threadPool.allowCoreThreadTimeOut(true)
> threadPool
>   }
> {code}
> But some times, if the Thead object do not GC quickly it may caused 
> server(driver) OOM.
> Below is an example:



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime configurable

2019-01-11 Thread zhoukang (JIRA)
zhoukang created SPARK-26601:


 Summary: Make broadcast-exchange thread pool keepalivetime 
configurable
 Key: SPARK-26601
 URL: https://issues.apache.org/jira/browse/SPARK-26601
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: zhoukang


Currently,thread number of broadcast-exchange thread pool is fixed and 
keepAliveSeconds is also fixed as 60s.

{code:java}
object BroadcastExchangeExec {
  private[execution] val executionContext = 
ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
}

 /**
   * Create a cached thread pool whose max number of threads is 
`maxThreadNumber`. Thread names
   * are formatted as prefix-ID, where ID is a unique, sequentially assigned 
integer.
   */
  def newDaemonCachedThreadPool(
  prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
val threadPool = new ThreadPoolExecutor(
  maxThreadNumber, // corePoolSize: the max number of threads to create 
before queuing the tasks
  maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, 
this one is not used
  keepAliveSeconds,
  TimeUnit.SECONDS,
  new LinkedBlockingQueue[Runnable],
  threadFactory)
threadPool.allowCoreThreadTimeOut(true)
threadPool
  }
{code}
But some times, if the Thead object do not GC quickly it may caused 
server(driver) OOM.
Below is an example:




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26568) Too many partitions may cause thriftServer frequently Full GC

2019-01-08 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-26568:
-
Description: 
The reason is that:
first, we have a table with many partitions (maybe several hundred); second, we
have some concurrent queries. Then the long-running thriftServer may encounter an
OOM issue.

Here is a case:
call stack of OOM thread:
{code:java}
pool-34-thread-10
  at 
org.apache.hadoop.hive.metastore.api.StorageDescriptor.(Lorg/apache/hadoop/hive/metastore/api/StorageDescriptor;)V
 (StorageDescriptor.java:240)
  at 
org.apache.hadoop.hive.metastore.api.Partition.(Lorg/apache/hadoop/hive/metastore/api/Partition;)V
 (Partition.java:216)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopy(Lorg/apache/hadoop/hive/metastore/api/Partition;)Lorg/apache/hadoop/hive/metastore/api/Partition;
 (HiveMetaStoreClient.java:1343)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopyPartitions(Ljava/util/Collection;Ljava/util/List;)Ljava/util/List;
 (HiveMetaStoreClient.java:1409)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopyPartitions(Ljava/util/List;)Ljava/util/List;
 (HiveMetaStoreClient.java:1397)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;S)Ljava/util/List;
 (HiveMetaStoreClient.java:914)
  at 
sun.reflect.GeneratedMethodAccessor98.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
 (Unknown Source)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
 (DelegatingMethodAccessorImpl.java:43)
  at 
java.lang.reflect.Method.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
 (Method.java:606)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(Ljava/lang/Object;Ljava/lang/reflect/Method;[Ljava/lang/Object;)Ljava/lang/Object;
 (RetryingMetaStoreClient.java:90)
  at 
com.sun.proxy.$Proxy30.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;S)Ljava/util/List;
 (Unknown Source)
  at 
org.apache.hadoop.hive.ql.metadata.Hive.getPartitionsByFilter(Lorg/apache/hadoop/hive/ql/metadata/Table;Ljava/lang/String;)Ljava/util/List;
 (Hive.java:1967)
  at 
sun.reflect.GeneratedMethodAccessor97.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
 (Unknown Source)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
 (DelegatingMethodAccessorImpl.java:43)
  at 
java.lang.reflect.Method.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
 (Method.java:606)
  at 
org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(Lorg/apache/hadoop/hive/ql/metadata/Hive;Lorg/apache/hadoop/hive/ql/metadata/Table;Lscala/collection/Seq;)Lscala/collection/Seq;
 (HiveShim.scala:602)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply()Lscala/collection/Seq;
 (HiveClientImpl.scala:608)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply()Ljava/lang/Object;
 (HiveClientImpl.scala:606)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply()Ljava/lang/Object;
 (HiveClientImpl.scala:321)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(Lscala/Function0;Lscala/runtime/IntRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)V
 (HiveClientImpl.scala:264)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(Lscala/Function0;)Ljava/lang/Object;
 (HiveClientImpl.scala:263)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(Lscala/Function0;)Ljava/lang/Object;
 (HiveClientImpl.scala:307)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(Lorg/apache/spark/sql/catalyst/catalog/CatalogTable;Lscala/collection/Seq;)Lscala/collection/Seq;
 (HiveClientImpl.scala:606)
  at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply()Lscala/collection/Seq;
 (HiveExternalCatalog.scala:1017)
  at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply()Ljava/lang/Object;
 (HiveExternalCatalog.scala:1000)
  at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(Lscala/Function0;)Ljava/lang/Object;
 (HiveExternalCatalog.scala:100)
  at 
org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Lscala/collection/Seq;)Lscala/collection/Seq;
 (HiveExternalCatalog.scala:1000)
  at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionsByFilter(Lorg/apache/spark/sql/catalyst/TableIdentifier;Lscala/collection/Seq;)Lscala/collection/Seq;
 (SessionCatalog.scala:803)
  at 

[jira] [Created] (SPARK-26568) Too many partitions may cause thriftServer frequently Full GC

2019-01-08 Thread zhoukang (JIRA)
zhoukang created SPARK-26568:


 Summary: Too many partitions may cause thriftServer frequently 
Full GC
 Key: SPARK-26568
 URL: https://issues.apache.org/jira/browse/SPARK-26568
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: zhoukang


The reason is that:
first, we have a table with many partitions (maybe several hundred); second, we
have some concurrent queries. Then the long-running thriftServer may encounter an
OOM issue.
Here is a case:
call stack of OOM thread:
{code:java}
pool-34-thread-10
  at 
org.apache.hadoop.hive.metastore.api.StorageDescriptor.(Lorg/apache/hadoop/hive/metastore/api/StorageDescriptor;)V
 (StorageDescriptor.java:240)
  at 
org.apache.hadoop.hive.metastore.api.Partition.(Lorg/apache/hadoop/hive/metastore/api/Partition;)V
 (Partition.java:216)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopy(Lorg/apache/hadoop/hive/metastore/api/Partition;)Lorg/apache/hadoop/hive/metastore/api/Partition;
 (HiveMetaStoreClient.java:1343)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopyPartitions(Ljava/util/Collection;Ljava/util/List;)Ljava/util/List;
 (HiveMetaStoreClient.java:1409)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopyPartitions(Ljava/util/List;)Ljava/util/List;
 (HiveMetaStoreClient.java:1397)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;S)Ljava/util/List;
 (HiveMetaStoreClient.java:914)
  at 
sun.reflect.GeneratedMethodAccessor98.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
 (Unknown Source)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
 (DelegatingMethodAccessorImpl.java:43)
  at 
java.lang.reflect.Method.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
 (Method.java:606)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(Ljava/lang/Object;Ljava/lang/reflect/Method;[Ljava/lang/Object;)Ljava/lang/Object;
 (RetryingMetaStoreClient.java:90)
  at 
com.sun.proxy.$Proxy30.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;S)Ljava/util/List;
 (Unknown Source)
  at 
org.apache.hadoop.hive.ql.metadata.Hive.getPartitionsByFilter(Lorg/apache/hadoop/hive/ql/metadata/Table;Ljava/lang/String;)Ljava/util/List;
 (Hive.java:1967)
  at 
sun.reflect.GeneratedMethodAccessor97.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
 (Unknown Source)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
 (DelegatingMethodAccessorImpl.java:43)
  at 
java.lang.reflect.Method.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;
 (Method.java:606)
  at 
org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(Lorg/apache/hadoop/hive/ql/metadata/Hive;Lorg/apache/hadoop/hive/ql/metadata/Table;Lscala/collection/Seq;)Lscala/collection/Seq;
 (HiveShim.scala:602)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply()Lscala/collection/Seq;
 (HiveClientImpl.scala:608)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply()Ljava/lang/Object;
 (HiveClientImpl.scala:606)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply()Ljava/lang/Object;
 (HiveClientImpl.scala:321)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(Lscala/Function0;Lscala/runtime/IntRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)V
 (HiveClientImpl.scala:264)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(Lscala/Function0;)Ljava/lang/Object;
 (HiveClientImpl.scala:263)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(Lscala/Function0;)Ljava/lang/Object;
 (HiveClientImpl.scala:307)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(Lorg/apache/spark/sql/catalyst/catalog/CatalogTable;Lscala/collection/Seq;)Lscala/collection/Seq;
 (HiveClientImpl.scala:606)
  at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply()Lscala/collection/Seq;
 (HiveExternalCatalog.scala:1017)
  at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply()Ljava/lang/Object;
 (HiveExternalCatalog.scala:1000)
  at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(Lscala/Function0;)Ljava/lang/Object;
 (HiveExternalCatalog.scala:100)
  at 
org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Lscala/collection/Seq;)Lscala/collection/Seq;
 (HiveExternalCatalog.scala:1000)
  at 

[jira] [Created] (SPARK-26533) Support query auto cancel on thriftserver

2019-01-03 Thread zhoukang (JIRA)
zhoukang created SPARK-26533:


 Summary: Support query auto cancel on thriftserver
 Key: SPARK-26533
 URL: https://issues.apache.org/jira/browse/SPARK-26533
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: zhoukang


Support automatically cancelling queries that run too long on the thriftserver.
In some cases, we use the thriftserver as a long-running application.
Sometimes we want to ensure that no query runs for more than a given time.
In these cases, we can enable auto cancel for time-consuming queries, which lets
us release resources for other queries to run.
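
A minimal sketch of one way such auto cancel could be wired, assuming a per-query timeout setting and a cancel callback (for example one that calls operationManager.cancelOperation(handle)); this is only an illustration, not the actual design.
{code:java}
// Sketch only: arm a timer when a statement starts and cancel the operation
// if it is still running when the configured timeout expires.
import java.util.concurrent.{Executors, TimeUnit}

class QueryTimeoutCanceller(timeoutSeconds: Long, cancel: () => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Call when the statement starts running.
  def arm(): Unit = {
    scheduler.schedule(new Runnable {
      override def run(): Unit = cancel() // e.g. operationManager.cancelOperation(handle)
    }, timeoutSeconds, TimeUnit.SECONDS)
  }

  // Call when the statement finishes before the deadline.
  def disarm(): Unit = scheduler.shutdownNow()
}
{code}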



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26514) Introduce a new conf to improve the task parallelism

2018-12-31 Thread zhoukang (JIRA)
zhoukang created SPARK-26514:


 Summary: Introduce a new conf to improve the task parallelism
 Key: SPARK-26514
 URL: https://issues.apache.org/jira/browse/SPARK-26514
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0, 2.1.0
Reporter: zhoukang


Currently, we have a conf below.
{code:java}
private[spark] val CPUS_PER_TASK = 
ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1)
{code}
For some applications which are not cpu-intensive, we may want to let one cpu
run more than one task, like:
{code:java}
private[spark] val TASKS_PER_CPU = 
ConfigBuilder("spark.cpu.tasks").intConf.createWithDefault(1)
{code}
This can improve performance for some applications and can also improve
resource utilization.
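
As an illustration of the scheduling effect only (spark.cpu.tasks is the name proposed above, not an existing config): with 4 executor cores, spark.task.cpus=1 gives 4 task slots per executor today, while spark.cpu.tasks=2 would give 8.
{code:java}
// Back-of-the-envelope sketch of how the proposed conf changes task slots.
val executorCores = 4
val cpusPerTask   = 1 // existing spark.task.cpus
val tasksPerCpu   = 2 // proposed spark.cpu.tasks

val slotsToday    = executorCores / cpusPerTask   // 4 concurrent tasks per executor
val slotsProposed = executorCores * tasksPerCpu   // 8 concurrent tasks per executor
println(s"slots today = $slotsToday, slots with proposal = $slotsProposed")
{code}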




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24687) When NoClassDefError thrown during task serialization will cause job hang

2018-06-28 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-24687:
-
Description: 
When the exception below is thrown:

{code:java}
Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: 
Lcom/xxx/data/recommend/aggregator/queue/QueueName;
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
at java.lang.Class.getDeclaredField(Class.java:1946)
at 
java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.(ObjectStreamClass.java:468)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1212)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1119)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
{code}
The stage will always hang instead of aborting.
 !hanging-960.png! 
This is because the NoClassDefFoundError will not be caught below.
{code}
var taskBinary: Broadcast[Array[Byte]] = null
try {
  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
  // For ResultTask, serialize and broadcast (rdd, func).
  val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
  JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
  JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, 
stage.func): AnyRef))
  }

  taskBinary = 

[jira] [Created] (SPARK-24687) When NoClassDefError thrown during task serialization will cause job hang

2018-06-28 Thread zhoukang (JIRA)
zhoukang created SPARK-24687:


 Summary: When NoClassDefError thrown during task serialization 
will cause job hang
 Key: SPARK-24687
 URL: https://issues.apache.org/jira/browse/SPARK-24687
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.1.1, 2.1.0
Reporter: zhoukang
 Attachments: hanging-960.png

When the exception below is thrown:

{code:java}
Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: 
Lcom/xxx/data/recommend/aggregator/queue/QueueName;
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
at java.lang.Class.getDeclaredField(Class.java:1946)
at 
java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.(ObjectStreamClass.java:468)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1212)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1119)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
{code}
The stage will hang forever instead of being aborted.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24687) When NoClassDefError thrown during task serialization will cause job hang

2018-06-28 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-24687:
-
Description: 
When the exception below is thrown:

{code:java}
Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: 
Lcom/xxx/data/recommend/aggregator/queue/QueueName;
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
at java.lang.Class.getDeclaredField(Class.java:1946)
at 
java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.(ObjectStreamClass.java:468)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1212)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1119)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
{code}
The stage will hang forever instead of being aborted.
 !hanging-960.png! 


  was:
When below exception thrown:

{code:java}
Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: 
Lcom/xxx/data/recommend/aggregator/queue/QueueName;
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
at java.lang.Class.getDeclaredField(Class.java:1946)
at 
java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
at 

[jira] [Updated] (SPARK-24687) When NoClassDefError thrown during task serialization will cause job hang

2018-06-28 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-24687:
-
Attachment: hanging-960.png

> When NoClassDefError thrown during task serialization will cause job hang
> -
>
> Key: SPARK-24687
> URL: https://issues.apache.org/jira/browse/SPARK-24687
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.1.1
>Reporter: zhoukang
>Priority: Major
> Attachments: hanging-960.png
>
>
> When the exception below is thrown:
> {code:java}
> Exception in thread "dag-scheduler-event-loop" 
> java.lang.NoClassDefFoundError: 
> Lcom/xxx/data/recommend/aggregator/queue/QueueName;
>   at java.lang.Class.getDeclaredFields0(Native Method)
>   at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
>   at java.lang.Class.getDeclaredField(Class.java:1946)
>   at 
> java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
>   at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
>   at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
>   at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.io.ObjectStreamClass.(ObjectStreamClass.java:468)
>   at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
>   at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1212)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1119)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> {code}
> The stage will hang forever instead of being aborted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (SPARK-24544) Print actual failure cause when look up function failed

2018-06-13 Thread zhoukang (JIRA)
zhoukang created SPARK-24544:


 Summary: Print actual failure cause when look up function failed
 Key: SPARK-24544
 URL: https://issues.apache.org/jira/browse/SPARK-24544
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.1, 2.1.0
Reporter: zhoukang


When we operate as below:
{code}
0: jdbc:hive2://xxx/> create  function funnel_analysis as 
'com.xxx.hive.extend.udf.UapFunnelAnalysis';
0: jdbc:hive2://xxx/> use mifi;
+-+--+
| Result  |
+-+--+
+-+--+
0: jdbc:hive2://xxx/> select funnel_analysis(1,",",1,'');
Error: org.apache.spark.sql.AnalysisException: Undefined function: 
'funnel_analysis'. This function is neither a registered temporary function nor 
a permanent function registered in the database 'xxx'.; line 1 pos 7 
(state=,code=0)
0: jdbc:hive2://xxx/> describe function funnel_analysis;
+---+--+
|   function_desc   |
+---+--+
| Function: mifi.funnel_analysis|
| Class: com.xxx.hive.extend.udf.UapFunnelAnalysis  |
| Usage: N/A.   |
+---+--+
{code}
We can see that DESCRIBE FUNCTION returns the right information, but when we actually
use this function we get an "undefined function" exception.
This is really misleading; the real cause is below:
{code}
2018-06-13,17:15:16,725 INFO org.apache.spark.sql.hive.KuduHiveSessionCatalog: 
Error: org.apache.spark.sql.AnalysisException: No handler for Hive UDF 
'com.xiaomi.mifi.hive.extend.udf.UapFunnelAnalysis': 
java.lang.IllegalStateException: Should not be called directly;
at 
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF.initialize(GenericUDTF.java:72)
at 
org.apache.spark.sql.hive.HiveGenericUDTF.outputInspector$lzycompute(hiveUDFs.scala:204)
at 
org.apache.spark.sql.hive.HiveGenericUDTF.outputInspector(hiveUDFs.scala:204)
at 
org.apache.spark.sql.hive.HiveGenericUDTF.elementSchema$lzycompute(hiveUDFs.scala:212)
at 
org.apache.spark.sql.hive.HiveGenericUDTF.elementSchema(hiveUDFs.scala:212)
at 
org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$makeFunctionBuilder$1.apply(HiveSessionCatalog.scala:146)
at 
org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$makeFunctionBuilder$1.apply(HiveSessionCatalog.scala:122)
at 
org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:94)
at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1060)
at 
org.apache.spark.sql.hive.HiveSessionCatalog.org$apache$spark$sql$hive$HiveSessionCatalog$$super$lookupFunction(HiveSessionCatalog.scala:192)
at 
org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$6.apply(HiveSessionCatalog.scala:192)
at 
org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$6.apply(HiveSessionCatalog.scala:192)
at scala.util.Try$.apply(Try.scala:192)
at 
org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction0(HiveSessionCatalog.scala:192)
at 
org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction(HiveSessionCatalog.scala:177)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$6$$anonfun$applyOrElse$39.apply(Analyzer.scala:900)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$6$$anonfun$applyOrElse$39.apply(Analyzer.scala:900)
at 
org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$6.applyOrElse(Analyzer.scala:899)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$6.applyOrElse(Analyzer.scala:887)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
{code}

I think we should print this actual error to make debugging quicker.
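A hedged sketch of the idea (helper and logger names are illustrative, not Spark's
SessionCatalog code): when the underlying lookup throws, log the real cause before
surfacing the generic "Undefined function" error.

{code:java}
import scala.util.{Failure, Success, Try}
import org.slf4j.LoggerFactory

object FunctionLookupLogging {
  private val log = LoggerFactory.getLogger(getClass)

  // Run the real lookup; if it fails, log the underlying cause (e.g. the
  // IllegalStateException thrown by the Hive UDF) before throwing the generic error.
  def lookupWithCause[T](name: String)(lookup: => T)(undefinedError: => Exception): T =
    Try(lookup) match {
      case Success(fn) => fn
      case Failure(cause) =>
        log.error(s"Failed to look up function '$name'", cause)
        throw undefinedError
    }
}
{code}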



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: 

[jira] [Updated] (SPARK-24515) No need to warning user when output commit coordination enabled

2018-06-11 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-24515:
-
Description: 
 

There is no need to warn the user when output commit coordination is enabled.
{code:java}
// When speculation is on and output committer class name contains "Direct", we should warn
// users that they may loss data if they are using a direct output committer.
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
  val warningMessage =
    s"$outputCommitterClass may be an output committer that writes data directly to " +
      "the final location. Because speculation is enabled, this output committer may " +
      "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
      "committer that does not have this behavior (e.g. FileOutputCommitter)."
  logWarning(warningMessage)
}

{code}

  was:
 
{code:java}
 {code}
No need to warning user when output commit coordination enabled

{code}

// When speculation is on and output committer class name contains "Direct", we 
should warn
// users that they may loss data if they are using a direct output committer.
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
 val warningMessage =
 s"$outputCommitterClass may be an output committer that writes data directly 
to " +
 "the final location. Because speculation is enabled, this output committer may 
" +
 "cause data loss (see the case in SPARK-10063). If possible, please use an 
output " +
 "committer that does not have this behavior (e.g. FileOutputCommitter)."
 logWarning(warningMessage)
}

{code}
{code:java}
// code placeholder
{code}


> No need to warning user when output commit coordination enabled
> ---
>
> Key: SPARK-24515
> URL: https://issues.apache.org/jira/browse/SPARK-24515
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: zhoukang
>Priority: Major
>
>  
> No need to warning user when output commit coordination enabled
> {code:java}
> // When speculation is on and output committer class name contains "Direct", 
> we should warn
> // users that they may loss data if they are using a direct output committer.
> val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
> val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
> if (speculationEnabled && outputCommitterClass.contains("Direct")) {
>  val warningMessage =
>  s"$outputCommitterClass may be an output committer that writes data directly 
> to " +
>  "the final location. Because speculation is enabled, this output committer 
> may " +
>  "cause data loss (see the case in SPARK-10063). If possible, please use an 
> output " +
>  "committer that does not have this behavior (e.g. FileOutputCommitter)."
>  logWarning(warningMessage)
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24515) No need to warning user when output commit coordination enabled

2018-06-11 Thread zhoukang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-24515:
-
Description: 
 
{code:java}
 {code}
No need to warning user when output commit coordination enabled

{code}

// When speculation is on and output committer class name contains "Direct", we 
should warn
// users that they may loss data if they are using a direct output committer.
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
 val warningMessage =
 s"$outputCommitterClass may be an output committer that writes data directly 
to " +
 "the final location. Because speculation is enabled, this output committer may 
" +
 "cause data loss (see the case in SPARK-10063). If possible, please use an 
output " +
 "committer that does not have this behavior (e.g. FileOutputCommitter)."
 logWarning(warningMessage)
}

{code}
{code:java}
// code placeholder
{code}

  was:
{code:java}
// When speculation is on and output committer class name contains "Direct", we 
should warn // users that they may loss data if they are using a direct output 
committer. val speculationEnabled = self.conf.getBoolean("spark.speculation", 
false) val outputCommitterClass = 
hadoopConf.get("mapred.output.committer.class", "") if (speculationEnabled && 
outputCommitterClass.contains("Direct")) { val warningMessage = 
s"$outputCommitterClass may be an output committer that writes data directly to 
" + "the final location. Because speculation is enabled, this output committer 
may " + "cause data loss (see the case in SPARK-10063). If possible, please use 
an output " + "committer that does not have this behavior (e.g. 
FileOutputCommitter)." logWarning(warningMessage) }
 
{code}
No need to warning user when output commit coordination enabled


> No need to warning user when output commit coordination enabled
> ---
>
> Key: SPARK-24515
> URL: https://issues.apache.org/jira/browse/SPARK-24515
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: zhoukang
>Priority: Major
>
>  
> {code:java}
>  {code}
> No need to warning user when output commit coordination enabled
> {code}
> // When speculation is on and output committer class name contains "Direct", 
> we should warn
> // users that they may loss data if they are using a direct output committer.
> val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
> val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
> if (speculationEnabled && outputCommitterClass.contains("Direct")) {
>  val warningMessage =
>  s"$outputCommitterClass may be an output committer that writes data directly 
> to " +
>  "the final location. Because speculation is enabled, this output committer 
> may " +
>  "cause data loss (see the case in SPARK-10063). If possible, please use an 
> output " +
>  "committer that does not have this behavior (e.g. FileOutputCommitter)."
>  logWarning(warningMessage)
> }
> {code}
> {code:java}
> // code placeholder
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24515) No need to warning user when output commit coordination enabled

2018-06-11 Thread zhoukang (JIRA)
zhoukang created SPARK-24515:


 Summary: No need to warning user when output commit coordination 
enabled
 Key: SPARK-24515
 URL: https://issues.apache.org/jira/browse/SPARK-24515
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.1
Reporter: zhoukang


{code:java}
// When speculation is on and output committer class name contains "Direct", we should warn
// users that they may loss data if they are using a direct output committer.
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
  val warningMessage =
    s"$outputCommitterClass may be an output committer that writes data directly to " +
      "the final location. Because speculation is enabled, this output committer may " +
      "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
      "committer that does not have this behavior (e.g. FileOutputCommitter)."
  logWarning(warningMessage)
}
{code}
There is no need to warn the user when output commit coordination is enabled.
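A minimal sketch of the proposed check, assuming the coordination flag is the
"spark.hadoop.outputCommitCoordination.enabled" key; the names and the placement of
the check are illustrative, not the actual SparkHadoopWriter code.

{code:java}
import org.apache.spark.SparkConf

object DirectCommitterWarning {
  // Warn only when speculation is on, a Direct committer is configured, and
  // output commit coordination is NOT enabled to guard against duplicate commits.
  def shouldWarn(conf: SparkConf, outputCommitterClass: String): Boolean = {
    val speculationEnabled = conf.getBoolean("spark.speculation", false)
    val coordinationEnabled =
      conf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", true)
    speculationEnabled && outputCommitterClass.contains("Direct") && !coordinationEnabled
  }
}
{code}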



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24440) When use constant as column we may get wrong answer versus impala

2018-05-31 Thread zhoukang (JIRA)
zhoukang created SPARK-24440:


 Summary: When use constant as column we may get wrong answer 
versus impala
 Key: SPARK-24440
 URL: https://issues.apache.org/jira/browse/SPARK-24440
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0, 2.1.0
Reporter: zhoukang


For query below:

{code:java}
select `date`, 100 as platform, count(distinct deviceid) as new_user from 
tv.clean_new_user where `date`=20180528 group by `date`, platform
{code}
We intended to group by the constant 100 and get the distinct deviceid count.
With Spark SQL we get:
{code}
+---+---+---+--+
|   date| platform  | new_user  |
+---+---+---+--+
| 20180528  | 100   | 521   |
| 20180528  | 100   | 82|
| 20180528  | 100   | 3 |
| 20180528  | 100   | 2 |
| 20180528  | 100   | 7 |
| 20180528  | 100   | 870   |
| 20180528  | 100   | 3 |
| 20180528  | 100   | 8 |
| 20180528  | 100   | 3 |
| 20180528  | 100   | 2204  |
| 20180528  | 100   | 1123  |
| 20180528  | 100   | 1 |
| 20180528  | 100   | 54|
| 20180528  | 100   | 440   |
| 20180528  | 100   | 4 |
| 20180528  | 100   | 478   |
| 20180528  | 100   | 34|
| 20180528  | 100   | 195   |
| 20180528  | 100   | 17|
| 20180528  | 100   | 18|
| 20180528  | 100   | 2 |
| 20180528  | 100   | 2 |
| 20180528  | 100   | 84|
| 20180528  | 100   | 1616  |
| 20180528  | 100   | 15|
| 20180528  | 100   | 7 |
| 20180528  | 100   | 479   |
| 20180528  | 100   | 50|
| 20180528  | 100   | 376   |
| 20180528  | 100   | 21|
| 20180528  | 100   | 842   |
| 20180528  | 100   | 444   |
| 20180528  | 100   | 538   |
| 20180528  | 100   | 1 |
| 20180528  | 100   | 2 |
| 20180528  | 100   | 7 |
| 20180528  | 100   | 17|
| 20180528  | 100   | 133   |
| 20180528  | 100   | 7 |
| 20180528  | 100   | 415   |
| 20180528  | 100   | 2 |
| 20180528  | 100   | 318   |
| 20180528  | 100   | 5 |
| 20180528  | 100   | 1 |
| 20180528  | 100   | 2060  |
| 20180528  | 100   | 1217  |
| 20180528  | 100   | 2 |
| 20180528  | 100   | 60|
| 20180528  | 100   | 22|
| 20180528  | 100   | 4 |
+---+---+---+--+
{code}
The actual sum of new_user over these rows is below:
{code}
0: jdbc:hive2://xxx/> select sum(t1.new_user) from (select `date`, 100 as 
platform, count(distinct deviceid) as new_user from tv.clean_new_user where 
`date`=20180528 group by `date`, platform)t1; 
++--+
| sum(new_user)  |
++--+
| 14816  |
++--+
1 row selected (4.934 seconds)
{code}
And the real distinct deviceid count is below:
{code}
0: jdbc:hive2://xxx/> select 100 as platform, count(distinct deviceid) as 
new_user from tv.clean_new_user where `date`=20180528;
+---+---+--+
| platform  | new_user  |
+---+---+--+
| 100   | 14773 |
+---+---+--+
1 row selected (2.846 seconds)
{code}

In Impala, the first query returns the result below:
{code}
[xxx] > select `date`, 100 as platform, count(distinct deviceid) as new_user 
from tv.clean_new_user where `date`=20180528 group by `date`, platform;Query: 
select `date`, 100 as platform, count(distinct deviceid) as new_user from 
tv.clean_new_user where `date`=20180528 group by `date`, platform
+--+--+--+
| date | platform | new_user |
+--+--+--+
| 20180528 | 100  | 14773|
+--+--+--+
Fetched 1 row(s) in 1.00s
{code}
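A possible workaround, as a hedged sketch only (not a fix for the underlying issue;
the table and column names are taken from the queries above): compute the distinct
count grouped only by `date`, then attach the constant column afterwards so the
literal cannot take part in the grouping.

{code:java}
import org.apache.spark.sql.SparkSession

object ConstantColumnWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("constant-groupby-workaround").getOrCreate()
    // Group only by `date`, then add the constant platform column afterwards.
    val newUsers = spark.sql(
      """SELECT `date`, COUNT(DISTINCT deviceid) AS new_user
        |FROM tv.clean_new_user
        |WHERE `date` = 20180528
        |GROUP BY `date`""".stripMargin)
    newUsers.selectExpr("`date`", "100 AS platform", "new_user").show()
    spark.stop()
  }
}
{code}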



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24084) Add job group id for query through spark-sql

2018-04-25 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-24084:
-
Description: For spark-sql we can add job group id for the same statement.  
(was: For thrift server we can add job group id for the same statement.Like 
below:)

> Add job group id for query through spark-sql
> 
>
> Key: SPARK-24084
> URL: https://issues.apache.org/jira/browse/SPARK-24084
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: zhoukang
>Priority: Major
>
> For spark-sql we can add job group id for the same statement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24084) Add job group id for query through spark-sql

2018-04-25 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-24084:
-
Summary: Add job group id for query through spark-sql  (was: Add job group 
id for query through Thrift Server)

> Add job group id for query through spark-sql
> 
>
> Key: SPARK-24084
> URL: https://issues.apache.org/jira/browse/SPARK-24084
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: zhoukang
>Priority: Major
>
> For thrift server we can add job group id for the same statement.Like below:



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24084) Add job group id for query through Thrift Server

2018-04-25 Thread zhoukang (JIRA)
zhoukang created SPARK-24084:


 Summary: Add job group id for query through Thrift Server
 Key: SPARK-24084
 URL: https://issues.apache.org/jira/browse/SPARK-24084
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: zhoukang


For the Thrift server we can add a job group id for the same statement, like below:
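A hedged sketch of what this could look like (not the actual spark-sql or Thrift
server change; the helper and group-id format are illustrative): set a job group per
statement via SparkContext.setJobGroup so all jobs spawned by the statement can be
identified and cancelled together.

{code:java}
import java.util.UUID
import org.apache.spark.sql.SparkSession

object StatementJobGroup {
  def runWithJobGroup(spark: SparkSession, statement: String): Unit = {
    val groupId = s"statement-${UUID.randomUUID()}"
    // Tag every job spawned by this statement with the same group id.
    spark.sparkContext.setJobGroup(groupId, statement, interruptOnCancel = true)
    try {
      spark.sql(statement).show()
    } finally {
      spark.sparkContext.clearJobGroup() // do not leak the group to later statements
    }
  }
}
{code}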



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24083) Diagnostics message for uncaught exceptions should include the stacktrace

2018-04-25 Thread zhoukang (JIRA)
zhoukang created SPARK-24083:


 Summary: Diagnostics message for uncaught exceptions should 
include the stacktrace
 Key: SPARK-24083
 URL: https://issues.apache.org/jira/browse/SPARK-24083
 Project: Spark
  Issue Type: Improvement
  Components: YARN
Affects Versions: 2.3.0
Reporter: zhoukang


Like [SPARK-23296|https://issues.apache.org/jira/browse/SPARK-23296].
For uncaught exceptions, we should also print the stacktrace.
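A minimal sketch of the idea (not the actual ApplicationMaster code; names are
illustrative): render the full stack trace into the diagnostics string instead of
only the exception message.

{code:java}
import java.io.{PrintWriter, StringWriter}

object Diagnostics {
  // Build a diagnostics message that includes the full stack trace.
  def forUncaught(thread: Thread, e: Throwable): String = {
    val sw = new StringWriter()
    val pw = new PrintWriter(sw)
    e.printStackTrace(pw)
    pw.flush()
    s"Uncaught exception in thread ${thread.getName}:\n$sw"
  }
}
{code}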



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24059) When blacklist disable always hash to a bad local directory may cause job failure

2018-04-23 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449254#comment-16449254
 ] 

zhoukang commented on SPARK-24059:
--

When we set multiple local directories on different disks,
a retry will make sense.
[~srowen]

> When blacklist disable always hash to a bad local directory may cause job 
> failure
> -
>
> Key: SPARK-24059
> URL: https://issues.apache.org/jira/browse/SPARK-24059
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: zhoukang
>Priority: Major
>
> When the blacklist is disabled, always hashing temp shuffle files to a bad local
> directory on the same executor can cause job failure.
> Like below:
> {code:java}
> java.io.FileNotFoundException: 
> /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/27/temp_shuffle_159e6886-b76f-4d96-9600-aee62ada0fa9
>  (Read-only file system)
> java.io.FileNotFoundException: 
> /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/06/temp_shuffle_ba7f0a29-8e02-4ffa-94f7-01f72d214821
>  (Read-only file system)
> java.io.FileNotFoundException: 
> /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/32/temp_shuffle_7030256c-fc24-4d45-a901-be23c2c3fbd6
>  (Read-only file system)
> java.io.FileNotFoundException: 
> /home/work/hdd8/yarn/zjyprc-hadoop/nodemanager/usercache/h_message_push/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/14/temp_shuffle_65816622-6217-43b9-bc9e-e2f67dc9a9de
>  (Read-only file system)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24059) When blacklist disable always hash to a bad local directory may cause job failure

2018-04-23 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449175#comment-16449175
 ] 

zhoukang commented on SPARK-24059:
--

cc [~vanzin] [~srowen]
Does adding a retry make sense?
I will work on this if the fix makes sense!
Thanks

> When blacklist disable always hash to a bad local directory may cause job 
> failure
> -
>
> Key: SPARK-24059
> URL: https://issues.apache.org/jira/browse/SPARK-24059
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: zhoukang
>Priority: Major
>
> When the blacklist is disabled, always hashing temp shuffle files to a bad local
> directory on the same executor can cause job failure.
> Like below:
> {code:java}
> java.io.FileNotFoundException: 
> /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/27/temp_shuffle_159e6886-b76f-4d96-9600-aee62ada0fa9
>  (Read-only file system)
> java.io.FileNotFoundException: 
> /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/06/temp_shuffle_ba7f0a29-8e02-4ffa-94f7-01f72d214821
>  (Read-only file system)
> java.io.FileNotFoundException: 
> /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/32/temp_shuffle_7030256c-fc24-4d45-a901-be23c2c3fbd6
>  (Read-only file system)
> java.io.FileNotFoundException: 
> /home/work/hdd8/yarn/zjyprc-hadoop/nodemanager/usercache/h_message_push/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/14/temp_shuffle_65816622-6217-43b9-bc9e-e2f67dc9a9de
>  (Read-only file system)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24059) When blacklist disable always hash to a bad local directory may cause job failure

2018-04-23 Thread zhoukang (JIRA)
zhoukang created SPARK-24059:


 Summary: When blacklist disable always hash to a bad local 
directory may cause job failure
 Key: SPARK-24059
 URL: https://issues.apache.org/jira/browse/SPARK-24059
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: zhoukang


When the blacklist is disabled, always hashing temp shuffle files to a bad local
directory on the same executor can cause job failure.
Like below:

{code:java}
java.io.FileNotFoundException: 
/home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/27/temp_shuffle_159e6886-b76f-4d96-9600-aee62ada0fa9
 (Read-only file system)

java.io.FileNotFoundException: 
/home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/06/temp_shuffle_ba7f0a29-8e02-4ffa-94f7-01f72d214821
 (Read-only file system)

java.io.FileNotFoundException: 
/home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/32/temp_shuffle_7030256c-fc24-4d45-a901-be23c2c3fbd6
 (Read-only file system)

java.io.FileNotFoundException: 
/home/work/hdd8/yarn/zjyprc-hadoop/nodemanager/usercache/h_message_push/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/14/temp_shuffle_65816622-6217-43b9-bc9e-e2f67dc9a9de
 (Read-only file system)
{code}
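A hedged sketch of the retry idea (not Spark's DiskBlockManager code; all names are
illustrative): if the directory selected by hashing is not writable, fall back to one
of the other configured local directories instead of failing every task the same way.

{code:java}
import java.io.{File, IOException}

object LocalDirFallback {
  // Pick the hashed directory first; if it is not writable (e.g. a read-only
  // disk), try the remaining local directories before giving up.
  def chooseWritableDir(localDirs: IndexedSeq[File], filename: String): File = {
    val hashed = localDirs(Math.floorMod(filename.hashCode, localDirs.length))
    val candidates = hashed +: localDirs.filterNot(_ == hashed)
    candidates.find(_.canWrite).getOrElse(
      throw new IOException(s"No writable local directory found for $filename"))
  }
}
{code}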





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24053) Support add subdirectory named as user name on staging directory

2018-04-23 Thread zhoukang (JIRA)
zhoukang created SPARK-24053:


 Summary:  Support add subdirectory named as user name on staging 
directory
 Key: SPARK-24053
 URL: https://issues.apache.org/jira/browse/SPARK-24053
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: zhoukang


When we have multiple users on the same cluster, we can add a subdirectory
named after the login user under the staging directory.
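A hedged sketch of the idea (the real change would live in the YARN client code;
the helper name is illustrative): append the short login user name to the configured
staging directory.

{code:java}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation

object UserStagingDir {
  // e.g. /user/spark/.sparkStaging -> /user/spark/.sparkStaging/<login-user>
  def withUserSubdir(baseStagingDir: String): Path = {
    val user = UserGroupInformation.getCurrentUser.getShortUserName
    new Path(baseStagingDir, user)
  }
}
{code}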



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24052) Support spark version showing on environment page

2018-04-23 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447917#comment-16447917
 ] 

zhoukang commented on SPARK-24052:
--

[~jiangxb] Could you help review this? Thanks very much!

> Support spark version showing on environment page
> -
>
> Key: SPARK-24052
> URL: https://issues.apache.org/jira/browse/SPARK-24052
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.3.0
>Reporter: zhoukang
>Priority: Major
> Attachments: environment-page.png
>
>
> Since we may have multiple version in production cluster,we can showing some 
> information on environment page like below:
>  !environment-page.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24052) Support spark version showing on environment page

2018-04-23 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-24052:
-
Description: 
Since we may have multiple versions in a production cluster, we can show this
information on the environment page, like below:
 !environment-page.png! 

> Support spark version showing on environment page
> -
>
> Key: SPARK-24052
> URL: https://issues.apache.org/jira/browse/SPARK-24052
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.3.0
>Reporter: zhoukang
>Priority: Major
> Attachments: environment-page.png
>
>
> Since we may have multiple version in production cluster,we can showing some 
> information on environment page like below:
>  !environment-page.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24052) Support spark version showing on environment page

2018-04-23 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-24052:
-
Attachment: environment-page.png

> Support spark version showing on environment page
> -
>
> Key: SPARK-24052
> URL: https://issues.apache.org/jira/browse/SPARK-24052
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.3.0
>Reporter: zhoukang
>Priority: Major
> Attachments: environment-page.png
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24052) Support spark version showing on environment page

2018-04-23 Thread zhoukang (JIRA)
zhoukang created SPARK-24052:


 Summary: Support spark version showing on environment page
 Key: SPARK-24052
 URL: https://issues.apache.org/jira/browse/SPARK-24052
 Project: Spark
  Issue Type: Improvement
  Components: Web UI
Affects Versions: 2.3.0
Reporter: zhoukang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23692) Print metadata of files when infer schema failed

2018-03-19 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang resolved SPARK-23692.
--
Resolution: Won't Fix

> Print metadata of files when infer schema failed
> 
>
> Key: SPARK-23692
> URL: https://issues.apache.org/jira/browse/SPARK-23692
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: zhoukang
>Priority: Minor
>
> A trivial modify.
> Currently, when we had no input files to infer schema,we will throw below 
> exception.
> For some users it may be misleading.If we can print files' metadata it will 
> be more clearer.
> {code:java}
> Caused by: org.apache.spark.sql.AnalysisException: Unable to infer schema for 
> Parquet. It must be specified manually.;
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
>  at scala.Option.getOrElse(Option.scala:121)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:188)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
>  at 
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
>  at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
>  at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:425)
>  at 
> com.xiaomi.matrix.pipeline.jobspark.importer.MatrixAdEventDailyImportJob.(MatrixAdEventDailyImportJob.scala:18)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23665) Add adaptive algorithm to select query result collect method

2018-03-18 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang resolved SPARK-23665.
--
Resolution: Won't Fix

> Add adaptive algorithm to select query result collect method
> 
>
> Key: SPARK-23665
> URL: https://issues.apache.org/jira/browse/SPARK-23665
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1, 2.3.0
>Reporter: zhoukang
>Priority: Major
>
> Currently, we use configuration like 
> {code:java}
> spark.sql.thriftServer.incrementalCollect
> {code}
> to specify query result collect method.
> Actually,we can estimate the size of the result and select collect method 
> automatically. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23708) Comment of ShutdownHookManager.addShutdownHook is error

2018-03-16 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23708:
-
Issue Type: Improvement  (was: Bug)

> Comment of ShutdownHookManager.addShutdownHook is error
> ---
>
> Key: SPARK-23708
> URL: https://issues.apache.org/jira/browse/SPARK-23708
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: zhoukang
>Priority: Minor
>
> Comment below is not right!
> {code:java}
> /**
>* Adds a shutdown hook with the given priority. Hooks with lower priority 
> values run
>* first.
>*
>* @param hook The code to run during shutdown.
>* @return A handle that can be used to unregister the shutdown hook.
>*/
>   def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
> shutdownHooks.add(priority, hook)
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23708) Comment of ShutdownHookManager.addShutdownHook is error

2018-03-16 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23708:
-
Issue Type: Bug  (was: Improvement)

> Comment of ShutdownHookManager.addShutdownHook is error
> ---
>
> Key: SPARK-23708
> URL: https://issues.apache.org/jira/browse/SPARK-23708
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: zhoukang
>Priority: Minor
>
> Comment below is not right!
> {code:java}
> /**
>* Adds a shutdown hook with the given priority. Hooks with lower priority 
> values run
>* first.
>*
>* @param hook The code to run during shutdown.
>* @return A handle that can be used to unregister the shutdown hook.
>*/
>   def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
> shutdownHooks.add(priority, hook)
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23708) Comment of ShutdownHookManager.addShutdownHook is error

2018-03-16 Thread zhoukang (JIRA)
zhoukang created SPARK-23708:


 Summary: Comment of ShutdownHookManager.addShutdownHook is error
 Key: SPARK-23708
 URL: https://issues.apache.org/jira/browse/SPARK-23708
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: zhoukang


Comment below is not right!
{code:java}
/**
   * Adds a shutdown hook with the given priority. Hooks with lower priority 
values run
   * first.
   *
   * @param hook The code to run during shutdown.
   * @return A handle that can be used to unregister the shutdown hook.
   */
  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
shutdownHooks.add(priority, hook)
  }
{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23692) Print metadata of files when infer schema failed

2018-03-15 Thread zhoukang (JIRA)
zhoukang created SPARK-23692:


 Summary: Print metadata of files when infer schema failed
 Key: SPARK-23692
 URL: https://issues.apache.org/jira/browse/SPARK-23692
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: zhoukang


A trivial modification.
Currently, when there are no input files to infer the schema from, we throw the
exception below.
For some users this may be misleading. If we print the files' metadata, the error
will be much clearer.

{code:java}
Caused by: org.apache.spark.sql.AnalysisException: Unable to infer schema for 
Parquet. It must be specified manually.;
 at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
 at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
 at scala.Option.getOrElse(Option.scala:121)
 at 
org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:188)
 at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
 at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
 at 
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
 at 
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:425)
 at 
com.xiaomi.matrix.pipeline.jobspark.importer.MatrixAdEventDailyImportJob.(MatrixAdEventDailyImportJob.scala:18)
{code}
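A hedged sketch of the proposal (not the actual DataSource change; names are
illustrative): list the files that were considered, so the "Unable to infer schema"
error can include something actionable.

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object InputFileMetadata {
  // Describe the files under a path: name and size, or an explicit
  // "no files found" message when the directory is empty or missing.
  def describe(dir: String, hadoopConf: Configuration): String = {
    val path = new Path(dir)
    val fs = path.getFileSystem(hadoopConf)
    if (!fs.exists(path)) s"Path does not exist: $dir"
    else {
      val statuses = fs.listStatus(path)
      if (statuses.isEmpty) s"No files found under $dir"
      else statuses.map(s => s"${s.getPath} (${s.getLen} bytes)").mkString("\n")
    }
  }
}
{code}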




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23665) Add adaptive algorithm to select query result collect method

2018-03-14 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398706#comment-16398706
 ] 

zhoukang commented on SPARK-23665:
--

Add this logic so that users do not need to choose whether to set 
{code:java}
spark.sql.thriftServer.incrementalCollect
{code}
or not. Does this make sense? Thanks!

> Add adaptive algorithm to select query result collect method
> 
>
> Key: SPARK-23665
> URL: https://issues.apache.org/jira/browse/SPARK-23665
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1, 2.3.0
>Reporter: zhoukang
>Priority: Major
>
> Currently, we use configuration like 
> {code:java}
> spark.sql.thriftServer.incrementalCollect
> {code}
> to specify query result collect method.
> Actually,we can estimate the size of the result and select collect method 
> automatically. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23665) Add adaptive algorithm to select query result collect method

2018-03-13 Thread zhoukang (JIRA)
zhoukang created SPARK-23665:


 Summary: Add adaptive algorithm to select query result collect 
method
 Key: SPARK-23665
 URL: https://issues.apache.org/jira/browse/SPARK-23665
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0, 2.1.1
Reporter: zhoukang


Currently, we use configuration like 
{code:java}
spark.sql.thriftServer.incrementalCollect
{code}
to specify how query results are collected.
Actually, we can estimate the size of the result and select the collect method
automatically.
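A hedged sketch of what "adaptive" could mean here (the threshold and the way the
estimate is obtained are assumptions, not the Thrift server implementation): use
incremental collect only when the estimated result size exceeds what the driver can
safely hold in memory.

{code:java}
object AdaptiveCollect {
  sealed trait CollectMethod
  case object FullCollect extends CollectMethod
  case object IncrementalCollect extends CollectMethod

  // Choose the collect method from an estimated result size in bytes.
  def choose(estimatedRows: Long, estimatedRowBytes: Long,
             maxInMemoryBytes: Long = 512L * 1024 * 1024): CollectMethod =
    if (estimatedRows * estimatedRowBytes > maxInMemoryBytes) IncrementalCollect
    else FullCollect
}
{code}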



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23664) Add interface to collect query result through file iterator

2018-03-13 Thread zhoukang (JIRA)
zhoukang created SPARK-23664:


 Summary: Add interface to collect query result through file 
iterator
 Key: SPARK-23664
 URL: https://issues.apache.org/jira/browse/SPARK-23664
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0, 2.1.1
Reporter: zhoukang


Currently, we use Spark SQL through JDBC.
Results may consume a lot of memory since we collect them and cache them in memory
for performance reasons.
However, we can also add an API that collects the result through a file iterator
(like a Parquet file iterator), so we can avoid OOMs in the Thrift server for big
queries. Like below:

{code:java}
result.toLocalIteratorThroughFile.asScala
{code}
I will work on this if it makes sense!
We have used this API in our internal cluster for about a year.
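A hedged sketch of the idea behind toLocalIteratorThroughFile, which is not a public
Spark API (the temporary path handling is an assumption): spill the result to files
first, then stream it back row by row so the driver never holds the whole result set
in memory.

{code:java}
import scala.collection.JavaConverters._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object FileBackedIterator {
  // Materialize the result once to disk, then read it back incrementally.
  def iterate(spark: SparkSession, df: DataFrame, tmpDir: String): Iterator[Row] = {
    df.write.mode("overwrite").parquet(tmpDir)
    spark.read.parquet(tmpDir).toLocalIterator().asScala
  }
}
{code}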



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23513) java.io.IOException: Expected 12 fields, but got 5 for row :Spark submit error

2018-03-08 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391293#comment-16391293
 ] 

zhoukang commented on SPARK-23513:
--

Could you please post some more details? [~Fray]

> java.io.IOException: Expected 12 fields, but got 5 for row :Spark submit 
> error 
> ---
>
> Key: SPARK-23513
> URL: https://issues.apache.org/jira/browse/SPARK-23513
> Project: Spark
>  Issue Type: Bug
>  Components: EC2, Examples, Input/Output, Java API
>Affects Versions: 1.4.0, 2.2.0
>Reporter: Rawia 
>Priority: Blocker
>
> Hello
> I'm trying to run a spark application (distributedWekaSpark) but  when I'm 
> using the spark-submit command I get this error
> {quote}{quote}ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
> java.io.IOException: Expected 12 fields, but got 5 for row: 
> outlook,temperature,humidity,windy,play
> {quote}{quote}
> I tried with other datasets but always the same error appeared, (always 12 
> fields expected)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date

2018-03-08 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391282#comment-16391282
 ] 

zhoukang edited comment on SPARK-23549 at 3/8/18 2:07 PM:
--

I think this is a bug, which may be caused by the rule below (I have not run any
test, just guessing):
{code:java}
case p @ BinaryComparison(left, right)
if findCommonTypeForBinaryComparison(left.dataType, 
right.dataType).isDefined =>
val commonType = findCommonTypeForBinaryComparison(left.dataType, 
right.dataType).get
p.makeCopy(Array(castExpr(left, commonType), castExpr(right, 
commonType)))
{code}
findCommonTypeForBinaryComparison will return StringType:
{code:java}
case (DateType, TimestampType) => Some(StringType)
{code}
Maybe we can add a new rule for this case?
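A hedged sketch of what such a rule could look like (illustrative only, not Spark's
TypeCoercion code): promote DateType to TimestampType for the comparison instead of
falling back to StringType.

{code:java}
import org.apache.spark.sql.types.{DataType, DateType, TimestampType}

object ComparisonCoercionSketch {
  // Prefer TimestampType over StringType when comparing a date with a timestamp.
  def commonTypeForComparison(left: DataType, right: DataType): Option[DataType] =
    (left, right) match {
      case (DateType, TimestampType) | (TimestampType, DateType) => Some(TimestampType)
      case (l, r) if l == r => Some(l)
      case _ => None
    }
}
{code}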






was (Author: cane):
I think this is a bug.Which may caused by rule below:
{code:java}
case p @ BinaryComparison(left, right)
if findCommonTypeForBinaryComparison(left.dataType, 
right.dataType).isDefined =>
val commonType = findCommonTypeForBinaryComparison(left.dataType, 
right.dataType).get
p.makeCopy(Array(castExpr(left, commonType), castExpr(right, 
commonType)))
{code}
findCommonTypeForBinaryComparison will return StringType:
{code:java}
case (DateType, TimestampType) => Some(StringType)
{code}
May be we can add a new rule for this case?





> Spark SQL unexpected behavior when comparing timestamp to date
> --
>
> Key: SPARK-23549
> URL: https://issues.apache.org/jira/browse/SPARK-23549
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Dong Jiang
>Priority: Major
>
> {code:java}
> scala> spark.version
> res1: String = 2.2.1
> scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between 
> cast('2017-02-28' as date) and cast('2017-03-01' as date)").show
> +---+
> |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= 
> CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 
> AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))|
> +---+
> |                                                                             
>                                                                               
>                                                false|
> +---+{code}
> As shown above, when a timestamp is compared to a date in Spark SQL, both the 
> timestamp and the date are cast down to strings, leading to an unexpected result. 
> If I run the same SQL in Presto/Athena, I get the expected result:
> {code:java}
> select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as 
> date) and cast('2017-03-01' as date)
>   _col0
> 1 true
> {code}
> Is this a bug for Spark or a feature?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date

2018-03-08 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391282#comment-16391282
 ] 

zhoukang commented on SPARK-23549:
--

I think this is a bug, which may be caused by the rule below:
{code:java}
case p @ BinaryComparison(left, right)
    if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined =>
  val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get
  p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType)))
{code}
findCommonTypeForBinaryComparison will return StringType:
{code:java}
case (DateType, TimestampType) => Some(StringType)
{code}
Maybe we can add a new rule for this case?
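
To make the wrong result concrete: once both sides are strings, '2017-03-01 00:00:00' <= '2017-03-01' is a lexicographic comparison in which the longer string sorts after its own prefix, so the upper bound of the BETWEEN fails. Below is a minimal sketch of the kind of rule change this suggests (illustrative only, not a tested patch; it assumes the existing DateType/TimestampType classes and would still need to go through TypeCoercion): promote the date side to a timestamp instead of falling back to StringType.
{code:java}
import org.apache.spark.sql.types._

// Hypothetical variant: compare a date and a timestamp on the timestamp domain
// instead of casting both operands to StringType.
def findCommonTypeForBinaryComparison(dt1: DataType, dt2: DataType): Option[DataType] =
  (dt1, dt2) match {
    case (DateType, TimestampType) | (TimestampType, DateType) => Some(TimestampType)
    // ... keep the remaining cases unchanged ...
    case _ => None
  }
{code}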





> Spark SQL unexpected behavior when comparing timestamp to date
> --
>
> Key: SPARK-23549
> URL: https://issues.apache.org/jira/browse/SPARK-23549
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Dong Jiang
>Priority: Major
>
> {code:java}
> scala> spark.version
> res1: String = 2.2.1
> scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between 
> cast('2017-02-28' as date) and cast('2017-03-01' as date)").show
> +---+
> |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= 
> CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 
> AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))|
> +---+
> |                                                                             
>                                                                               
>                                                false|
> +---+{code}
> As shown above, when a timestamp is compared to a date in Spark SQL, both the 
> timestamp and the date are cast down to strings, leading to an unexpected result. 
> If I run the same SQL in Presto/Athena, I get the expected result:
> {code:java}
> select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as 
> date) and cast('2017-03-01' as date)
>   _col0
> 1 true
> {code}
> Is this a bug for Spark or a feature?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0

2018-03-01 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang resolved SPARK-23335.
--
Resolution: Won't Fix

> expression 'select conv('',16,10) % 2' return 0
> ---
>
> Key: SPARK-23335
> URL: https://issues.apache.org/jira/browse/SPARK-23335
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
>
> For below expression:
> {code:java}
> select conv('',16,10) % 2;
> {code}
> it will return 0.
> {code:java}
> 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2;
>  
> +--+--+
>  
> | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS 
> DECIMAL(20,0)) AS DOUBLE)) |
>  
> +--+--+
>  | 0.0 | 
> +--+--+
> {code}
> It caused by:
> {code:java}
> case a @ BinaryArithmetic(left @ StringType(), right) => 
> a.makeCopy(Array(Cast(left, DoubleType), right)) 
> case a @ BinaryArithmetic(left, right @ StringType()) => 
> a.makeCopy(Array(left, Cast(right, DoubleType)))
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0

2018-03-01 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16381827#comment-16381827
 ] 

zhoukang commented on SPARK-23335:
--

Actually, Hive returns the same result:

{code:java}
0: jdbc:hive2://c3-hadoop-hive01.bj:18203/> select conv('ffffffffffffffff',16,10) % 2;
+------+--+
| _c0  |
+------+--+
| 0.0  |
+------+--+
{code}
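
For the record, the 0.0 comes from double rounding rather than from the modulo itself: conv('ffffffffffffffff',16,10) yields the decimal string 18446744073709551615 (2^64 - 1, an odd number), and the implicit cast to DOUBLE rounds it up to 2^64, which is even. A quick plain-Scala illustration (hypothetical REPL session, not taken from the report):
{code:java}
scala> val asDouble = "18446744073709551615".toDouble   // nearest representable double is 2^64
asDouble: Double = 1.8446744073709552E19

scala> asDouble % 2                                     // hence conv(...) % 2 == 0.0
res0: Double = 0.0

scala> BigInt("ffffffffffffffff", 16) % 2               // exact arithmetic gives the expected 1
res1: scala.math.BigInt = 1
{code}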


> expression 'select conv('',16,10) % 2' return 0
> ---
>
> Key: SPARK-23335
> URL: https://issues.apache.org/jira/browse/SPARK-23335
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
>
> For below expression:
> {code:java}
> select conv('',16,10) % 2;
> {code}
> it will return 0.
> {code:java}
> 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2;
>  
> +--+--+
>  
> | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS 
> DECIMAL(20,0)) AS DOUBLE)) |
>  
> +--+--+
>  | 0.0 | 
> +--+--+
> {code}
> It caused by:
> {code:java}
> case a @ BinaryArithmetic(left @ StringType(), right) => 
> a.makeCopy(Array(Cast(left, DoubleType), right)) 
> case a @ BinaryArithmetic(left, right @ StringType()) => 
> a.makeCopy(Array(left, Cast(right, DoubleType)))
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0

2018-03-01 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16381827#comment-16381827
 ] 

zhoukang edited comment on SPARK-23335 at 3/1/18 10:56 AM:
---

Actually, Hive returns the same result:

{code:java}
0: jdbc:hive2://xxx/> select conv('ffffffffffffffff',16,10) % 2;
+------+--+
| _c0  |
+------+--+
| 0.0  |
+------+--+
{code}



was (Author: cane):
Actually hive return the same result

{code:java}
0: jdbc:hive2://c3-hadoop-hive01.bj:18203/> select 
conv('',16,10) % 2;
+--+--+
| _c0  |
+--+--+
| 0.0  |
+--+--+
{code}


> expression 'select conv('',16,10) % 2' return 0
> ---
>
> Key: SPARK-23335
> URL: https://issues.apache.org/jira/browse/SPARK-23335
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
>
> For below expression:
> {code:java}
> select conv('',16,10) % 2;
> {code}
> it will return 0.
> {code:java}
> 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2;
>  
> +--+--+
>  
> | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS 
> DECIMAL(20,0)) AS DOUBLE)) |
>  
> +--+--+
>  | 0.0 | 
> +--+--+
> {code}
> It caused by:
> {code:java}
> case a @ BinaryArithmetic(left @ StringType(), right) => 
> a.makeCopy(Array(Cast(left, DoubleType), right)) 
> case a @ BinaryArithmetic(left, right @ StringType()) => 
> a.makeCopy(Array(left, Cast(right, DoubleType)))
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0

2018-03-01 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16381824#comment-16381824
 ] 

zhoukang commented on SPARK-23335:
--

Agree with you! [~cloud_fan] I will look into Hive.

> expression 'select conv('',16,10) % 2' return 0
> ---
>
> Key: SPARK-23335
> URL: https://issues.apache.org/jira/browse/SPARK-23335
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
>
> For below expression:
> {code:java}
> select conv('',16,10) % 2;
> {code}
> it will return 0.
> {code:java}
> 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2;
>  
> +--+--+
>  
> | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS 
> DECIMAL(20,0)) AS DOUBLE)) |
>  
> +--+--+
>  | 0.0 | 
> +--+--+
> {code}
> It caused by:
> {code:java}
> case a @ BinaryArithmetic(left @ StringType(), right) => 
> a.makeCopy(Array(Cast(left, DoubleType), right)) 
> case a @ BinaryArithmetic(left, right @ StringType()) => 
> a.makeCopy(Array(left, Cast(right, DoubleType)))
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0

2018-03-01 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16381800#comment-16381800
 ] 

zhoukang commented on SPARK-23335:
--

[~jiangxb1987] [~cloud_fan] Any suggestions? Thanks!

> expression 'select conv('',16,10) % 2' return 0
> ---
>
> Key: SPARK-23335
> URL: https://issues.apache.org/jira/browse/SPARK-23335
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
>
> For below expression:
> {code:java}
> select conv('',16,10) % 2;
> {code}
> it will return 0.
> {code:java}
> 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2;
>  
> +--+--+
>  
> | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS 
> DECIMAL(20,0)) AS DOUBLE)) |
>  
> +--+--+
>  | 0.0 | 
> +--+--+
> {code}
> It caused by:
> {code:java}
> case a @ BinaryArithmetic(left @ StringType(), right) => 
> a.makeCopy(Array(Cast(left, DoubleType), right)) 
> case a @ BinaryArithmetic(left, right @ StringType()) => 
> a.makeCopy(Array(left, Cast(right, DoubleType)))
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23508) blockManagerIdCache in BlockManagerId may cause oom

2018-02-24 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23508:
-
Description: 
blockManagerIdCache in BlockManagerId never removes old values, which may cause an OOM.
{code:java}
val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
{code}
Since whenever we apply for a new BlockManagerId, it is put into this map.

Below is a jmap dump:

!elepahnt-oom1.png!

!elephant-oom.png!

  was:
blockManagerIdCache in BlockManagerId will not remove old values which may 
cause oom
{code:java}
val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, 
BlockManagerId]()
{code}
Since whenever we apply a new BlockManagerId, it will put into this map.

!elephant-oom.png!


> blockManagerIdCache in BlockManagerId may cause oom
> ---
>
> Key: SPARK-23508
> URL: https://issues.apache.org/jira/browse/SPARK-23508
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy, Spark Core
>Affects Versions: 2.1.1, 2.2.1
>Reporter: zhoukang
>Priority: Major
> Attachments: elepahnt-oom1.png, elephant-oom.png
>
>
> blockManagerIdCache in BlockManagerId will not remove old values which may 
> cause oom
> {code:java}
> val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, 
> BlockManagerId]()
> {code}
> Since whenever we apply a new BlockManagerId, it will put into this map.
> below is an jmap:
> !elepahnt-oom1.png!
> !elephant-oom.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23508) blockManagerIdCache in BlockManagerId may cause oom

2018-02-24 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23508:
-
Description: 
blockManagerIdCache in BlockManagerId will not remove old values which may 
cause oom
{code:java}
val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, 
BlockManagerId]()
{code}
Since whenever we apply a new BlockManagerId, it will put into this map.

!elephant-oom.png!

  was:
blockManagerIdCache in BlockManagerId will not remove old values which may 
cause oom
{code:java}
val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, 
BlockManagerId]()
{code}
Since whenever we apply a new BlockManagerId, it will put into this map.


> blockManagerIdCache in BlockManagerId may cause oom
> ---
>
> Key: SPARK-23508
> URL: https://issues.apache.org/jira/browse/SPARK-23508
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy, Spark Core
>Affects Versions: 2.1.1, 2.2.1
>Reporter: zhoukang
>Priority: Major
> Attachments: elepahnt-oom1.png, elephant-oom.png
>
>
> blockManagerIdCache in BlockManagerId will not remove old values which may 
> cause oom
> {code:java}
> val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, 
> BlockManagerId]()
> {code}
> Since whenever we apply a new BlockManagerId, it will put into this map.
> !elephant-oom.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23508) blockManagerIdCache in BlockManagerId may cause oom

2018-02-24 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23508:
-
Attachment: elephant-oom.png
elepahnt-oom1.png

> blockManagerIdCache in BlockManagerId may cause oom
> ---
>
> Key: SPARK-23508
> URL: https://issues.apache.org/jira/browse/SPARK-23508
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy, Spark Core
>Affects Versions: 2.1.1, 2.2.1
>Reporter: zhoukang
>Priority: Major
> Attachments: elepahnt-oom1.png, elephant-oom.png
>
>
> blockManagerIdCache in BlockManagerId will not remove old values which may 
> cause oom
> {code:java}
> val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, 
> BlockManagerId]()
> {code}
> Since whenever we apply a new BlockManagerId, it will put into this map.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23508) blockManagerIdCache in BlockManagerId may cause oom

2018-02-24 Thread zhoukang (JIRA)
zhoukang created SPARK-23508:


 Summary: blockManagerIdCache in BlockManagerId may cause oom
 Key: SPARK-23508
 URL: https://issues.apache.org/jira/browse/SPARK-23508
 Project: Spark
  Issue Type: Bug
  Components: Deploy, Spark Core
Affects Versions: 2.2.1, 2.1.1
Reporter: zhoukang


blockManagerIdCache in BlockManagerId never removes old values, which may cause an OOM.
{code:java}
val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
{code}
Since whenever we apply for a new BlockManagerId, it is put into this map.
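
One possible mitigation (a sketch only, not a committed fix; it assumes this code stays inside Spark where BlockManagerId is visible and that Guava is on the classpath) is to replace the unbounded map with a size-bounded Guava cache, so stale entries can be evicted instead of accumulating for the lifetime of the driver:
{code:java}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// Bounded cache sketch: the maximum size is illustrative and would need tuning
// for the expected number of live block managers in a large application.
val blockManagerIdCache: LoadingCache[BlockManagerId, BlockManagerId] =
  CacheBuilder.newBuilder()
    .maximumSize(10000)
    .build(new CacheLoader[BlockManagerId, BlockManagerId] {
      override def load(id: BlockManagerId): BlockManagerId = id
    })

// Callers keep the same lookup semantics as the old map-based getCachedBlockManagerId.
def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId =
  blockManagerIdCache.get(id)
{code}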



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0

2018-02-05 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23335:
-
Description: 
For the expression below:
{code:java}
select conv('ffffffffffffffff',16,10) % 2;
{code}
it will return 0.
{code:java}
0: jdbc:hive2://xxx:16> select conv('ffffffffffffffff',16,10) % 2;
+-----------------------------------------------------------------------------------------------+
| (CAST(conv(ffffffffffffffff, 16, 10) AS DOUBLE) % CAST(CAST(2 AS DECIMAL(20,0)) AS DOUBLE))    |
+-----------------------------------------------------------------------------------------------+
| 0.0                                                                                             |
+-----------------------------------------------------------------------------------------------+
{code}
It is caused by:
{code:java}
case a @ BinaryArithmetic(left @ StringType(), right) =>
  a.makeCopy(Array(Cast(left, DoubleType), right))
case a @ BinaryArithmetic(left, right @ StringType()) =>
  a.makeCopy(Array(left, Cast(right, DoubleType)))
{code}

  was:
For below expression:
{code:java}
select conv('',16,10) % 2;

{code}
it will return 0.
{code:java}
0: jdbc:hive2://c3-hadoop-prc-history03.bj:16> select 
conv('',16,10) % 2;
 
+--+--+
 
| (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS 
DECIMAL(20,0)) AS DOUBLE)) |
 
+--+--+
 | 0.0 | 
+--+--+

{code}
It caused by:
{code:java}
case a @ BinaryArithmetic(left @ StringType(), right) => 
a.makeCopy(Array(Cast(left, DoubleType), right)) case a @ 
BinaryArithmetic(left, right @ StringType()) => a.makeCopy(Array(left, 
Cast(right, DoubleType)))

{code}


> expression 'select conv('',16,10) % 2' return 0
> ---
>
> Key: SPARK-23335
> URL: https://issues.apache.org/jira/browse/SPARK-23335
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
>
> For below expression:
> {code:java}
> select conv('',16,10) % 2;
> {code}
> it will return 0.
> {code:java}
> 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2;
>  
> +--+--+
>  
> | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS 
> DECIMAL(20,0)) AS DOUBLE)) |
>  
> +--+--+
>  | 0.0 | 
> +--+--+
> {code}
> It caused by:
> {code:java}
> case a @ BinaryArithmetic(left @ StringType(), right) => 
> a.makeCopy(Array(Cast(left, DoubleType), right)) case a @ 
> BinaryArithmetic(left, right @ StringType()) => a.makeCopy(Array(left, 
> Cast(right, DoubleType)))
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0

2018-02-05 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23335:
-
Description: 
For below expression:
{code:java}
select conv('',16,10) % 2;

{code}
it will return 0.
{code:java}
0: jdbc:hive2://xxx:16> select conv('',16,10) % 2;
 
+--+--+
 
| (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS 
DECIMAL(20,0)) AS DOUBLE)) |
 
+--+--+
 | 0.0 | 
+--+--+

{code}
It caused by:
{code:java}
case a @ BinaryArithmetic(left @ StringType(), right) => 
a.makeCopy(Array(Cast(left, DoubleType), right)) 
case a @ BinaryArithmetic(left, right @ StringType()) => a.makeCopy(Array(left, 
Cast(right, DoubleType)))

{code}

  was:
For below expression:
{code:java}
select conv('',16,10) % 2;

{code}
it will return 0.
{code:java}
0: jdbc:hive2://xxx:16> select conv('',16,10) % 2;
 
+--+--+
 
| (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS 
DECIMAL(20,0)) AS DOUBLE)) |
 
+--+--+
 | 0.0 | 
+--+--+

{code}
It caused by:
{code:java}
case a @ BinaryArithmetic(left @ StringType(), right) => 
a.makeCopy(Array(Cast(left, DoubleType), right)) case a @ 
BinaryArithmetic(left, right @ StringType()) => a.makeCopy(Array(left, 
Cast(right, DoubleType)))

{code}


> expression 'select conv('',16,10) % 2' return 0
> ---
>
> Key: SPARK-23335
> URL: https://issues.apache.org/jira/browse/SPARK-23335
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
>
> For below expression:
> {code:java}
> select conv('',16,10) % 2;
> {code}
> it will return 0.
> {code:java}
> 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2;
>  
> +--+--+
>  
> | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS 
> DECIMAL(20,0)) AS DOUBLE)) |
>  
> +--+--+
>  | 0.0 | 
> +--+--+
> {code}
> It caused by:
> {code:java}
> case a @ BinaryArithmetic(left @ StringType(), right) => 
> a.makeCopy(Array(Cast(left, DoubleType), right)) 
> case a @ BinaryArithmetic(left, right @ StringType()) => 
> a.makeCopy(Array(left, Cast(right, DoubleType)))
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0

2018-02-05 Thread zhoukang (JIRA)
zhoukang created SPARK-23335:


 Summary: expression 'select conv('',16,10) % 2' 
return 0
 Key: SPARK-23335
 URL: https://issues.apache.org/jira/browse/SPARK-23335
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.0
Reporter: zhoukang


For the expression below:
{code:java}
select conv('ffffffffffffffff',16,10) % 2;
{code}
it will return 0.
{code:java}
0: jdbc:hive2://c3-hadoop-prc-history03.bj:16> select conv('ffffffffffffffff',16,10) % 2;
+-----------------------------------------------------------------------------------------------+
| (CAST(conv(ffffffffffffffff, 16, 10) AS DOUBLE) % CAST(CAST(2 AS DECIMAL(20,0)) AS DOUBLE))    |
+-----------------------------------------------------------------------------------------------+
| 0.0                                                                                             |
+-----------------------------------------------------------------------------------------------+
{code}
It is caused by:
{code:java}
case a @ BinaryArithmetic(left @ StringType(), right) =>
  a.makeCopy(Array(Cast(left, DoubleType), right))
case a @ BinaryArithmetic(left, right @ StringType()) =>
  a.makeCopy(Array(left, Cast(right, DoubleType)))
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23129) Lazy init DiskMapIterator#deserializeStream to reduce memory usage when ExternalAppendOnlyMap spill too many times

2018-01-25 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23129:
-
Description: 
Currently, the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator is initialized when 
the DiskMapIterator instance is created. This causes memory overhead when 
ExternalAppendOnlyMap spills too many times.

We can avoid this by initializing deserializeStream the first time it is used.

  was:
Currently,the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator init 
when DiskMapIterator instance created.This will cause memory use overhead when 
ExternalAppendOnlyMap spill too much times.

We can avoid this by making deserializeStream init when it is used the first 
time.


> Lazy init DiskMapIterator#deserializeStream to reduce memory usage when 
> ExternalAppendOnlyMap spill  too many times
> ---
>
> Key: SPARK-23129
> URL: https://issues.apache.org/jira/browse/SPARK-23129
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Assignee: zhoukang
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently,the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator init 
> when DiskMapIterator instance created.This will cause memory use overhead 
> when ExternalAppendOnlyMap spill too many times.
> We can avoid this by making deserializeStream init when it is used the first 
> time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23129) Lazy init DiskMapIterator#deserializeStream to reduce memory usage when ExternalAppendOnlyMap spill too many times

2018-01-25 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23129:
-
Summary: Lazy init DiskMapIterator#deserializeStream to reduce memory usage 
when ExternalAppendOnlyMap spill  too many times  (was: Lazy init 
DiskMapIterator#deserializeStream to reduce memory usage when 
ExternalAppendOnlyMap spill  too much times)

> Lazy init DiskMapIterator#deserializeStream to reduce memory usage when 
> ExternalAppendOnlyMap spill  too many times
> ---
>
> Key: SPARK-23129
> URL: https://issues.apache.org/jira/browse/SPARK-23129
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Assignee: zhoukang
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently,the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator init 
> when DiskMapIterator instance created.This will cause memory use overhead 
> when ExternalAppendOnlyMap spill too much times.
> We can avoid this by making deserializeStream init when it is used the first 
> time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23129) Lazy init DiskMapIterator#deserializeStream to reduce memory usage when ExternalAppendOnlyMap spill too much times

2018-01-17 Thread zhoukang (JIRA)
zhoukang created SPARK-23129:


 Summary: Lazy init DiskMapIterator#deserializeStream to reduce 
memory usage when ExternalAppendOnlyMap spill  too much times
 Key: SPARK-23129
 URL: https://issues.apache.org/jira/browse/SPARK-23129
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.1.0
Reporter: zhoukang


Currently, the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator is initialized when 
the DiskMapIterator instance is created. This causes memory overhead when 
ExternalAppendOnlyMap spills too many times.

We can avoid this by initializing deserializeStream the first time it is used.
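
A minimal sketch of the idea, using a simplified stand-alone reader rather than the real DiskMapIterator (the class and field names below are illustrative): making the stream a lazy val means nothing is opened, and no read buffer is allocated, until the spill file is actually read.
{code:java}
import java.io.{BufferedInputStream, File, FileInputStream, InputStream}

// Simplified stand-in for DiskMapIterator: the stream (and its buffer) is created
// on first use instead of at construction time, so spill files that have not been
// iterated yet do not each hold an open stream and buffer in memory.
class LazySpillFileReader(file: File, bufferSize: Int = 1024 * 1024) {
  // Created on first access, not in the constructor.
  private lazy val deserializeStream: InputStream =
    new BufferedInputStream(new FileInputStream(file), bufferSize)

  def readByte(): Int = deserializeStream.read()   // the first call triggers initialization
}
{code}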



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23076) When we call cache() on RDD which depends on ShuffleRowRDD, we will get an error result

2018-01-15 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326685#comment-16326685
 ] 

zhoukang commented on SPARK-23076:
--

Yes, maybe it should be an improvement, since I suppose others may have the same 
requirement.

The original reason we cache the MapPartitionsRDD is that we do a result-size 
estimate in SparkExecuteStatementOperation#execute, so we cache the result RDD 
first.

The reason we estimate the result size is to avoid a Thrift server OOM error. 
[~cloud_fan]
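
Until a config exists, one user-level workaround (a sketch that assumes a SparkSession named spark and uses the internal queryExecution.toRdd API) is to copy every InternalRow before caching, so each cached row owns its own byte buffer:
{code:java}
// Workaround sketch: materialize an independent copy of every row before caching.
val df = spark.sql("select * from csv_demo limit 3")
val cachedRows = df.queryExecution.toRdd   // RDD[InternalRow]; rows after a shuffle may share a buffer
  .map(_.copy())                           // copy() detaches each UnsafeRow from the shared buffer
  .cache()
{code}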

> When we call cache() on RDD which depends on ShuffleRowRDD, we will get an 
> error result
> ---
>
> Key: SPARK-23076
> URL: https://issues.apache.org/jira/browse/SPARK-23076
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
> Attachments: shufflerowrdd-cache.png
>
>
> For query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
> +----------+------+--+
> |   _c0    | _c1  |
> +----------+------+--+
> | Joe      | 20   |
> | Tom      | 30   |
> | Hyukjin  | 25   |
> +----------+------+--+
> However, when we call cache on the MapPartitionsRDD below:
>   !shufflerowrdd-cache.png!
> Then the result will be wrong:
>  0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
> +----------+------+--+
> |   _c0    | _c1  |
> +----------+------+--+
> | Hyukjin  | 25   |
> | Hyukjin  | 25   |
> | Hyukjin  | 25   |
> +----------+------+--+
> The reason why this happens is that:
> UnsafeRow instances generated by ShuffleRowRDD#compute reuse the same underlying 
> byte buffer.
> I print some log below to explain this:
> Modify UnsafeRow.toString()
> {code:java}
> // This is for debugging
> @Override
> public String toString() {
>   StringBuilder build = new StringBuilder("[");
>   for (int i = 0; i < sizeInBytes; i += 8) {
> if (i != 0) build.append(',');
> build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, 
> baseOffset + i)));
>   }
>   build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and 
> baseOffset here
>   return build.toString();
> }{code}
> {code:java}
> 2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
> 2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
> 2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
> {code}
> We can fix this by adding a config and copying each UnsafeRow read by the 
> ShuffleRowRDD iterator when the config is true, like below:
> {code:java}
> reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
> if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
>   && x._2.isInstanceOf[UnsafeRow]) {
>   (x._2).asInstanceOf[UnsafeRow].copy()
> } else {
>   x._2
> }
>   })
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23076) When we call cache() on RDD which depends on ShuffleRowRDD, we will get an error result

2018-01-15 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23076:
-
Issue Type: Improvement  (was: Bug)

> When we call cache() on RDD which depends on ShuffleRowRDD, we will get an 
> error result
> ---
>
> Key: SPARK-23076
> URL: https://issues.apache.org/jira/browse/SPARK-23076
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
> Attachments: shufflerowrdd-cache.png
>
>
> For query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
>  ++++--
> |_c0|_c1|
> ++++--
> |Joe|20|
> |Tom|30|
> |Hyukjin|25|
> ++++--
>  However,when we call cache on MapPartitionsRDD below:
>   !shufflerowrdd-cache.png!
> Then result will be error:
>  0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
>  ++++--
> |_c0|_c1|
> ++++--
> |Hyukjin|25|
> |Hyukjin|25|
> |Hyukjin|25|
> ++++--
>  The reason why this happen is that:
> UnsafeRow which generated by ShuffleRowRDD#compute will use the same under 
> byte buffer
> I print some log below to explain this:
> Modify UnsafeRow.toString()
> {code:java}
> // This is for debugging
> @Override
> public String toString() {
>   StringBuilder build = new StringBuilder("[");
>   for (int i = 0; i < sizeInBytes; i += 8) {
> if (i != 0) build.append(',');
> build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, 
> baseOffset + i)));
>   }
>   build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and 
> baseOffset here
>   return build.toString();
> }{code}
> {code:java}
> 2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
> 2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
> 2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
> {code}
> we can fix this by add a config,and copy UnsafeRow read by ShuffleRowRDD 
> iterator when config is true,like below:
> {code:java}
> reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
> if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
>   && x._2.isInstanceOf[UnsafeRow]) {
>   (x._2).asInstanceOf[UnsafeRow].copy()
> } else {
>   x._2
> }
>   })
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23076) When we call cache() on RDD which depends on ShuffleRowRDD, we will get an error result

2018-01-15 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23076:
-
Summary: When we call cache() on RDD which depends on ShuffleRowRDD, we 
will get an error result  (was: When we call cache() on ShuffleRowRDD, we will 
get an error result)

> When we call cache() on RDD which depends on ShuffleRowRDD, we will get an 
> error result
> ---
>
> Key: SPARK-23076
> URL: https://issues.apache.org/jira/browse/SPARK-23076
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
> Attachments: shufflerowrdd-cache.png
>
>
> For query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
>  ++++--
> |_c0|_c1|
> ++++--
> |Joe|20|
> |Tom|30|
> |Hyukjin|25|
> ++++--
>  However,when we call cache on MapPartitionsRDD below:
>   !shufflerowrdd-cache.png!
> Then result will be error:
>  0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
>  ++++--
> |_c0|_c1|
> ++++--
> |Hyukjin|25|
> |Hyukjin|25|
> |Hyukjin|25|
> ++++--
>  The reason why this happen is that:
> UnsafeRow which generated by ShuffleRowRDD#compute will use the same under 
> byte buffer
> I print some log below to explain this:
> Modify UnsafeRow.toString()
> {code:java}
> // This is for debugging
> @Override
> public String toString() {
>   StringBuilder build = new StringBuilder("[");
>   for (int i = 0; i < sizeInBytes; i += 8) {
> if (i != 0) build.append(',');
> build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, 
> baseOffset + i)));
>   }
>   build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and 
> baseOffset here
>   return build.toString();
> }{code}
> {code:java}
> 2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
> 2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
> 2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
> {code}
> we can fix this by add a config,and copy UnsafeRow read by ShuffleRowRDD 
> iterator when config is true,like below:
> {code:java}
> reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
> if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
>   && x._2.isInstanceOf[UnsafeRow]) {
>   (x._2).asInstanceOf[UnsafeRow].copy()
> } else {
>   x._2
> }
>   })
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result

2018-01-15 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326242#comment-16326242
 ] 

zhoukang edited comment on SPARK-23076 at 1/15/18 1:34 PM:
---

As the picture I posted shows, I cached a MapPartitionsRDD which depends on 
ShuffleRowRDD.

We use this for our internal Spark SQL service. [~cloud_fan]


was (Author: cane):
As the picture i posted, i cached MapPartitionsRDD which depends on 
ShuffleRowRDD.

We used this our internal spark sql service [~cloud_fan]

> When we call cache() on ShuffleRowRDD, we will get an error result
> --
>
> Key: SPARK-23076
> URL: https://issues.apache.org/jira/browse/SPARK-23076
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
> Attachments: shufflerowrdd-cache.png
>
>
> For query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
>  ++++--
> |_c0|_c1|
> ++++--
> |Joe|20|
> |Tom|30|
> |Hyukjin|25|
> ++++--
>  However,when we call cache on MapPartitionsRDD below:
>   !shufflerowrdd-cache.png!
> Then result will be error:
>  0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
>  ++++--
> |_c0|_c1|
> ++++--
> |Hyukjin|25|
> |Hyukjin|25|
> |Hyukjin|25|
> ++++--
>  The reason why this happen is that:
> UnsafeRow which generated by ShuffleRowRDD#compute will use the same under 
> byte buffer
> I print some log below to explain this:
> Modify UnsafeRow.toString()
> {code:java}
> // This is for debugging
> @Override
> public String toString() {
>   StringBuilder build = new StringBuilder("[");
>   for (int i = 0; i < sizeInBytes; i += 8) {
> if (i != 0) build.append(',');
> build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, 
> baseOffset + i)));
>   }
>   build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and 
> baseOffset here
>   return build.toString();
> }{code}
> {code:java}
> 2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
> 2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
> 2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
> {code}
> we can fix this by add a config,and copy UnsafeRow read by ShuffleRowRDD 
> iterator when config is true,like below:
> {code:java}
> reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
> if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
>   && x._2.isInstanceOf[UnsafeRow]) {
>   (x._2).asInstanceOf[UnsafeRow].copy()
> } else {
>   x._2
> }
>   })
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result

2018-01-15 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326242#comment-16326242
 ] 

zhoukang commented on SPARK-23076:
--

As the picture I posted shows, I cached a MapPartitionsRDD which depends on 
ShuffleRowRDD.

We use this for our internal Spark SQL service. [~cloud_fan]

> When we call cache() on ShuffleRowRDD, we will get an error result
> --
>
> Key: SPARK-23076
> URL: https://issues.apache.org/jira/browse/SPARK-23076
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
> Attachments: shufflerowrdd-cache.png
>
>
> For query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
>  ++++--
> |_c0|_c1|
> ++++--
> |Joe|20|
> |Tom|30|
> |Hyukjin|25|
> ++++--
>  However,when we call cache on MapPartitionsRDD below:
>   !shufflerowrdd-cache.png!
> Then result will be error:
>  0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
>  ++++--
> |_c0|_c1|
> ++++--
> |Hyukjin|25|
> |Hyukjin|25|
> |Hyukjin|25|
> ++++--
>  The reason why this happen is that:
> UnsafeRow which generated by ShuffleRowRDD#compute will use the same under 
> byte buffer
> I print some log below to explain this:
> Modify UnsafeRow.toString()
> {code:java}
> // This is for debugging
> @Override
> public String toString() {
>   StringBuilder build = new StringBuilder("[");
>   for (int i = 0; i < sizeInBytes; i += 8) {
> if (i != 0) build.append(',');
> build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, 
> baseOffset + i)));
>   }
>   build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and 
> baseOffset here
>   return build.toString();
> }{code}
> {code:java}
> 2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
> 2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
> 2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
> Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
> {code}
> we can fix this by add a config,and copy UnsafeRow read by ShuffleRowRDD 
> iterator when config is true,like below:
> {code:java}
> reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
> if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
>   && x._2.isInstanceOf[UnsafeRow]) {
>   (x._2).asInstanceOf[UnsafeRow].copy()
> } else {
>   x._2
> }
>   })
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result

2018-01-15 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23076:
-
Description: 
For query below:
{code:java}
select * from csv_demo limit 3;
{code}
The correct result should be:
 0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
+----------+------+--+
|   _c0    | _c1  |
+----------+------+--+
| Joe      | 20   |
| Tom      | 30   |
| Hyukjin  | 25   |
+----------+------+--+

However, when we call cache on the MapPartitionsRDD below:

  !shufflerowrdd-cache.png!

Then the result will be wrong:

 0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
+----------+------+--+
|   _c0    | _c1  |
+----------+------+--+
| Hyukjin  | 25   |
| Hyukjin  | 25   |
| Hyukjin  | 25   |
+----------+------+--+

The reason why this happens is that:

UnsafeRow instances generated by ShuffleRowRDD#compute reuse the same underlying byte 
buffer.

I print some log below to explain this:

Modify UnsafeRow.toString()
{code:java}
// This is for debugging
@Override
public String toString() {
  StringBuilder build = new StringBuilder("[");
  for (int i = 0; i < sizeInBytes; i += 8) {
if (i != 0) build.append(',');
build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, 
baseOffset + i)));
  }
  build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and 
baseOffset here
  return build.toString();
}{code}
{code:java}
2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
{code}
We can fix this by adding a config and copying each UnsafeRow read by the ShuffleRowRDD 
iterator when the config is true, like below:
{code:java}
reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
  && x._2.isInstanceOf[UnsafeRow]) {
  (x._2).asInstanceOf[UnsafeRow].copy()
} else {
  x._2
}
  })
}
{code}

  was:
For query below:
{code:java}
select * from csv_demo limit 3;
{code}
The correct result should be:
 0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
 +---+-++--
|_c0|_c1|

+---+-++--
|Joe|20|
|Tom|30|
|Hyukjin|25|

+---+-++--
 However,when we call cache on ShuffleRowRDD(or RDD which depends on 
ShuffleRowRDD in one stage):

  !shufflerowrdd-cache.png!

Then result will be error:
 0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
 +---+-++--
|_c0|_c1|

+---+-++--
|Hyukjin|25|
|Hyukjin|25|
|Hyukjin|25|

+---+-++--
 The reason why this happen is that:

UnsafeRow which generated by ShuffleRowRDD#compute will use the same under byte 
buffer

I print some log below to explain this:

Modify UnsafeRow.toString()
{code:java}
// This is for debugging
@Override
public String toString() {
  StringBuilder build = new StringBuilder("[");
  for (int i = 0; i < sizeInBytes; i += 8) {
if (i != 0) build.append(',');
build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, 
baseOffset + i)));
  }
  build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and 
baseOffset here
  return build.toString();
}{code}
{code:java}
2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
{code}
we can fix this by add a config,and copy UnsafeRow read by ShuffleRowRDD 
iterator when config is true,like below:
{code:java}
reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
  && x._2.isInstanceOf[UnsafeRow]) {
  (x._2).asInstanceOf[UnsafeRow].copy()
} else {
  x._2
}
  })
}
{code}


> When we call cache() on ShuffleRowRDD, we will get an error result
> --
>
> Key: SPARK-23076
> URL: https://issues.apache.org/jira/browse/SPARK-23076
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
> Attachments: shufflerowrdd-cache.png
>
>
> For query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: jdbc:hive2://10.108.230.228:1/> select * from 

[jira] [Updated] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result

2018-01-15 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23076:
-
Description: 
For query below:
{code:java}
select * from csv_demo limit 3;
{code}
The correct result should be:
 0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
 +---+-++--
|_c0|_c1|

+---+-++--
|Joe|20|
|Tom|30|
|Hyukjin|25|

+---+-++--
 However,when we call cache on ShuffleRowRDD(or RDD which depends on 
ShuffleRowRDD in one stage):

  !shufflerowrdd-cache.png!

Then result will be error:
 0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
 +---+-++--
|_c0|_c1|

+---+-++--
|Hyukjin|25|
|Hyukjin|25|
|Hyukjin|25|

+---+-++--
 The reason why this happen is that:

UnsafeRow which generated by ShuffleRowRDD#compute will use the same under byte 
buffer

I print some log below to explain this:

Modify UnsafeRow.toString()
{code:java}
// This is for debugging
@Override
public String toString() {
  StringBuilder build = new StringBuilder("[");
  for (int i = 0; i < sizeInBytes; i += 8) {
if (i != 0) build.append(',');
build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, 
baseOffset + i)));
  }
  build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and 
baseOffset here
  return build.toString();
}{code}
{code:java}
2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
{code}
we can fix this by add a config,and copy UnsafeRow read by ShuffleRowRDD 
iterator when config is true,like below:
{code:java}
reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
  && x._2.isInstanceOf[UnsafeRow]) {
  (x._2).asInstanceOf[UnsafeRow].copy()
} else {
  x._2
}
  })
}
{code}

  was:
 

For query below:

 
{code:java}
select * from csv_demo limit 3;
{code}
The correct result should be:
 0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
 +--+--++--
|_c0|_c1|

+--+--++--
|Joe|20|
|Tom|30|
|Hyukjin|25|

+--+--++--
 However,when we call cache on ShuffleRowRDD(or RDD which depends on 
ShuffleRowRDD in one stage):

  !shufflerowrdd-cache.png!

Then result will be error:
 0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
 +--+--++--
|_c0|_c1|

+--+--++--
|Hyukjin|25|
|Hyukjin|25|
|Hyukjin|25|

+--+--++--
 The reason why this happen is that:

UnsafeRow which generated by ShuffleRowRDD#compute will use the same under byte 
buffer

I print some log below to explain this:

Modify UnsafeRow.toString()
{code:java}
// This is for debugging
@Override
public String toString() {
  StringBuilder build = new StringBuilder("[");
  for (int i = 0; i < sizeInBytes; i += 8) {
if (i != 0) build.append(',');
build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, 
baseOffset + i)));
  }
  build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and 
baseOffset here
  return build.toString();
}{code}
{code:java}
2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
{code}
we can fix this by add a config,and copy UnsafeRow read by ShuffleRowRDD 
iterator when config is true,like below:
{code:java}
reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
  && x._2.isInstanceOf[UnsafeRow]) {
  (x._2).asInstanceOf[UnsafeRow].copy()
} else {
  x._2
}
  })
}
{code}


> When we call cache() on ShuffleRowRDD, we will get an error result
> --
>
> Key: SPARK-23076
> URL: https://issues.apache.org/jira/browse/SPARK-23076
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
> Attachments: shufflerowrdd-cache.png
>
>
> For query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: 

[jira] [Updated] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result

2018-01-15 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23076:
-
Description: 
 

For query below:

 
{code:java}
select * from csv_demo limit 3;
{code}
The correct result should be:
 0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
 +--+--++--
|_c0|_c1|

+--+--++--
|Joe|20|
|Tom|30|
|Hyukjin|25|

+--+--++--
 However,when we call cache on ShuffleRowRDD(or RDD which depends on 
ShuffleRowRDD in one stage):

  !shufflerowrdd-cache.png!

Then result will be error:
 0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
 +--+--++--
|_c0|_c1|

+--+--++--
|Hyukjin|25|
|Hyukjin|25|
|Hyukjin|25|

+--+--++--
 The reason why this happen is that:

UnsafeRow which generated by ShuffleRowRDD#compute will use the same under byte 
buffer

I print some log below to explain this:

Modify UnsafeRow.toString()
{code:java}
// This is for debugging
@Override
public String toString() {
  StringBuilder build = new StringBuilder("[");
  for (int i = 0; i < sizeInBytes; i += 8) {
if (i != 0) build.append(',');
build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, 
baseOffset + i)));
  }
  build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and 
baseOffset here
  return build.toString();
}{code}
{code:java}
2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
{code}
we can fix this by add a config,and copy UnsafeRow read by ShuffleRowRDD 
iterator when config is true,like below:
{code:java}
reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
  && x._2.isInstanceOf[UnsafeRow]) {
  (x._2).asInstanceOf[UnsafeRow].copy()
} else {
  x._2
}
  })
}
{code}

  was:
For query below:
{code:java}
select * from csv_demo limit 3;
{code}
The correct result should be:
 0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
 ++++--
|_c0|_c1|

++++--
|Joe|20|
|Tom|30|
|Hyukjin|25|

++++--
 However,when we call cache on ShuffleRowRDD(or RDD which depends on 
ShuffleRowRDD in one stage):

  !shufflerowrdd-cache.png!

Then result will be error:
 0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
 ++++--
|_c0|_c1|

++++--
|Hyukjin|25|
|Hyukjin|25|
|Hyukjin|25|

++++--
 The reason why this happen is that:

UnsafeRow which generated by ShuffleRowRDD#compute will use the same under byte 
buffer

I print some log below to explain this:

Modify UnsafeRow.toString()
{code:java}
// This is for debugging
@Override
public String toString() {
  StringBuilder build = new StringBuilder("[");
  for (int i = 0; i < sizeInBytes; i += 8) {
if (i != 0) build.append(',');
build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, 
baseOffset + i)));
  }
  build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and 
baseOffset here
  return build.toString();
}{code}
{code:java}
2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: 
Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
{code}


> When we call cache() on ShuffleRowRDD, we will get an error result
> --
>
> Key: SPARK-23076
> URL: https://issues.apache.org/jira/browse/SPARK-23076
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
> Attachments: shufflerowrdd-cache.png
>
>
>  
> For query below:
>  
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
>  +--+--++--
> |_c0|_c1|
> +--+--++--
> |Joe|20|
> |Tom|30|
> |Hyukjin|25|
> +--+--++--
>  However,when we call cache on ShuffleRowRDD(or RDD which depends on 
> ShuffleRowRDD in one stage):
>   !shufflerowrdd-cache.png!
> Then result will be error:
>  0: jdbc:hive2://xxx/> select * from 

[jira] [Updated] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result

2018-01-15 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23076:
-
Description: 
For query below:
{code:java}
select * from csv_demo limit 3;
{code}
The correct result should be:
 0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
 ++++--
|_c0|_c1|

++++--
|Joe|20|
|Tom|30|
|Hyukjin|25|

++++--
 However,when we call cache on ShuffleRowRDD(or RDD which depends on 
ShuffleRowRDD in one stage):

  !shufflerowrdd-cache.png!

Then result will be error:
 0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
 ++++--
|_c0|_c1|

++++--
|Hyukjin|25|
|Hyukjin|25|
|Hyukjin|25|

++++--
 The reason why this happen is that:

UnsafeRow which generated by ShuffleRowRDD#compute will use the same under byte 
buffer

I print some log below to explain this:

Modify UnsafeRow.toString()
{code:java}
// This is for debugging
@Override
public String toString() {
  StringBuilder build = new StringBuilder("[");
  for (int i = 0; i < sizeInBytes; i += 8) {
if (i != 0) build.append(',');
build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, 
baseOffset + i)));
  }
  build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and 
baseOffset here
  return build.toString();
}{code}
{code:java}
2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
{code}
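A rough sketch of the usual workaround follows; this is not the actual Spark fix, and {{cacheSafely}} / {{shuffledRowRdd}} are hypothetical names used only for illustration. Copying each UnsafeRow before caching gives every cached element its own byte array:
{code:scala}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Minimal sketch, assuming we already hold the shuffled RDD of UnsafeRow.
def cacheSafely(shuffledRowRdd: RDD[UnsafeRow]): RDD[UnsafeRow] = {
  // UnsafeRow.copy() materializes the row into a fresh byte array,
  // so later reads of the shared buffer no longer change the cached values.
  shuffledRowRdd.map(_.copy()).cache()
}
{code}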

  was:
For the query below:
{code:java}
select * from csv_demo limit 3;
{code}
The correct result should be:
 0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
 +----------+------+
 | _c0      | _c1  |
 +----------+------+
 | Joe      | 20   |
 | Tom      | 30   |
 | Hyukjin  | 25   |
 +----------+------+
 However, when we call cache() on ShuffleRowRDD (or on an RDD that depends on ShuffleRowRDD within the same stage):

 

Then the result is wrong:
 0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
 +----------+------+
 | _c0      | _c1  |
 +----------+------+
 | Hyukjin  | 25   |
 | Hyukjin  | 25   |
 | Hyukjin  | 25   |
 +----------+------+
 The reason this happens is that the UnsafeRow instances generated by ShuffleRowRDD#compute reuse the same underlying byte buffer.

I printed some logs below to illustrate this:

Modify UnsafeRow.toString()
{code:java}
// This is for debugging
@Override
public String toString() {
  StringBuilder build = new StringBuilder("[");
  for (int i = 0; i < sizeInBytes; i += 8) {
    if (i != 0) build.append(',');
    build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
  }
  build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and baseOffset here
  return build.toString();
}{code}
2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]


> When we call cache() on ShuffleRowRDD, we will get an error result
> --
>
> Key: SPARK-23076
> URL: https://issues.apache.org/jira/browse/SPARK-23076
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
> Attachments: shufflerowrdd-cache.png
>
>
> For the query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
>  +----------+------+
>  | _c0      | _c1  |
>  +----------+------+
>  | Joe      | 20   |
>  | Tom      | 30   |
>  | Hyukjin  | 25   |
>  +----------+------+
>  However, when we call cache() on ShuffleRowRDD (or on an RDD that depends on ShuffleRowRDD within the same stage):
>   !shufflerowrdd-cache.png!
> Then the result is wrong:
>  0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
>  +----------+------+
>  | _c0      | _c1  |
>  +----------+------+
>  | Hyukjin  | 25   |
>  | Hyukjin  | 25   |
>  | Hyukjin  | 25   |
>  +----------+------+
>  The reason this happens is that the UnsafeRow instances generated by ShuffleRowRDD#compute reuse the same underlying byte buffer.
> I printed some logs below to illustrate this:
> Modify UnsafeRow.toString()
> {code:java}
> // This is for debugging
> @Override
> public String toString() {
>   StringBuilder build = 

[jira] [Updated] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result

2018-01-15 Thread zhoukang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-23076:
-
Attachment: shufflerowrdd-cache.png

> When we call cache() on ShuffleRowRDD, we will get an error result
> --
>
> Key: SPARK-23076
> URL: https://issues.apache.org/jira/browse/SPARK-23076
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: zhoukang
>Priority: Major
> Attachments: shufflerowrdd-cache.png
>
>
> For the query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
>  +----------+------+
>  | _c0      | _c1  |
>  +----------+------+
>  | Joe      | 20   |
>  | Tom      | 30   |
>  | Hyukjin  | 25   |
>  +----------+------+
>  However, when we call cache() on ShuffleRowRDD (or on an RDD that depends on ShuffleRowRDD within the same stage):
> 
> Then the result is wrong:
>  0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
>  +----------+------+
>  | _c0      | _c1  |
>  +----------+------+
>  | Hyukjin  | 25   |
>  | Hyukjin  | 25   |
>  | Hyukjin  | 25   |
>  +----------+------+
>  The reason this happens is that the UnsafeRow instances generated by ShuffleRowRDD#compute reuse the same underlying byte buffer.
> I printed some logs below to illustrate this:
> Modify UnsafeRow.toString()
> {code:java}
> // This is for debugging
> @Override
> public String toString() {
>   StringBuilder build = new StringBuilder("[");
>   for (int i = 0; i < sizeInBytes; i += 8) {
>     if (i != 0) build.append(',');
>     build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
>   }
>   build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and baseOffset here
>   return build.toString();
> }{code}
> 2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
> 2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
> 2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


